Author: justi9
Date: 2010-01-06 14:22:06 -0500 (Wed, 06 Jan 2010)
New Revision: 3759
Modified:
mgmt/trunk/cumin/python/cumin/grid/slot.py
mgmt/trunk/cumin/python/cumin/grid/slot.strings
mgmt/trunk/cumin/python/cumin/grid/submission.strings
mgmt/trunk/cumin/python/cumin/messaging/broker.py
mgmt/trunk/cumin/python/cumin/messaging/broker.strings
mgmt/trunk/cumin/python/cumin/messaging/brokerlink.py
mgmt/trunk/cumin/python/cumin/messaging/brokerlink.strings
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/mint/python/mint/main.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/schema.py
mgmt/trunk/mint/python/mint/schemaparser.py
mgmt/trunk/mint/python/mint/sql.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/mint/python/mint/util.py
mgmt/trunk/mint/sql/schema.sql
mgmt/trunk/mint/sql/triggers.sql
mgmt/trunk/wooly/python/wooly/bench.py
Log:
* Update to latest condor and broker qmf schemas; as of this change,
jobs are gone
* Simplify the qmf metadata on each object; qmf_broker_id and
qmf_scope_id are gone, supplanted by qmf_agent_id and qmf_object_id
(now in string form); this is also a step toward the qmf2 model
* Add classes for generating and parsing synthetic agent and object
IDs until qmf2 makes them obsolete
* Move logic out of the generated methods and into the standard
method caller on MintModel
* Repair some mint tool logging
* Clean up mint imports
Modified: mgmt/trunk/cumin/python/cumin/grid/slot.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/slot.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/cumin/python/cumin/grid/slot.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -425,8 +425,6 @@
def render_slot_info_index(self, session):
return self.index.path
- # XXX these should be item_title, etc.
-
def render_item_title(self, session, item):
return item[0]
Modified: mgmt/trunk/cumin/python/cumin/grid/slot.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/slot.strings 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/cumin/python/cumin/grid/slot.strings 2010-01-06 19:22:06 UTC (rev 3759)
@@ -8,13 +8,11 @@
s.accounting_group,
s.op_sys,
s.arch,
- j.id as jid,
c.activity,
c.state,
c.load_avg
from slot as s
left outer join slot_stats as c on c.id = s.stats_curr_id
-left outer join job as j on j.custom_id = s.job_id
{sql_where}
{sql_order_by}
{sql_limit}
@@ -31,13 +29,11 @@
s.machine,
s.system,
s.job_id,
- j.id as jid,
c.activity,
c.state,
c.load_avg
from slot as s
left outer join slot_stats as c on c.id = s.stats_curr_id
-left outer join job as j on j.custom_id = s.job_id
{sql_where}
{sql_orderby}
{sql_limit}
Modified: mgmt/trunk/cumin/python/cumin/grid/submission.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/submission.strings 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/cumin/python/cumin/grid/submission.strings 2010-01-06 19:22:06 UTC (rev 3759)
@@ -2,16 +2,14 @@
select
s.id,
s.name,
+ s.owner,
c.idle,
c.running,
c.completed,
- m.id as submitter_id,
- m.name as submitter_name,
d.id as scheduler_id,
d.name as scheduler_name
from submission as s
inner join scheduler as d on s.scheduler_id = d.id
-inner join submitter as m on s.submitter_id = m.id
inner join submission_stats as c on s.stats_curr_id = c.id
{sql_where}
{sql_order_by}
@@ -21,22 +19,21 @@
select count(*)
from submission as s
inner join scheduler as d on s.scheduler_id = d.id
-inner join submitter as m on s.submitter_id = m.id
inner join submission_stats as c on s.stats_curr_id = c.id
{sql_where}
[TopSubmissionSet.sql]
select
- j.id,
- j.name,
- j.qmf_create_time,
- s.pool
-from submission as j
-join scheduler s on s.id = j.scheduler_id
-left outer join submission_stats as ss on ss.id = j.stats_curr_id
-where ss.running > 0
- and ss.qmf_update_time > now() - interval '60 seconds'
-order by j.qmf_create_time asc
+ s.id,
+ s.name,
+ s.qmf_create_time,
+ d.pool
+from submission as s
+join scheduler d on d.id = s.scheduler_id
+left outer join submission_stats as c on c.id = s.stats_curr_id
+where c.running > 0
+ and c.qmf_update_time > now() - interval '60 seconds'
+order by s.qmf_create_time asc
limit 5
[TopSubmissionSet.count_sql]
Modified: mgmt/trunk/cumin/python/cumin/messaging/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/broker.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/cumin/python/cumin/messaging/broker.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -84,8 +84,8 @@
return "Status"
def render_content(self, session, data):
- scopeId = data["qmf_scope_id"]
- dt = self.app.model.mint.model.getLatestHeartbeat(scopeId)
+ agentId = data["qmf_agent_id"]
+ dt = self.app.model.mint.model.getLatestHeartbeat(agentId)
if dt is None:
return fmt_none()
Modified: mgmt/trunk/cumin/python/cumin/messaging/broker.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/broker.strings 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/cumin/python/cumin/messaging/broker.strings 2010-01-06 19:22:06 UTC (rev 3759)
@@ -1,7 +1,7 @@
[BrokerSet.sql]
select
b.id,
- b.qmf_scope_id,
+ b.qmf_agent_id,
s.node_name || ':' || b.port as name,
c.cluster_name as cluster
from broker as b
Modified: mgmt/trunk/cumin/python/cumin/messaging/brokerlink.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/brokerlink.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/cumin/python/cumin/messaging/brokerlink.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -343,7 +343,8 @@
link = self.link.get(session)
if not self.tag.get(session):
- self.tag.set(session, link.qmfBrokerId)
+ brokerId = QpidAgentId.fromString(link.qmfAgentId).brokerId
+ self.tag.set(session, brokerId)
if not self.excludes.get(session):
self.excludes.set(session, "%s:%s" % (link.host, link.port))
Modified: mgmt/trunk/cumin/python/cumin/messaging/brokerlink.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/brokerlink.strings 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/cumin/python/cumin/messaging/brokerlink.strings 2010-01-06 19:22:06 UTC (rev 3759)
@@ -29,8 +29,7 @@
b.tag,
b.excludes,
l.host,
- l.port,
- l.qmf_broker_id
+ l.port
from bridge as b
join link as l on l.id = b.link_id
left outer join bridge_stats as c on c.id = b.stats_curr_id
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/cumin/python/cumin/model.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -124,9 +124,9 @@
def get_session_by_object(self, object):
assert object
- broker = self.mint.model.mintBrokersById[object.qmfBrokerId]
+ agent = self.mint.model.agentsById[object.qmfAgentId]
- return broker.getAmqpSession()
+ return agent.getBroker().getAmqpSession()
def get_negotiator_limits(self, negotiator):
self.lock.acquire()
@@ -539,13 +539,6 @@
self.model.invocations.add(invoc)
return invoc
- def get_session_by_object(self, object):
- assert object
-
- broker = self.model.mint.model.mintBrokersById[object.qmfBrokerId]
-
- return broker.getAmqpSession()
-
class CuminActionInvocation(object):
def __init__(self, action, object):
self.action = action
@@ -1886,7 +1879,7 @@
class CuminJob(CuminClass):
def __init__(self, model):
- super(CuminJob, self).__init__(model, "job", Job)
+ super(CuminJob, self).__init__(model, "job", None)
### Main Group
prop = AdProperty(self, "Owner")
Modified: mgmt/trunk/mint/python/mint/main.py
===================================================================
--- mgmt/trunk/mint/python/mint/main.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/python/mint/main.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -108,7 +108,7 @@
param = ConfigParameter(self, "expire-threshold", int)
param.default = 24 * 3600 # 1 day
- def init(self, opts=None):
+ def init(self):
super(MintConfig, self).init()
self.load_file(os.path.join(self.home, "etc", "cumin.conf"))
@@ -117,14 +117,8 @@
self.load_file(os.path.join(os.path.expanduser("~"),
".cumin.conf"))
self.load_file(os.path.join(os.path.expanduser("~"),
".mint.conf"))
- if opts:
- self.load_dict(opts)
-
enable_logging("mint", self.log_level, self.log_file)
- if self.debug:
- enable_logging("mint", "debug", sys.stderr)
-
def get_addr_for_vhost(vhost):
broker = vhost.broker
host = broker.system.nodeName
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -1,26 +1,15 @@
-import logging
-import qmf.console
-import pickle
-import struct
-import sys
-import os
-import types
-import socket
-
from parsley.threadingex import Lifecycle
-from qmf.console import ClassKey
from qpid.datatypes import UUID
from qpid.util import URL
from sqlobject import *
-from threading import Lock, RLock
-from time import sleep
-from traceback import print_exc
from mint import update
from mint.cache import MintCache
from mint.schema import *
-from util import *
+from mint.util import *
+from qmf.console import ClassKey, Console, Session
+
log = logging.getLogger("mint.model")
thisModule = __import__(__name__)
@@ -121,16 +110,7 @@
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.url)
- def getAmqpSession(self):
- return self.qmfBroker.getAmqpSession()
-
- def getAmqpSessionId(self):
- return self.qmfBroker.getSessionId()
-
- def getFullUrl(self):
- return self.qmfBroker.getFullUrl()
-
-class MintModel(qmf.console.Console, Lifecycle):
+class MintModel(Console, Lifecycle):
staticInstance = None
def __init__(self, app):
@@ -148,8 +128,10 @@
self.mintBrokersById = dict()
self.mintBrokersByUrl = dict()
+ self.agentsById = dict()
+
# Agent heartbeats
- # (broker bank, agent bank) => latest heartbeat timestamp
+ # agentId => latest heartbeat timestamp
self.heartbeatsByAgentId = dict()
@@ -172,7 +154,7 @@
def do_init(self):
assert self.qmfSession is None
- self.qmfSession = qmf.console.Session \
+ self.qmfSession = Session \
(self, manageConnections=True, rcvObjects=self.app.updateEnabled)
# clean up any transient objects that a previous instance may have
@@ -192,21 +174,6 @@
for mbroker in self.mintBrokersByQmfBroker.values():
self.delBroker(mbroker)
- def callMethod(self, brokerId, objId, classKey, methodName, callback, args):
- self.lock()
- try:
- broker = self.mintBrokersById[brokerId]
-
- seq = self.qmfSession._sendMethodRequest \
- (broker.qmfBroker, ClassKey(classKey), objId, methodName, args)
-
- if seq is not None:
- self.outstandingMethodCalls[seq] = callback
-
- return seq
- finally:
- self.unlock()
-
def addBroker(self, url):
log.info("Adding qmf broker at %s", url)
@@ -257,6 +224,17 @@
def brokerInfo(self, qbroker):
self.lock()
try:
+ # We now have enough information to generate a proper agent ID;
+ # it would be much better to get the broker ID earlier, or to
+ # simply get a reasonable agent ID with less indirection
+
+ for agent in qbroker.getAgents():
+ agentId = str(QmfAgentId.fromAgent(agent))
+
+ log.debug("Adding agent %s", agentId)
+
+ self.agentsById[agentId] = agent
+
id = str(qbroker.getBrokerId())
mbroker = self.mintBrokersByQmfBroker[qbroker]
@@ -293,6 +271,59 @@
finally:
self.unlock()
+ def newAgent(self, agent):
+ """ Invoked when a QMF agent is discovered. """
+
+ log.info("Agent connected: %s", agent)
+
+ # Some agents come down without a brokerId, meaning we can't
+ # generate a fully qualified agent ID for them. Those we handle
+ # in brokerInfo.
+
+ if agent.getBroker().brokerId:
+ self.lock()
+ try:
+ agentId = str(QmfAgentId.fromAgent(agent))
+
+ log.debug("Adding agent %s", agentId)
+
+ self.agentsById[agentId] = agent
+ finally:
+ self.unlock()
+
+ def delAgent(self, agent):
+ """ Invoked when a QMF agent disconects. """
+
+ log.info("Agent disconnected: %s", agent)
+
+ agentId = str(QmfAgentId.fromAgent(agent))
+
+ self.lock()
+ try:
+ del self.agentsById[agentId]
+ finally:
+ self.unlock()
+
+ up = update.AgentDisconnectUpdate(self, agentId)
+ self.app.updateThread.enqueue(up)
+
+ def heartbeat(self, agent, timestamp):
+ agentId = str(QmfAgentId.fromAgent(agent))
+ timestamp = timestamp / 1000000000
+
+ self.lock()
+ try:
+ self.heartbeatsByAgentId[agentId] = datetime.fromtimestamp(timestamp)
+ finally:
+ self.unlock()
+
+ def getLatestHeartbeat(self, agentId):
+ self.lock()
+ try:
+ return self.heartbeatsByAgentId.get(agentId)
+ finally:
+ self.unlock()
+
def newPackage(self, name):
""" Invoked when a QMF package is discovered. """
pass
@@ -302,16 +333,6 @@
used to obtain details about the class."""
pass
- def newAgent(self, agent):
- """ Invoked when a QMF agent is discovered. """
- log.info("Agent connected: %s", agent)
-
- def delAgent(self, agent):
- """ Invoked when a QMF agent disconects. """
- log.info("Agent disconnected: %s", agent)
- up = update.AgentDisconnectUpdate(self, self.getAgentDBId(agent, agent.getBroker().getBrokerId()))
- self.app.updateThread.enqueue(up)
-
def objectProps(self, broker, record):
""" Invoked when an object is updated. """
if not self.app.updateThread.isAlive():
@@ -321,9 +342,6 @@
up = update.PropertyUpdate(self, mbroker, record)
- if record.getClassKey().getClassName() == "job":
- up.priority = 1
-
self.app.updateThread.enqueue(up)
def objectStats(self, broker, record):
@@ -332,9 +350,6 @@
if not self.app.updateThread.isAlive():
return
- if record.getClassKey().getClassName() == "job":
- return
-
mbroker = self.getMintBrokerByQmfBroker(broker)
up = update.StatisticUpdate(self, mbroker, record)
@@ -344,30 +359,32 @@
""" Invoked when an event is raised. """
pass
- def heartbeat(self, agent, timestamp):
- key = (agent.getBrokerBank(), agent.getAgentBank())
- timestamp = timestamp / 1000000000
- self.heartbeatsByAgentId[key] = datetime.fromtimestamp(timestamp)
+ def callMethod(self, mintObject, methodName, callback, args):
+ classKey = ClassKey(mintObject.qmfClassKey)
+ objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
- def getLatestHeartbeat(self, scopeId):
- scope = int(scopeId)
- broker = (scope & 0x0000FFFFF0000000) >> 28
- agent = scope & 0x000000000FFFFFFF
- key = (broker, agent)
+ self.lock()
+ try:
+ agent = self.agentsById[mintObject.qmfAgentId]
+ broker = agent.getBroker()
- return self.heartbeatsByAgentId.get(key)
+ seq = self.qmfSession._sendMethodRequest \
+ (broker, classKey, objectId, methodName, args)
+ if seq is not None:
+ self.outstandingMethodCalls[seq] = callback
+
+ return seq
+ finally:
+ self.unlock()
+
def methodResponse(self, broker, seq, response):
log.debug("Method response for request %i received from %s", seq, broker)
log.debug("Response: %s", response)
self.lock()
-
try:
methodCallback = self.outstandingMethodCalls.pop(seq)
methodCallback(response.text, response.outArgs)
finally:
self.unlock()
-
- def getAgentDBId(self, agent, brokerId):
- return "%s.%d.%d" % (brokerId, agent.getBrokerBank(), agent.getAgentBank())
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/python/mint/schema.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -12,11 +12,9 @@
class Slot(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -130,126 +128,12 @@
-class Job(SQLObject):
- class sqlmeta:
- lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
- qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
- qmfClassKey = StringCol(length=1000, default=None)
- qmfPersistent = BoolCol(default=None)
- qmfUpdateTime = TimestampCol(default=None)
- qmfCreateTime = TimestampCol(default=None)
- qmfDeleteTime = TimestampCol(default=None)
- statsCurr = ForeignKey('JobStats', cascade='null', default=None)
- statsCurrIndex = DatabaseIndex(statsCurr)
- statsPrev = ForeignKey('JobStats', cascade='null', default=None)
- statsPrevIndex = DatabaseIndex(statsPrev)
- scheduler = ForeignKey('Scheduler', cascade='null', default=None)
- submitter = ForeignKey('Submitter', cascade='null', default=None)
- AccountingGroup = StringCol(length=1000, default=None)
- Args = StringCol(length=4000, default=None)
- ClusterId = BigIntCol(default=None)
- Cmd = StringCol(length=4000, default=None)
- ConcurrencyLimits = StringCol(length=4000, default=None)
- CustomGroup = StringCol(length=1000, default=None)
- CustomId = StringCol(length=1000, default=None)
- CustomPriority = BigIntCol(default=None)
- GlobalJobId = StringCol(length=1000, default=None)
- InRsv = StringCol(length=4000, default=None)
- Iwd = StringCol(length=4000, default=None)
- JobStatus = BigIntCol(default=None)
- Note = StringCol(length=4000, default=None)
- Out = StringCol(length=4000, default=None)
- Owner = StringCol(length=1000, default=None)
- GridUser = StringCol(length=1000, default=None)
- ProcId = BigIntCol(default=None)
- QDate = TimestampCol(default=None)
- JobUniverse = BigIntCol(default=None)
- Title = StringCol(length=1000, default=None)
- UserLog = StringCol(length=4000, default=None)
- HoldReason = StringCol(length=4000, default=None)
- DAGNodeName = StringCol(length=1000, default=None)
- DAGParentNodeNames = StringCol(length=4000, default=None)
- DAGManJobId = BigIntCol(default=None)
- Ad = PickleCol(default=None)
-
-
- def GetAd(self, model, callback, JobAd):
- actualArgs = list()
- if JobAd is not None:
- actualArgs.append(JobAd)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "GetAd",
- callback, args=actualArgs)
-
- def SetAttribute(self, model, callback, Name, Value):
- actualArgs = list()
- if Name is not None:
- actualArgs.append(Name)
- if Value is not None:
- actualArgs.append(Value)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "SetAttribute",
- callback, args=actualArgs)
-
- def Hold(self, model, callback, Reason):
- actualArgs = list()
- if Reason is not None:
- actualArgs.append(Reason)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Hold",
- callback, args=actualArgs)
-
- def Release(self, model, callback, Reason):
- actualArgs = list()
- if Reason is not None:
- actualArgs.append(Reason)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Release",
- callback, args=actualArgs)
-
- def Remove(self, model, callback, Reason):
- actualArgs = list()
- if Reason is not None:
- actualArgs.append(Reason)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Remove",
- callback, args=actualArgs)
-
- def Fetch(self, model, callback, File, Start, End, Data):
- actualArgs = list()
- if File is not None:
- actualArgs.append(File)
- if Start is not None:
- actualArgs.append(Start)
- if End is not None:
- actualArgs.append(End)
- if Data is not None:
- actualArgs.append(Data)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Fetch",
- callback, args=actualArgs)
-
-class JobStats(SQLObject):
- class sqlmeta:
- lazyUpdate = True
- qmfUpdateTime = TimestampCol(default=None)
- job = ForeignKey('Job', cascade='null', default=None)
-
-
-
-
class Scheduler(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -279,9 +163,7 @@
actualArgs.append(Ad)
if Id is not None:
actualArgs.append(Id)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Submit",
- callback, args=actualArgs)
+ model.callMethod(self, "Submit", callback, args=actualArgs)
def GetAd(self, model, callback, Id, JobAd):
actualArgs = list()
@@ -289,9 +171,7 @@
actualArgs.append(Id)
if JobAd is not None:
actualArgs.append(JobAd)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "GetAd",
- callback, args=actualArgs)
+ model.callMethod(self, "GetAd", callback, args=actualArgs)
def SetAttribute(self, model, callback, Id, Name, Value):
actualArgs = list()
@@ -301,9 +181,7 @@
actualArgs.append(Name)
if Value is not None:
actualArgs.append(Value)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "SetAttribute",
- callback, args=actualArgs)
+ model.callMethod(self, "SetAttribute", callback, args=actualArgs)
def Hold(self, model, callback, Id, Reason):
actualArgs = list()
@@ -311,9 +189,7 @@
actualArgs.append(Id)
if Reason is not None:
actualArgs.append(Reason)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Hold",
- callback, args=actualArgs)
+ model.callMethod(self, "Hold", callback, args=actualArgs)
def Release(self, model, callback, Id, Reason):
actualArgs = list()
@@ -321,9 +197,7 @@
actualArgs.append(Id)
if Reason is not None:
actualArgs.append(Reason)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Release",
- callback, args=actualArgs)
+ model.callMethod(self, "Release", callback, args=actualArgs)
def Remove(self, model, callback, Id, Reason):
actualArgs = list()
@@ -331,9 +205,7 @@
actualArgs.append(Id)
if Reason is not None:
actualArgs.append(Reason)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Remove",
- callback, args=actualArgs)
+ model.callMethod(self, "Remove", callback, args=actualArgs)
def Fetch(self, model, callback, Id, File, Start, End, Data):
actualArgs = list()
@@ -347,9 +219,7 @@
actualArgs.append(End)
if Data is not None:
actualArgs.append(Data)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Fetch",
- callback, args=actualArgs)
+ model.callMethod(self, "Fetch", callback, args=actualArgs)
def GetStates(self, model, callback, Submission, State, Count):
actualArgs = list()
@@ -359,9 +229,7 @@
actualArgs.append(State)
if Count is not None:
actualArgs.append(Count)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "GetStates",
- callback, args=actualArgs)
+ model.callMethod(self, "GetStates", callback, args=actualArgs)
def GetJobs(self, model, callback, Submission, Jobs):
actualArgs = list()
@@ -369,9 +237,7 @@
actualArgs.append(Submission)
if Jobs is not None:
actualArgs.append(Jobs)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "GetJobs",
- callback, args=actualArgs)
+ model.callMethod(self, "GetJobs", callback, args=actualArgs)
def echo(self, model, callback, sequence, body):
actualArgs = list()
@@ -379,9 +245,7 @@
actualArgs.append(sequence)
if body is not None:
actualArgs.append(body)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "echo",
- callback, args=actualArgs)
+ model.callMethod(self, "echo", callback, args=actualArgs)
class SchedulerStats(SQLObject):
class sqlmeta:
@@ -408,11 +272,9 @@
class Submitter(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -444,11 +306,9 @@
class Negotiator(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -473,9 +333,7 @@
actualArgs = list()
if Limits is not None:
actualArgs.append(Limits)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "GetLimits",
- callback, args=actualArgs)
+ model.callMethod(self, "GetLimits", callback, args=actualArgs)
def SetLimit(self, model, callback, Name, Max):
actualArgs = list()
@@ -483,9 +341,7 @@
actualArgs.append(Name)
if Max is not None:
actualArgs.append(Max)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "SetLimit",
- callback, args=actualArgs)
+ model.callMethod(self, "SetLimit", callback, args=actualArgs)
def GetStats(self, model, callback, Name, Ad):
actualArgs = list()
@@ -493,9 +349,7 @@
actualArgs.append(Name)
if Ad is not None:
actualArgs.append(Ad)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "GetStats",
- callback, args=actualArgs)
+ model.callMethod(self, "GetStats", callback, args=actualArgs)
def SetPriority(self, model, callback, Name, Priority):
actualArgs = list()
@@ -503,9 +357,7 @@
actualArgs.append(Name)
if Priority is not None:
actualArgs.append(Priority)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "SetPriority",
- callback, args=actualArgs)
+ model.callMethod(self, "SetPriority", callback, args=actualArgs)
def SetPriorityFactor(self, model, callback, Name, PriorityFactor):
actualArgs = list()
@@ -513,9 +365,7 @@
actualArgs.append(Name)
if PriorityFactor is not None:
actualArgs.append(PriorityFactor)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "SetPriorityFactor",
- callback, args=actualArgs)
+ model.callMethod(self, "SetPriorityFactor", callback, args=actualArgs)
def SetUsage(self, model, callback, Name, Usage):
actualArgs = list()
@@ -523,9 +373,7 @@
actualArgs.append(Name)
if Usage is not None:
actualArgs.append(Usage)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "SetUsage",
- callback, args=actualArgs)
+ model.callMethod(self, "SetUsage", callback, args=actualArgs)
def GetRawConfig(self, model, callback, Name, Value):
actualArgs = list()
@@ -533,9 +381,7 @@
actualArgs.append(Name)
if Value is not None:
actualArgs.append(Value)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "GetRawConfig",
- callback, args=actualArgs)
+ model.callMethod(self, "GetRawConfig", callback, args=actualArgs)
def SetRawConfig(self, model, callback, Name, Value):
actualArgs = list()
@@ -543,15 +389,11 @@
actualArgs.append(Name)
if Value is not None:
actualArgs.append(Value)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "SetRawConfig",
- callback, args=actualArgs)
+ model.callMethod(self, "SetRawConfig", callback, args=actualArgs)
def Reconfig(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Reconfig",
- callback, args=actualArgs)
+ model.callMethod(self, "Reconfig", callback, args=actualArgs)
class NegotiatorStats(SQLObject):
class sqlmeta:
@@ -572,11 +414,9 @@
class Collector(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -612,11 +452,9 @@
class Master(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -643,17 +481,13 @@
actualArgs = list()
if Subsystem is not None:
actualArgs.append(Subsystem)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Start",
- callback, args=actualArgs)
+ model.callMethod(self, "Start", callback, args=actualArgs)
def Stop(self, model, callback, Subsystem):
actualArgs = list()
if Subsystem is not None:
actualArgs.append(Subsystem)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "Stop",
- callback, args=actualArgs)
+ model.callMethod(self, "Stop", callback, args=actualArgs)
class MasterStats(SQLObject):
class sqlmeta:
@@ -674,11 +508,9 @@
class Grid(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -716,11 +548,9 @@
class Submission(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -731,8 +561,8 @@
statsPrev = ForeignKey('SubmissionStats', cascade='null',
default=None)
statsPrevIndex = DatabaseIndex(statsPrev)
scheduler = ForeignKey('Scheduler', cascade='null', default=None)
- submitter = ForeignKey('Submitter', cascade='null', default=None)
Name = StringCol(length=1000, default=None)
+ Owner = StringCol(length=1000, default=None)
class SubmissionStats(SQLObject):
@@ -752,11 +582,9 @@
class Acl(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -776,9 +604,7 @@
def reloadACLFile(self, model, callback):
"""Reload the ACL file"""
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey,
"reloadACLFile",
- callback, args=actualArgs)
+ model.callMethod(self, "reloadACLFile", callback, args=actualArgs)
class AclStats(SQLObject):
class sqlmeta:
@@ -793,11 +619,9 @@
class Cluster(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -822,15 +646,11 @@
actualArgs = list()
if brokerId is not None:
actualArgs.append(brokerId)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey,
"stopClusterNode",
- callback, args=actualArgs)
+ model.callMethod(self, "stopClusterNode", callback, args=actualArgs)
def stopFullCluster(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey,
"stopFullCluster",
- callback, args=actualArgs)
+ model.callMethod(self, "stopFullCluster", callback, args=actualArgs)
class ClusterStats(SQLObject):
class sqlmeta:
@@ -844,11 +664,9 @@
class Store(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -892,11 +710,9 @@
class Journal(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -926,9 +742,7 @@
actualArgs = list()
if by is not None:
actualArgs.append(by)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "expand",
- callback, args=actualArgs)
+ model.callMethod(self, "expand", callback, args=actualArgs)
class JournalStats(SQLObject):
class sqlmeta:
@@ -971,11 +785,9 @@
class System(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1005,11 +817,9 @@
class Broker(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1037,9 +847,7 @@
actualArgs.append(sequence)
if body is not None:
actualArgs.append(body)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "echo",
- callback, args=actualArgs)
+ model.callMethod(self, "echo", callback, args=actualArgs)
def connect(self, model, callback, host, port, durable, authMechanism, username,
password, transport):
"""Establish a connection to another broker"""
@@ -1058,9 +866,7 @@
actualArgs.append(password)
if transport is not None:
actualArgs.append(transport)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey,
"connect",
- callback, args=actualArgs)
+ model.callMethod(self, "connect", callback, args=actualArgs)
def queueMoveMessages(self, model, callback, srcQueue, destQueue, qty):
"""Move messages from one queue to another"""
@@ -1071,9 +877,7 @@
actualArgs.append(destQueue)
if qty is not None:
actualArgs.append(qty)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey,
"queueMoveMessages",
- callback, args=actualArgs)
+ model.callMethod(self, "queueMoveMessages", callback, args=actualArgs)
class BrokerStats(SQLObject):
class sqlmeta:
@@ -1088,11 +892,9 @@
class Agent(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1122,11 +924,9 @@
class Vhost(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1153,11 +953,9 @@
class Queue(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1173,6 +971,7 @@
autoDelete = BoolCol(default=None)
exclusive = BoolCol(default=None)
arguments = PickleCol(default=None)
+ exchange = ForeignKey('Exchange', cascade='null', default=None)
def purge(self, model, callback, request):
@@ -1180,9 +979,7 @@
actualArgs = list()
if request is not None:
actualArgs.append(request)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "purge",
- callback, args=actualArgs)
+ model.callMethod(self, "purge", callback, args=actualArgs)
class QueueStats(SQLObject):
class sqlmeta:
@@ -1223,11 +1020,9 @@
class Exchange(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1270,11 +1065,9 @@
class Binding(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1301,14 +1094,47 @@
+class Subscription(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+ qmfAgentId = StringCol(length=1000, default=None)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
+ qmfClassKey = StringCol(length=1000, default=None)
+ qmfPersistent = BoolCol(default=None)
+ qmfUpdateTime = TimestampCol(default=None)
+ qmfCreateTime = TimestampCol(default=None)
+ qmfDeleteTime = TimestampCol(default=None)
+ statsCurr = ForeignKey('SubscriptionStats', cascade='null',
default=None)
+ statsCurrIndex = DatabaseIndex(statsCurr)
+ statsPrev = ForeignKey('SubscriptionStats', cascade='null',
default=None)
+ statsPrevIndex = DatabaseIndex(statsPrev)
+ session = ForeignKey('Session', cascade='null', default=None)
+ queue = ForeignKey('Queue', cascade='null', default=None)
+ name = StringCol(length=1000, default=None)
+ browsing = BoolCol(default=None)
+ acknowledged = BoolCol(default=None)
+ exclusive = BoolCol(default=None)
+ creditMode = StringCol(length=1000, default=None)
+ arguments = PickleCol(default=None)
+
+
+class SubscriptionStats(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
+ qmfUpdateTime = TimestampCol(default=None)
+ subscription = ForeignKey('Subscription', cascade='null',
default=None)
+ delivered = BigIntCol(default=None)
+
+
+
+
class ClientConnection(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1331,9 +1157,7 @@
def close(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "close",
- callback, args=actualArgs)
+ model.callMethod(self, "close", callback, args=actualArgs)
class ClientConnectionStats(SQLObject):
class sqlmeta:
@@ -1352,11 +1176,9 @@
class Link(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1375,9 +1197,7 @@
def close(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "close",
- callback, args=actualArgs)
+ model.callMethod(self, "close", callback, args=actualArgs)
def bridge(self, model, callback, durable, src, dest, key, tag, excludes, srcIsQueue,
srcIsLocal, dynamic, sync):
"""Bridge messages over the link"""
@@ -1402,9 +1222,7 @@
actualArgs.append(dynamic)
if sync is not None:
actualArgs.append(sync)
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "bridge",
- callback, args=actualArgs)
+ model.callMethod(self, "bridge", callback, args=actualArgs)
class LinkStats(SQLObject):
class sqlmeta:
@@ -1420,11 +1238,9 @@
class Bridge(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1450,9 +1266,7 @@
def close(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "close",
- callback, args=actualArgs)
+ model.callMethod(self, "close", callback, args=actualArgs)
class BridgeStats(SQLObject):
class sqlmeta:
@@ -1466,11 +1280,9 @@
class Session(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1492,27 +1304,19 @@
def solicitAck(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey,
"solicitAck",
- callback, args=actualArgs)
+ model.callMethod(self, "solicitAck", callback, args=actualArgs)
def detach(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "detach",
- callback, args=actualArgs)
+ model.callMethod(self, "detach", callback, args=actualArgs)
def resetLifespan(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey,
"resetLifespan",
- callback, args=actualArgs)
+ model.callMethod(self, "resetLifespan", callback, args=actualArgs)
def close(self, model, callback):
actualArgs = list()
- originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId
+ 9223372036854775808L)
- model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, "close",
- callback, args=actualArgs)
+ model.callMethod(self, "close", callback, args=actualArgs)
class SessionStats(SQLObject):
class sqlmeta:
@@ -1532,11 +1336,9 @@
class Sysimage(SQLObject):
class sqlmeta:
lazyUpdate = True
- qmfBrokerId = StringCol(length=1000, default=None)
- qmfScopeId = BigIntCol(default=None)
qmfAgentId = StringCol(length=1000, default=None)
- qmfObjectId = BigIntCol(default=None)
- qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)
+ qmfObjectId = StringCol(length=1000, default=None)
+ qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)
qmfClassKey = StringCol(length=1000, default=None)
qmfPersistent = BoolCol(default=None)
qmfUpdateTime = TimestampCol(default=None)
@@ -1587,16 +1389,6 @@
Slot.sqlmeta.addJoin(SQLMultipleJoin('SlotStats',
joinMethodName='stats'))
-classToSchemaNameMap['Job'] = 'Job'
-schemaNameToClassMap['Job'] = Job
-
-Scheduler.sqlmeta.addJoin(SQLMultipleJoin('Job', joinMethodName='jobs'))
-
-Submitter.sqlmeta.addJoin(SQLMultipleJoin('Job', joinMethodName='jobs'))
-
-
-Job.sqlmeta.addJoin(SQLMultipleJoin('JobStats', joinMethodName='stats'))
-
classToSchemaNameMap['Scheduler'] = 'Scheduler'
schemaNameToClassMap['Scheduler'] = Scheduler
@@ -1635,9 +1427,7 @@
Scheduler.sqlmeta.addJoin(SQLMultipleJoin('Submission',
joinMethodName='submissions'))
-Submitter.sqlmeta.addJoin(SQLMultipleJoin('Submission',
joinMethodName='submissions'))
-
Submission.sqlmeta.addJoin(SQLMultipleJoin('SubmissionStats',
joinMethodName='stats'))
classToSchemaNameMap['Acl'] = 'Acl'
@@ -1708,7 +1498,9 @@
Vhost.sqlmeta.addJoin(SQLMultipleJoin('Queue', joinMethodName='queues'))
+Exchange.sqlmeta.addJoin(SQLMultipleJoin('Queue',
joinMethodName='queues'))
+
Queue.sqlmeta.addJoin(SQLMultipleJoin('QueueStats',
joinMethodName='stats'))
classToSchemaNameMap['Exchange'] = 'Exchange'
@@ -1731,6 +1523,16 @@
Binding.sqlmeta.addJoin(SQLMultipleJoin('BindingStats',
joinMethodName='stats'))
+classToSchemaNameMap['Subscription'] = 'Subscription'
+schemaNameToClassMap['Subscription'] = Subscription
+
+Session.sqlmeta.addJoin(SQLMultipleJoin('Subscription',
joinMethodName='subscriptions'))
+
+Queue.sqlmeta.addJoin(SQLMultipleJoin('Subscription',
joinMethodName='subscriptions'))
+
+
+Subscription.sqlmeta.addJoin(SQLMultipleJoin('SubscriptionStats',
joinMethodName='stats'))
+
classToSchemaNameMap['ClientConnection'] = 'ClientConnection'
schemaNameToClassMap['ClientConnection'] = ClientConnection
@@ -1771,6 +1573,6 @@
Sysimage.sqlmeta.addJoin(SQLMultipleJoin('SysimageStats',
joinMethodName='stats'))
-entityClasses = ['Slot', 'Job', 'Scheduler', 'Submitter',
'Negotiator', 'Collector', 'Master', 'Grid',
'Submission', 'Acl', 'Cluster', 'Store',
'Journal', 'System', 'Broker', 'Agent', 'Vhost',
'Queue', 'Exchange', 'Binding', 'ClientConnection',
'Link', 'Bridge', 'Session', 'Sysimage']
+entityClasses = ['Slot', 'Scheduler', 'Submitter',
'Negotiator', 'Collector', 'Master', 'Grid',
'Submission', 'Acl', 'Cluster', 'Store',
'Journal', 'System', 'Broker', 'Agent', 'Vhost',
'Queue', 'Exchange', 'Binding', 'Subscription',
'ClientConnection', 'Link', 'Bridge', 'Session',
'Sysimage']
-statsClasses = ['SlotStats', 'JobStats', 'SchedulerStats',
'SubmitterStats', 'NegotiatorStats', 'CollectorStats',
'MasterStats', 'GridStats', 'SubmissionStats', 'AclStats',
'ClusterStats', 'StoreStats', 'JournalStats',
'SystemStats', 'BrokerStats', 'AgentStats', 'VhostStats',
'QueueStats', 'ExchangeStats', 'BindingStats',
'ClientConnectionStats', 'LinkStats', 'BridgeStats',
'SessionStats', 'SysimageStats']
+statsClasses = ['SlotStats', 'SchedulerStats', 'SubmitterStats',
'NegotiatorStats', 'CollectorStats', 'MasterStats',
'GridStats', 'SubmissionStats', 'AclStats',
'ClusterStats', 'StoreStats', 'JournalStats',
'SystemStats', 'BrokerStats', 'AgentStats', 'VhostStats',
'QueueStats', 'ExchangeStats', 'BindingStats',
'SubscriptionStats', 'ClientConnectionStats', 'LinkStats',
'BridgeStats', 'SessionStats', 'SysimageStats']
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -101,12 +101,9 @@
self.pythonOutput += " lazyUpdate = %s\n" % (lazyUpdate)
def generateQmfIdsIndex(self):
- self.generateAttrib("qmfBrokerId", "StringCol", "length=1000")
- self.generateAttrib("qmfScopeId", "BigIntCol")
self.generateAttrib("qmfAgentId", "StringCol", "length=1000")
- self.generateAttrib("qmfObjectId", "BigIntCol")
- self.pythonOutput += " qmfIdsUnique = DatabaseIndex(qmfBrokerId, qmfScopeId, qmfObjectId, unique=True)\n"
-# self.pythonOutput += " qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)\n"
+ self.generateAttrib("qmfObjectId", "StringCol", "length=1000")
+ self.pythonOutput += " qmfIdsUnique = DatabaseIndex(qmfAgentId, qmfObjectId, unique=True)\n"
self.generateAttrib("qmfClassKey", "StringCol", "length=1000")
self.generateAttrib("qmfPersistent", "BoolCol")
@@ -198,9 +195,8 @@
self.pythonOutput += "\n def %s(self, model, callback%s):\n" % (elem["@name"], formalArgs)
self.pythonOutput += comment
self.pythonOutput += actualArgs
- self.pythonOutput += " originalId = ObjectId(None, self.qmfScopeId + 9223372036854775808L, self.qmfObjectId + 9223372036854775808L)\n"
- self.pythonOutput += " model.callMethod(self.qmfBrokerId, originalId, self.qmfClassKey, \"%s\",\n" % elem["@name"]
- self.pythonOutput += " callback, args=actualArgs)\n"
+ self.pythonOutput += " model.callMethod(self, \"%s\", " % elem["@name"]
+ self.pythonOutput += "callback, args=actualArgs)\n"
def endClass(self):
if (self.additionalPythonOutput != ""):
Modified: mgmt/trunk/mint/python/mint/sql.py
===================================================================
--- mgmt/trunk/mint/python/mint/sql.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/python/mint/sql.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -73,10 +73,9 @@
table = self.cls.sqlmeta.table
return """
- select id from %s
- where qmf_scope_id = %%(qmfScopeId)s
- and qmf_object_id = %%(qmfObjectId)s
- and qmf_broker_id = %%(qmfBrokerId)s
+ select id
+ from %s
+ where qmf_agent_id = %%(qmfAgentId)s and qmf_object_id = %%(qmfObjectId)s
""" % table
class SqlSetStatsRefs(SqlOperation):
@@ -164,9 +163,8 @@
sql = """
delete from %s
where qmf_delete_time < now() - interval '%%(threshold)s seconds'
+ and qmf_persistent = 'f'
""" % (table)
- if self.cls is not mint.Job:
- sql += " and qmf_persistent = 'f' "
return sql
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/python/mint/tools.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -72,6 +72,7 @@
if self.config.debug:
self.config.prt()
+ enable_logging("mint", "debug", sys.stderr)
self.do_run(opts, args)
@@ -151,6 +152,7 @@
if self.config.debug:
self.config.prt()
+ enable_logging("mint", "debug", sys.stderr)
app = Mint(self.config)
app.updateEnabled = False
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/python/mint/update.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -1,19 +1,12 @@
-import sys
-import logging
-import types
import pickle
-import psycopg2
import mint
-from Queue import Queue as ConcurrentQueue, Full, Empty
-from threading import Thread
-from traceback import print_exc
+
+from collections import deque
from qpid.datatypes import UUID
-from mint.schema import *
-from time import clock, sleep
+
+from schema import *
from sql import *
-from collections import deque
from util import *
-from datetime import datetime, timedelta
log = logging.getLogger("mint.update")
@@ -73,9 +66,9 @@
#log.debug("Queue is empty")
continue
- self.process_update(update)
+ self.processUpdate(update)
- def process_update(self, update):
+ def processUpdate(self, update):
try:
update.process(self)
@@ -151,21 +144,30 @@
if key.type == 10:
self.processReference(name, value, results)
- elif key.type == 8:
+ continue
+
+ if not hasattr(cls, name):
+ # Discard attrs that we don't have in our schema
+ log.debug("%s has no field '%s'" % (cls, name))
+ continue
+
+ if key.type == 8:
self.processTimestamp(name, value, results)
- elif key.type == 14:
+ continue
+
+ if key.type == 14:
# Convert UUIDs into their string representation, to be
# handled by sqlobject
results[name] = str(value)
- elif key.type == 15:
+ continue
+
+ if key.type == 15:
#if value:
results[name] = pickle.dumps(value)
- elif not hasattr(cls, name):
- # Discard attrs that we don't have in our schema
- log.debug("%s has no field '%s'" % (cls, name))
- else:
- results[name] = value
+ continue
+ results[name] = value
+
return results
# XXX this needs to be a much more straightforward procedure
@@ -216,35 +218,19 @@
thread.deferCount += 1
return
- timestamps = self.object.getTimestamps()
+ update, create, delete = self.object.getTimestamps()
- self.processTimestamp("qmfUpdateTime", timestamps[0], attrs)
+ self.processTimestamp("qmfUpdateTime", update, attrs)
+ self.processTimestamp("qmfCreateTime", create, attrs)
- delta = timedelta(seconds=thread.app.expireThreshold)
+ if delete != 0:
+ self.processTimestamp("qmfDeleteTime", delete, attrs)
- if cls is Job and attrs["qmfUpdateTime"] < datetime.now() - delta:
- # drop updates for Jobs that are older then the expiration
- # threshold, since they would be deleted anyway in the next run
- # of the db expiration thread
-
- log.debug("Property update is older than expiration threshold; skipping it")
-
- thread.dropCount += 1
- return
-
- self.processTimestamp("qmfCreateTime", timestamps[1], attrs)
-
- if timestamps[2] != 0:
- self.processTimestamp("qmfDeleteTime", timestamps[2], attrs)
-
log.debug("%s(%s) marked deleted", cls.__name__, oid)
- attrs["qmfScopeId"] = oid.first - 9223372036854775808L
- attrs["qmfObjectId"] = oid.second - 9223372036854775808L
+ attrs["qmfAgentId"] = str(QmfAgentId.fromObject(self.object))
attrs["qmfClassKey"] = str(self.object.getClassKey())
- qmfBrokerId = str(self.broker.qmfBroker.getBrokerId())
- attrs["qmfBrokerId"] = qmfBrokerId
- attrs["qmfAgentId"] = self.model.getAgentDBId(oid, qmfBrokerId)
+ attrs["qmfObjectId"] = str(QmfObjectId.fromObject(self.object))
attrs["qmfPersistent"] = oid.isDurable()
cursor = thread.cursor()
@@ -277,7 +263,6 @@
id = cursor.fetchone()[0]
log.debug("%s(%i) created", cls.__name__, id)
-
else:
# Case 2
Modified: mgmt/trunk/mint/python/mint/util.py
===================================================================
--- mgmt/trunk/mint/python/mint/util.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/python/mint/util.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -1,9 +1,16 @@
-import sys, os, logging
+import logging
+import os
+import sys
+from Queue import Queue as ConcurrentQueue, Full, Empty
from crypt import crypt
+from datetime import datetime, timedelta
from getpass import getpass
+from qmf.console import ObjectId
from random import sample
-from threading import Thread
+from threading import Thread, Lock, RLock
+from time import clock, sleep
+from traceback import print_exc
log = logging.getLogger("mint.util")
@@ -38,6 +45,74 @@
return password
+class QmfAgentId(object):
+ def __init__(self, brokerId, brokerBank, agentBank):
+ assert brokerId
+
+ self.brokerId = brokerId
+ self.brokerBank = brokerBank
+ self.agentBank = agentBank
+
+ def fromObject(cls, object):
+ broker = object.getBroker()
+
+ brokerId = broker.getBrokerId()
+ brokerBank = broker.getBrokerBank()
+ agentBank = object.getObjectId().getAgentBank()
+
+ return cls(brokerId, brokerBank, agentBank)
+
+ def fromAgent(cls, agent):
+ broker = agent.getBroker()
+
+ brokerId = broker.getBrokerId()
+ brokerBank = broker.getBrokerBank()
+ agentBank = agent.getAgentBank()
+
+ return cls(brokerId, brokerBank, agentBank)
+
+ def fromString(cls, string):
+ brokerId, brokerBank, agentBank = string.split(".")
+
+ brokerBank = int(brokerBank)
+ agentBank = int(agentBank)
+
+ return cls(brokerId, brokerBank, agentBank)
+
+ fromObject = classmethod(fromObject)
+ fromAgent = classmethod(fromAgent)
+ fromString = classmethod(fromString)
+
+ def __str__(self):
+ return "%s.%i.%i" % (self.brokerId, self.brokerBank, self.agentBank)
+
+class QmfObjectId(object):
+ def __init__(self, first, second):
+ self.first = first
+ self.second = second
+
+ def fromObject(cls, object):
+ oid = object.getObjectId()
+
+ return cls(oid.first, oid.second)
+
+ def fromString(cls, string):
+ first, second = string.split(".")
+
+ first = int(first)
+ second = int(second)
+
+ return cls(first, second)
+
+ fromObject = classmethod(fromObject)
+ fromString = classmethod(fromString)
+
+ def toObjectId(self):
+ return ObjectId(None, self.first, self.second)
+
+ def __str__(self):
+ return "%i.%i" % (self.first, self.second)
+
password_chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
def crypt_password(password, salt=None):
Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/sql/schema.sql 2010-01-06 19:22:06 UTC (rev 3759)
@@ -38,10 +38,8 @@
CREATE TABLE acl (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -55,9 +53,9 @@
transfer_acl BOOL,
last_acl_load TIMESTAMP
);
-CREATE UNIQUE INDEX acl_qmfIdsUnique ON acl (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX acl_statsCurrIndex ON acl (stats_curr_id);
+CREATE UNIQUE INDEX acl_qmfIdsUnique ON acl (qmf_agent_id, qmf_object_id);
CREATE INDEX acl_statsPrevIndex ON acl (stats_prev_id);
+CREATE INDEX acl_statsCurrIndex ON acl (stats_curr_id);
CREATE TABLE acl_stats (
id SERIAL PRIMARY KEY,
@@ -68,10 +66,8 @@
CREATE TABLE agent (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -86,9 +82,9 @@
broker_bank BIGINT,
agent_bank BIGINT
);
-CREATE UNIQUE INDEX agent_qmfIdsUnique ON agent (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX agent_statsCurrIndex ON agent (stats_curr_id);
+CREATE UNIQUE INDEX agent_qmfIdsUnique ON agent (qmf_agent_id, qmf_object_id);
CREATE INDEX agent_statsPrevIndex ON agent (stats_prev_id);
+CREATE INDEX agent_statsCurrIndex ON agent (stats_curr_id);
CREATE TABLE agent_stats (
id SERIAL PRIMARY KEY,
@@ -98,10 +94,8 @@
CREATE TABLE binding (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -115,7 +109,7 @@
arguments BYTEA,
origin VARCHAR(1000)
);
-CREATE UNIQUE INDEX binding_qmfIdsUnique ON binding (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX binding_qmfIdsUnique ON binding (qmf_agent_id, qmf_object_id);
CREATE INDEX binding_statsPrevIndex ON binding (stats_prev_id);
CREATE INDEX binding_statsCurrIndex ON binding (stats_curr_id);
@@ -128,10 +122,8 @@
CREATE TABLE bridge (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -152,7 +144,7 @@
dynamic BOOL,
sync_rsv INT
);
-CREATE UNIQUE INDEX bridge_qmfIdsUnique ON bridge (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX bridge_qmfIdsUnique ON bridge (qmf_agent_id, qmf_object_id);
CREATE INDEX bridge_statsCurrIndex ON bridge (stats_curr_id);
CREATE INDEX bridge_statsPrevIndex ON bridge (stats_prev_id);
@@ -164,10 +156,8 @@
CREATE TABLE broker (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -185,7 +175,7 @@
version VARCHAR(1000),
data_dir VARCHAR(1000)
);
-CREATE UNIQUE INDEX broker_qmfIdsUnique ON broker (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX broker_qmfIdsUnique ON broker (qmf_agent_id, qmf_object_id);
CREATE INDEX broker_statsCurrIndex ON broker (stats_curr_id);
CREATE INDEX broker_statsPrevIndex ON broker (stats_prev_id);
@@ -198,10 +188,8 @@
CREATE TABLE client_connection (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -219,7 +207,7 @@
remote_pid BIGINT,
remote_parent_pid BIGINT
);
-CREATE UNIQUE INDEX client_connection_qmfIdsUnique ON client_connection (qmf_broker_id,
qmf_scope_id, qmf_object_id);
+CREATE UNIQUE INDEX client_connection_qmfIdsUnique ON client_connection (qmf_agent_id,
qmf_object_id);
CREATE INDEX client_connection_statsCurrIndex ON client_connection (stats_curr_id);
CREATE INDEX client_connection_statsPrevIndex ON client_connection (stats_prev_id);
@@ -236,10 +224,8 @@
CREATE TABLE cluster (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -257,7 +243,7 @@
members VARCHAR(4000),
member_i_ds VARCHAR(4000)
);
-CREATE UNIQUE INDEX cluster_qmfIdsUnique ON cluster (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX cluster_qmfIdsUnique ON cluster (qmf_agent_id, qmf_object_id);
CREATE INDEX cluster_statsCurrIndex ON cluster (stats_curr_id);
CREATE INDEX cluster_statsPrevIndex ON cluster (stats_prev_id);
@@ -269,10 +255,8 @@
CREATE TABLE collector (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -287,9 +271,9 @@
name VARCHAR(1000),
public_network_ip_addr VARCHAR(1000)
);
-CREATE UNIQUE INDEX collector_qmfIdsUnique ON collector (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX collector_statsCurrIndex ON collector (stats_curr_id);
+CREATE UNIQUE INDEX collector_qmfIdsUnique ON collector (qmf_agent_id, qmf_object_id);
CREATE INDEX collector_statsPrevIndex ON collector (stats_prev_id);
+CREATE INDEX collector_statsCurrIndex ON collector (stats_curr_id);
CREATE TABLE collector_stats (
id SERIAL PRIMARY KEY,
@@ -305,10 +289,8 @@
CREATE TABLE exchange (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -324,9 +306,9 @@
exchange_id INT,
arguments BYTEA
);
-CREATE UNIQUE INDEX exchange_qmfIdsUnique ON exchange (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX exchange_statsCurrIndex ON exchange (stats_curr_id);
+CREATE UNIQUE INDEX exchange_qmfIdsUnique ON exchange (qmf_agent_id, qmf_object_id);
CREATE INDEX exchange_statsPrevIndex ON exchange (stats_prev_id);
+CREATE INDEX exchange_statsCurrIndex ON exchange (stats_curr_id);
CREATE TABLE exchange_stats (
id SERIAL PRIMARY KEY,
@@ -348,10 +330,8 @@
CREATE TABLE grid (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -367,9 +347,9 @@
submit_limit BIGINT,
grid_resource_unavailable_time TIMESTAMP
);
-CREATE UNIQUE INDEX grid_qmfIdsUnique ON grid (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX grid_statsCurrIndex ON grid (stats_curr_id);
+CREATE UNIQUE INDEX grid_qmfIdsUnique ON grid (qmf_agent_id, qmf_object_id);
CREATE INDEX grid_statsPrevIndex ON grid (stats_prev_id);
+CREATE INDEX grid_statsCurrIndex ON grid (stats_curr_id);
CREATE TABLE grid_stats (
id SERIAL PRIMARY KEY,
@@ -384,64 +364,10 @@
idle_jobs BIGINT
);
-CREATE TABLE job (
- id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
- qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
- qmf_class_key VARCHAR(1000),
- qmf_persistent BOOL,
- qmf_update_time TIMESTAMP,
- qmf_create_time TIMESTAMP,
- qmf_delete_time TIMESTAMP,
- stats_curr_id INT,
- stats_prev_id INT,
- scheduler_id INT,
- submitter_id INT,
- accounting_group VARCHAR(1000),
- args VARCHAR(4000),
- cluster_id BIGINT,
- cmd VARCHAR(4000),
- concurrency_limits VARCHAR(4000),
- custom_group VARCHAR(1000),
- custom_id VARCHAR(1000),
- custom_priority BIGINT,
- global_job_id VARCHAR(1000),
- in_rsv VARCHAR(4000),
- iwd VARCHAR(4000),
- job_status BIGINT,
- note VARCHAR(4000),
- out VARCHAR(4000),
- owner VARCHAR(1000),
- grid_user VARCHAR(1000),
- proc_id BIGINT,
- q_date TIMESTAMP,
- job_universe BIGINT,
- title VARCHAR(1000),
- user_log VARCHAR(4000),
- hold_reason VARCHAR(4000),
- dag_node_name VARCHAR(1000),
- dag_parent_node_names VARCHAR(4000),
- dag_man_job_id BIGINT,
- ad BYTEA
-);
-CREATE UNIQUE INDEX job_qmfIdsUnique ON job (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX job_statsPrevIndex ON job (stats_prev_id);
-CREATE INDEX job_statsCurrIndex ON job (stats_curr_id);
-
-CREATE TABLE job_stats (
- id SERIAL PRIMARY KEY,
- qmf_update_time TIMESTAMP,
- job_id INT
-);
-
CREATE TABLE journal (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -463,7 +389,7 @@
max_file_count INT,
data_file_size BIGINT
);
-CREATE UNIQUE INDEX journal_qmfIdsUnique ON journal (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX journal_qmfIdsUnique ON journal (qmf_agent_id, qmf_object_id);
CREATE INDEX journal_statsPrevIndex ON journal (stats_prev_id);
CREATE INDEX journal_statsCurrIndex ON journal (stats_curr_id);
@@ -504,10 +430,8 @@
CREATE TABLE link (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -521,9 +445,9 @@
transport VARCHAR(1000),
durable BOOL
);
-CREATE UNIQUE INDEX link_qmfIdsUnique ON link (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX link_statsCurrIndex ON link (stats_curr_id);
+CREATE UNIQUE INDEX link_qmfIdsUnique ON link (qmf_agent_id, qmf_object_id);
CREATE INDEX link_statsPrevIndex ON link (stats_prev_id);
+CREATE INDEX link_statsCurrIndex ON link (stats_curr_id);
CREATE TABLE link_stats (
id SERIAL PRIMARY KEY,
@@ -535,10 +459,8 @@
CREATE TABLE master (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -557,7 +479,7 @@
condor_version VARCHAR(1000),
daemon_start_time TIMESTAMP
);
-CREATE UNIQUE INDEX master_qmfIdsUnique ON master (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX master_qmfIdsUnique ON master (qmf_agent_id, qmf_object_id);
CREATE INDEX master_statsCurrIndex ON master (stats_curr_id);
CREATE INDEX master_statsPrevIndex ON master (stats_prev_id);
@@ -575,10 +497,8 @@
CREATE TABLE negotiator (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -595,9 +515,9 @@
condor_version VARCHAR(1000),
daemon_start_time TIMESTAMP
);
-CREATE UNIQUE INDEX negotiator_qmfIdsUnique ON negotiator (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX negotiator_statsCurrIndex ON negotiator (stats_curr_id);
+CREATE UNIQUE INDEX negotiator_qmfIdsUnique ON negotiator (qmf_agent_id, qmf_object_id);
CREATE INDEX negotiator_statsPrevIndex ON negotiator (stats_prev_id);
+CREATE INDEX negotiator_statsCurrIndex ON negotiator (stats_curr_id);
CREATE TABLE negotiator_stats (
id SERIAL PRIMARY KEY,
@@ -618,10 +538,8 @@
CREATE TABLE queue (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -634,9 +552,10 @@
durable BOOL,
auto_delete BOOL,
exclusive BOOL,
- arguments BYTEA
+ arguments BYTEA,
+ exchange_id INT
);
-CREATE UNIQUE INDEX queue_qmfIdsUnique ON queue (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX queue_qmfIdsUnique ON queue (qmf_agent_id, qmf_object_id);
CREATE INDEX queue_statsCurrIndex ON queue (stats_curr_id);
CREATE INDEX queue_statsPrevIndex ON queue (stats_prev_id);
@@ -675,10 +594,8 @@
CREATE TABLE scheduler (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -698,7 +615,7 @@
condor_version VARCHAR(1000),
daemon_start_time TIMESTAMP
);
-CREATE UNIQUE INDEX scheduler_qmfIdsUnique ON scheduler (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX scheduler_qmfIdsUnique ON scheduler (qmf_agent_id, qmf_object_id);
CREATE INDEX scheduler_statsCurrIndex ON scheduler (stats_curr_id);
CREATE INDEX scheduler_statsPrevIndex ON scheduler (stats_prev_id);
@@ -722,10 +639,8 @@
CREATE TABLE session (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -742,7 +657,7 @@
expire_time TIMESTAMP,
max_client_rate BIGINT
);
-CREATE UNIQUE INDEX session_qmfIdsUnique ON session (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX session_qmfIdsUnique ON session (qmf_agent_id, qmf_object_id);
CREATE INDEX session_statsCurrIndex ON session (stats_curr_id);
CREATE INDEX session_statsPrevIndex ON session (stats_prev_id);
@@ -760,10 +675,8 @@
CREATE TABLE slot (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -824,7 +737,7 @@
condor_version VARCHAR(1000),
daemon_start_time TIMESTAMP
);
-CREATE UNIQUE INDEX slot_qmfIdsUnique ON slot (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX slot_qmfIdsUnique ON slot (qmf_agent_id, qmf_object_id);
CREATE INDEX slot_statsCurrIndex ON slot (stats_curr_id);
CREATE INDEX slot_statsPrevIndex ON slot (stats_prev_id);
@@ -875,10 +788,8 @@
CREATE TABLE store (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -898,7 +809,7 @@
tpl_data_file_size BIGINT,
tpl_current_file_count BIGINT
);
-CREATE UNIQUE INDEX store_qmfIdsUnique ON store (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX store_qmfIdsUnique ON store (qmf_agent_id, qmf_object_id);
CREATE INDEX store_statsCurrIndex ON store (stats_curr_id);
CREATE INDEX store_statsPrevIndex ON store (stats_prev_id);
@@ -919,10 +830,8 @@
CREATE TABLE submission (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -931,10 +840,10 @@
stats_curr_id INT,
stats_prev_id INT,
scheduler_id INT,
- submitter_id INT,
- name VARCHAR(1000)
+ name VARCHAR(1000),
+ owner VARCHAR(1000)
);
-CREATE UNIQUE INDEX submission_qmfIdsUnique ON submission (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX submission_qmfIdsUnique ON submission (qmf_agent_id, qmf_object_id);
CREATE INDEX submission_statsPrevIndex ON submission (stats_prev_id);
CREATE INDEX submission_statsCurrIndex ON submission (stats_curr_id);
@@ -951,10 +860,8 @@
CREATE TABLE submitter (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -968,7 +875,7 @@
name VARCHAR(1000),
schedd_name VARCHAR(1000)
);
-CREATE UNIQUE INDEX submitter_qmfIdsUnique ON submitter (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX submitter_qmfIdsUnique ON submitter (qmf_agent_id, qmf_object_id);
CREATE INDEX submitter_statsPrevIndex ON submitter (stats_prev_id);
CREATE INDEX submitter_statsCurrIndex ON submitter (stats_curr_id);
@@ -981,12 +888,41 @@
running_jobs BIGINT
);
+CREATE TABLE subscription ( -- starts with the same id / qmf_* / stats_* columns as the other object tables in this schema
+ id SERIAL PRIMARY KEY,
+ qmf_agent_id VARCHAR(1000), -- unique together with qmf_object_id (subscription_qmfIdsUnique below)
+ qmf_object_id VARCHAR(1000),
+ qmf_class_key VARCHAR(1000),
+ qmf_persistent BOOL,
+ qmf_update_time TIMESTAMP,
+ qmf_create_time TIMESTAMP,
+ qmf_delete_time TIMESTAMP,
+ stats_curr_id INT, -- FK to subscription_stats (id); repointed by the update_subscription_stats trigger on each stats insert
+ stats_prev_id INT, -- FK to subscription_stats (id); receives the old stats_curr_id from the same trigger
+ session_id INT, -- FK to session (id), added by ALTER TABLE with ON DELETE SET NULL
+ queue_id INT, -- FK to queue (id), added by ALTER TABLE with ON DELETE SET NULL
+ name VARCHAR(1000),
+ browsing BOOL,
+ acknowledged BOOL,
+ exclusive BOOL,
+ credit_mode VARCHAR(1000),
+ arguments BYTEA
+);
+CREATE UNIQUE INDEX subscription_qmfIdsUnique ON subscription (qmf_agent_id,
qmf_object_id); -- at most one row per (qmf_agent_id, qmf_object_id) pair
+CREATE INDEX subscription_statsCurrIndex ON subscription (stats_curr_id);
+CREATE INDEX subscription_statsPrevIndex ON subscription (stats_prev_id);
+
+CREATE TABLE subscription_stats ( -- stats rows for subscription; an AFTER INSERT trigger on this table updates subscription.stats_curr_id / stats_prev_id
+ id SERIAL PRIMARY KEY,
+ qmf_update_time TIMESTAMP, -- indexed by subscription_stats_update_time (triggers.sql)
+ subscription_id INT, -- FK to subscription (id), added by ALTER TABLE with ON DELETE SET NULL
+ delivered BIGINT
+);
+
CREATE TABLE sysimage (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -1004,7 +940,7 @@
mem_total BIGINT,
swap_total BIGINT
);
-CREATE UNIQUE INDEX sysimage_qmfIdsUnique ON sysimage (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX sysimage_qmfIdsUnique ON sysimage (qmf_agent_id, qmf_object_id);
CREATE INDEX sysimage_statsPrevIndex ON sysimage (stats_prev_id);
CREATE INDEX sysimage_statsCurrIndex ON sysimage (stats_curr_id);
@@ -1023,10 +959,8 @@
CREATE TABLE system (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -1041,9 +975,9 @@
version VARCHAR(1000),
machine VARCHAR(1000)
);
-CREATE UNIQUE INDEX system_qmfIdsUnique ON system (qmf_broker_id, qmf_scope_id,
qmf_object_id);
-CREATE INDEX system_statsCurrIndex ON system (stats_curr_id);
+CREATE UNIQUE INDEX system_qmfIdsUnique ON system (qmf_agent_id, qmf_object_id);
CREATE INDEX system_statsPrevIndex ON system (stats_prev_id);
+CREATE INDEX system_statsCurrIndex ON system (stats_curr_id);
CREATE TABLE system_stats (
id SERIAL PRIMARY KEY,
@@ -1053,10 +987,8 @@
CREATE TABLE vhost (
id SERIAL PRIMARY KEY,
- qmf_broker_id VARCHAR(1000),
- qmf_scope_id BIGINT,
qmf_agent_id VARCHAR(1000),
- qmf_object_id BIGINT,
+ qmf_object_id VARCHAR(1000),
qmf_class_key VARCHAR(1000),
qmf_persistent BOOL,
qmf_update_time TIMESTAMP,
@@ -1068,7 +1000,7 @@
name VARCHAR(1000),
federation_tag VARCHAR(1000)
);
-CREATE UNIQUE INDEX vhost_qmfIdsUnique ON vhost (qmf_broker_id, qmf_scope_id,
qmf_object_id);
+CREATE UNIQUE INDEX vhost_qmfIdsUnique ON vhost (qmf_agent_id, qmf_object_id);
CREATE INDEX vhost_statsPrevIndex ON vhost (stats_prev_id);
CREATE INDEX vhost_statsCurrIndex ON vhost (stats_curr_id);
@@ -1168,16 +1100,6 @@
ALTER TABLE grid_stats ADD CONSTRAINT grid_id_exists FOREIGN KEY (grid_id) REFERENCES
grid (id) ON DELETE SET NULL;
-ALTER TABLE job ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id)
REFERENCES job_stats (id) ON DELETE SET NULL;
-
-ALTER TABLE job ADD CONSTRAINT stats_prev_id_exists FOREIGN KEY (stats_prev_id)
REFERENCES job_stats (id) ON DELETE SET NULL;
-
-ALTER TABLE job ADD CONSTRAINT scheduler_id_exists FOREIGN KEY (scheduler_id) REFERENCES
scheduler (id) ON DELETE SET NULL;
-
-ALTER TABLE job ADD CONSTRAINT submitter_id_exists FOREIGN KEY (submitter_id) REFERENCES
submitter (id) ON DELETE SET NULL;
-
-ALTER TABLE job_stats ADD CONSTRAINT job_id_exists FOREIGN KEY (job_id) REFERENCES job
(id) ON DELETE SET NULL;
-
ALTER TABLE journal ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id)
REFERENCES journal_stats (id) ON DELETE SET NULL;
ALTER TABLE journal ADD CONSTRAINT stats_prev_id_exists FOREIGN KEY (stats_prev_id)
REFERENCES journal_stats (id) ON DELETE SET NULL;
@@ -1212,6 +1134,8 @@
ALTER TABLE queue ADD CONSTRAINT vhost_id_exists FOREIGN KEY (vhost_id) REFERENCES vhost
(id) ON DELETE SET NULL;
+ALTER TABLE queue ADD CONSTRAINT exchange_id_exists FOREIGN KEY (exchange_id) REFERENCES
exchange (id) ON DELETE SET NULL; -- constrains the exchange_id column of queue; deleting the exchange nulls it rather than removing the queue row
+
ALTER TABLE queue_stats ADD CONSTRAINT queue_id_exists FOREIGN KEY (queue_id) REFERENCES
queue (id) ON DELETE SET NULL;
ALTER TABLE scheduler ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id)
REFERENCES scheduler_stats (id) ON DELETE SET NULL;
@@ -1250,8 +1174,6 @@
ALTER TABLE submission ADD CONSTRAINT scheduler_id_exists FOREIGN KEY (scheduler_id)
REFERENCES scheduler (id) ON DELETE SET NULL;
-ALTER TABLE submission ADD CONSTRAINT submitter_id_exists FOREIGN KEY (submitter_id)
REFERENCES submitter (id) ON DELETE SET NULL;
-
ALTER TABLE submission_stats ADD CONSTRAINT submission_id_exists FOREIGN KEY
(submission_id) REFERENCES submission (id) ON DELETE SET NULL;
ALTER TABLE submitter ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id)
REFERENCES submitter_stats (id) ON DELETE SET NULL;
@@ -1262,6 +1184,16 @@
ALTER TABLE submitter_stats ADD CONSTRAINT submitter_id_exists FOREIGN KEY (submitter_id)
REFERENCES submitter (id) ON DELETE SET NULL;
+ALTER TABLE subscription ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id)
REFERENCES subscription_stats (id) ON DELETE SET NULL; -- deleting a stats row clears the pointer instead of deleting the subscription
+
+ALTER TABLE subscription ADD CONSTRAINT stats_prev_id_exists FOREIGN KEY (stats_prev_id)
REFERENCES subscription_stats (id) ON DELETE SET NULL; -- same rule for the previous-stats pointer
+
+ALTER TABLE subscription ADD CONSTRAINT session_id_exists FOREIGN KEY (session_id)
REFERENCES session (id) ON DELETE SET NULL; -- subscription row survives deletion of its session row
+
+ALTER TABLE subscription ADD CONSTRAINT queue_id_exists FOREIGN KEY (queue_id) REFERENCES
queue (id) ON DELETE SET NULL; -- subscription row survives deletion of its queue row
+
+ALTER TABLE subscription_stats ADD CONSTRAINT subscription_id_exists FOREIGN KEY
(subscription_id) REFERENCES subscription (id) ON DELETE SET NULL; -- stats rows are kept, with a NULL subscription_id, when the subscription is deleted
+
ALTER TABLE sysimage ADD CONSTRAINT stats_curr_id_exists FOREIGN KEY (stats_curr_id)
REFERENCES sysimage_stats (id) ON DELETE SET NULL;
ALTER TABLE sysimage ADD CONSTRAINT stats_prev_id_exists FOREIGN KEY (stats_prev_id)
REFERENCES sysimage_stats (id) ON DELETE SET NULL;
Modified: mgmt/trunk/mint/sql/triggers.sql
===================================================================
--- mgmt/trunk/mint/sql/triggers.sql 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/mint/sql/triggers.sql 2010-01-06 19:22:06 UTC (rev 3759)
@@ -30,19 +30,6 @@
CREATE INDEX slot_stats_update_time ON slot_stats (qmf_update_time);
-CREATE OR REPLACE FUNCTION update_job_stats() RETURNS trigger AS '
-BEGIN
- UPDATE job SET stats_prev_id = stats_curr_id, stats_curr_id = new.id WHERE id =
new.job_id;
- RETURN new;
-END
-' LANGUAGE plpgsql;
-
-CREATE TRIGGER update_job_stats AFTER INSERT ON job_stats
- FOR EACH ROW EXECUTE PROCEDURE update_job_stats();
-
-CREATE INDEX job_stats_update_time ON job_stats (qmf_update_time);
-
-
CREATE OR REPLACE FUNCTION update_scheduler_stats() RETURNS trigger AS '
BEGIN
UPDATE scheduler SET stats_prev_id = stats_curr_id, stats_curr_id = new.id WHERE id =
new.scheduler_id;
@@ -277,6 +264,19 @@
CREATE INDEX binding_stats_update_time ON binding_stats (qmf_update_time);
+CREATE OR REPLACE FUNCTION update_subscription_stats() RETURNS trigger AS '
+BEGIN
+ UPDATE subscription SET stats_prev_id = stats_curr_id, stats_curr_id = new.id WHERE id
= new.subscription_id;
+ RETURN new;
+END
+' LANGUAGE plpgsql; -- the quoted body moves stats_curr_id into stats_prev_id and points stats_curr_id at the new stats row, for the subscription named by new.subscription_id
+
+CREATE TRIGGER update_subscription_stats AFTER INSERT ON subscription_stats
+ FOR EACH ROW EXECUTE PROCEDURE update_subscription_stats(); -- runs the function once for every row inserted into subscription_stats
+
+CREATE INDEX subscription_stats_update_time ON subscription_stats (qmf_update_time); -- index on the stats timestamp column
+
+
CREATE OR REPLACE FUNCTION update_client_connection_stats() RETURNS trigger AS '
BEGIN
UPDATE client_connection SET stats_prev_id = stats_curr_id, stats_curr_id = new.id
WHERE id = new.client_connection_id;
Modified: mgmt/trunk/wooly/python/wooly/bench.py
===================================================================
--- mgmt/trunk/wooly/python/wooly/bench.py 2010-01-06 18:28:56 UTC (rev 3758)
+++ mgmt/trunk/wooly/python/wooly/bench.py 2010-01-06 19:22:06 UTC (rev 3759)
@@ -84,11 +84,11 @@
print "%i %s" % (count, url)
- start = time()
+ start = time.time()
html, profile = self.visit(url, referer, 0)
- end = time()
+ end = time.time()
bytes = len(html)
millis = (end - start) * 1000