Author: eallen
Date: 2009-12-15 13:35:00 -0500 (Tue, 15 Dec 2009)
New Revision: 3744
Modified:
mgmt/trunk/cumin/python/cumin/grid/job.py
mgmt/trunk/cumin/python/cumin/grid/negotiator.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/parameters.py
Log:
Added QmfCall and QmfCallSet classes for synchronous qmf calls.
Modified: mgmt/trunk/cumin/python/cumin/grid/job.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/job.py 2009-12-14 20:36:08 UTC (rev 3743)
+++ mgmt/trunk/cumin/python/cumin/grid/job.py 2009-12-15 18:35:00 UTC (rev 3744)
@@ -11,7 +11,7 @@
from cumin.stat import *
from cumin.formats import *
from cumin.util import *
-
+from cumin.model import FetchJobAd, FetchJobOutput
import main
strings = StringCatalog(__file__)
@@ -429,12 +429,9 @@
scheduler = sched
break
- action = self.app.model.job.GetAd
- return action.qmfcall(job)
-
if scheduler:
- action = self.app.model.scheduler.GetAd
- return action.qmfcall(scheduler, job.id)
+ action = FetchJobAd(self.app)
+ return action.execute(scheduler, job.id).data['JobAd']
else:
return () # XXX
@@ -689,9 +686,9 @@
job = self.frame.get_object(session)
file, start, end = self.get_file_args(session)
if file:
- action = self.app.model.scheduler.Fetch
- data = action.qmfcall(scheduler, job, file, start, end)
- return escape_entity(data)
+ action = FetchJobOutput(self.app)
+ result = action.execute(scheduler, job.id, file, start, end)
+ return escape_entity(result.data['Data'])
return self.err_msg
def get_file_args(self, session):
Modified: mgmt/trunk/cumin/python/cumin/grid/negotiator.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/negotiator.py 2009-12-14 20:36:08 UTC (rev 3743)
+++ mgmt/trunk/cumin/python/cumin/grid/negotiator.py 2009-12-15 18:35:00 UTC (rev 3744)
@@ -11,6 +11,7 @@
from cumin.util import *
from submitter import SubmitterSet
+from cumin.model import FetchRawConfig, FetchRawConfigSet
import main
strings = StringCatalog(__file__)
@@ -314,8 +315,9 @@
groups = self.groups.get(session)
if len(groups) == 0:
negotiator = self.negotiator.get(session)
- action = self.app.model.negotiator.GetRawConfig
- groups = action.do_invoke(negotiator, "GROUP_NAMES", 10)
+ action = FetchRawConfig(self.app)
+ results = action.execute(negotiator, "GROUP_NAMES", timeout=10)
+ groups = results.data
try:
groups = self.split_group_names(groups['Value'])
except Exception, e:
@@ -342,11 +344,14 @@
if groups is None:
groups = self.get_group_names(session)
negotiator = self.negotiator.get(session)
- action = self.app.model.negotiator.GetConfigSet
+ action = FetchRawConfigSet(self.app)
+ raw_configs = action.execute(negotiator, groups, config, timeout=15)
- raw_configs = action.do_invoke(negotiator, groups, config,
"GetRawConfig", timeout=15)
- for [group, data, status] in raw_configs:
- configs.append([group, data['Value'], status])
+ for group in sorted(raw_configs):
+ res = raw_configs[group]
+ configs.append([group, res.data['Value'],
+ (res.status, res.error, res.got_data)])
+
param.set(session, configs)
return configs
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2009-12-14 20:36:08 UTC (rev 3743)
+++ mgmt/trunk/cumin/python/cumin/model.py 2009-12-15 18:35:00 UTC (rev 3744)
@@ -2068,9 +2068,6 @@
prop = CuminProperty(self, "HoldReason")
prop.title = "Hold Reason"
- action = self.GetAd(self, "GetAd")
- action.navigable = False
-
def init(self):
super(CuminJob, self).init()
@@ -2079,7 +2076,6 @@
def get_title(self, session):
return "Job"
- def get_object_name(self, job):
if isinstance(job, basestring):
return job
else:
@@ -2092,16 +2088,6 @@
def render_status(self, session, status):
return JobStatusInfo.get_status_string(status)
- class GetAd(CuminAction):
- def qmfcall(self, job):
-
- default = {'JobAd': {"":{"VALUE":"",
"TYPE":0}}}
- qmf = QMFCaller(default)
- job.GetAd(self.mint.model, qmf.get_completion(), None)
- wait(qmf.done, timeout=10)
-
- return qmf.data['JobAd']
-
class GetStartedAction(CuminAction):
def get_xml_response(self, session, object, *args):
updateTime = object.statsCurr and object.statsCurr.qmfUpdateTime \
@@ -2112,13 +2098,13 @@
return "%s%s" % (conf, rect)
-# XXX no, not this way
-class QMFCaller(object):
- def __init__(self, default):
+class QmfCall(object):
+ def __init__(self, app, default=None):
self.data = default
self.got_data = False
self.error = False
self.status = None
+ self.model = app.model.mint.model
def get_completion(self):
def completion(status, data):
@@ -2131,9 +2117,70 @@
return completion
+ def do_wait(self, timeout=5):
+ wait(self.done, timeout=timeout)
+ return self
+
def done(self):
return self.got_data or self.error
+class QmfCallSet(object):
+    """A group of QmfCall objects, stored by key, that are waited on together."""
+
+    def __init__(self, app):
+        self.calls = {}
+        self.app = app
+
+    def add_call(self, key, default=None):
+        """Create a QmfCall seeded with `default`, register it under `key`
+        and return it."""
+        new_call = QmfCall(self.app, default)
+        self.calls[key] = new_call
+        return new_call
+
+    def do_wait(self, timeout=5):
+        """Hand an "all calls done" predicate to wait() with the given
+        timeout, then return the dict of registered calls.
+
+        NOTE(review): wait() is defined elsewhere; presumably it polls the
+        predicate until it is true or the timeout expires -- confirm.
+        """
+        def predicate(calls):
+            # True only when every registered call reports done();
+            # an empty set of calls counts as finished.
+            return all(calls[key].done() for key in calls)
+
+        wait(predicate, timeout=timeout, args=self.calls)
+        return self.calls
+
+class FetchRawConfig(QmfCall):
+    """Single GetRawConfig request against a negotiator."""
+
+    # Was spelled "__init_" (one trailing underscore), so it never ran as the
+    # constructor and self.data stayed None instead of {'Value': None}.
+    def __init__(self, app):
+        super(FetchRawConfig, self).__init__(app)
+        # Initial result; callers index it with ['Value'].
+        self.data = {'Value': None}
+
+    def execute(self, negotiator, config_name, timeout=5):
+        """Invoke negotiator.GetRawConfig for `config_name` and wait.
+
+        Returns this object (the result of do_wait), so callers read
+        .data, .status, .error and .got_data from it.
+        """
+        negotiator.GetRawConfig(self.model, self.get_completion(),
+                                config_name, None)
+        return self.do_wait(timeout)
+
+class FetchRawConfigSet(QmfCallSet):
+    """One GetRawConfig request per group, all waited on together."""
+
+    def execute(self, negotiator, groups, prepend="", timeout=5):
+        """Invoke negotiator.GetRawConfig for `prepend + group` for each
+        group, then wait for the whole set.
+
+        Returns the dict of QmfCall objects keyed by group (the result of
+        QmfCallSet.do_wait).
+        """
+        for group in groups:
+            # Build a fresh default per call: one shared dict would be
+            # aliased as the .data of every call in the set.
+            call = self.add_call(group, {'Value': 0})
+            negotiator.GetRawConfig(call.model, call.get_completion(),
+                                    prepend + group, None)
+
+        return self.do_wait(timeout)
+
+class FetchJobAd(QmfCall):
+    """Single GetAd request against a scheduler."""
+
+    def __init__(self, app):
+        # Initial value of self.data; callers index it with ['JobAd'].
+        empty_ad = {'JobAd': {"": {"VALUE": "", "TYPE": 0}}}
+        super(FetchJobAd, self).__init__(app, empty_ad)
+
+    def execute(self, scheduler, jobId, timeout=10):
+        """Invoke scheduler.GetAd for `jobId` and wait.
+
+        Returns this object (the result of do_wait).
+        """
+        on_complete = self.get_completion()
+        scheduler.GetAd(self.model, on_complete, jobId, None)
+        return self.do_wait(timeout)
+
+class FetchJobOutput(QmfCall):
+    """Single Fetch request against a scheduler."""
+
+    def __init__(self, app):
+        # Initial value of self.data; callers index it with ['Data'].
+        empty_output = {'Data': ""}
+        super(FetchJobOutput, self).__init__(app, empty_output)
+
+    def execute(self, scheduler, jobId, file, start, end, timeout=10):
+        """Invoke scheduler.Fetch with `jobId`, `file`, `start` and `end`,
+        then wait.
+
+        Returns this object (the result of do_wait).
+        """
+        on_complete = self.get_completion()
+        scheduler.Fetch(self.model, on_complete, jobId, file, start, end, None)
+        return self.do_wait(timeout)
+
class CuminScheduler(RemoteClass):
def __init__(self, model):
super(CuminScheduler, self).__init__(model, "scheduler",
@@ -2182,12 +2229,6 @@
action = GetStartedAction(self, "GetStarted")
action.navigable = False
- action = self.GetAd(self, "GetAd")
- action.navigable = False
-
- action = self.Fetch(self, "Fetch")
- action.navigable = False
-
def init(self):
super(CuminScheduler, self).init()
@@ -2199,25 +2240,6 @@
def get_object_name(self, sched):
return sched.Name
- class Fetch(CuminAction):
- def qmfcall(self, scheduler, job, file, start, end):
- default = {'Data': ""}
- qmf = QMFCaller(default)
- scheduler.Fetch(self.mint.model, qmf.get_completion(), job.id, file, start,
end, None)
- wait(qmf.done, timeout=10)
-
- return qmf.data['Data']
-
- class GetAd(CuminAction):
- def qmfcall(self, scheduler, job):
-
- default = {'JobAd': {"":{"VALUE":"",
"TYPE":0}}}
- qmf = QMFCaller(default)
- scheduler.GetAd(self.mint.model, qmf.get_completion(), job, None)
- wait(qmf.done, timeout=10)
-
- return qmf.data['JobAd']
-
class CuminSubmission(RemoteClass):
def __init__(self, model):
super(CuminSubmission, self).__init__(model, "submission",
@@ -2367,12 +2389,6 @@
action = GetStartedAction(self, "GetStarted")
action.navigable = False
- action = self.GetConfigSet(self, "GetConfigSet")
- action.navigable = False
-
- action = self.GetRawConfig(self, "GetRawConfig")
- action.navigable = False
-
def init(self):
super(CuminNegotiator, self).init()
@@ -2384,40 +2400,6 @@
def get_object_name(self, neg):
return neg.Name
- class GetRawConfig(CuminAction):
- def do_invoke(self, negotiator, config_name, timeout=5):
- default = {'Value': None}
- qmf = QMFCaller(default)
- negotiator.GetRawConfig(self.mint.model, qmf.get_completion(), config_name,
None)
- wait(qmf.done, timeout=timeout)
- return qmf.data
-
- class GetConfigSet(CuminAction):
- """ send a set of qmf requests at the same time and wait for
- them all to complete or timeout """
- def do_invoke(self, negotiator, groups, prepend="",
method="GetStats", timeout=5):
- def predicate(calls):
- done = 0
- for call in calls:
- if call.done():
- done = done + 1
- return done == len(calls)
-
- calls = list()
- default = {'Value': 0}
- for group in groups:
- call = QMFCaller(default)
- calls.append(call)
- if method == "GetStats":
- negotiator.GetStats(self.mint.model, call.get_completion(), group,
None)
- elif method == "GetRawConfig":
- negotiator.GetRawConfig(self.mint.model, call.get_completion(),
prepend+group, None)
-
- wait(predicate, timeout=timeout, args=calls)
-
- return [(group, call.data, (call.status, call.error, call.got_data))
- for call, group in zip(calls, groups)]
-
class CuminSubject(CuminClass):
def __init__(self, model):
super(CuminSubject, self).__init__(model, "subject", Subject)
Modified: mgmt/trunk/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/parameters.py 2009-12-14 20:36:08 UTC (rev 3743)
+++ mgmt/trunk/cumin/python/cumin/parameters.py 2009-12-15 18:35:00 UTC (rev 3744)
@@ -94,16 +94,10 @@
class JobParameter(Parameter):
def do_unmarshal(self, id):
- try:
- return Job.select("custom_id='%s'" % id)[0]
- except:
- return Job.get(id)
+ return Job.get(int(id))
def do_marshal(self, job):
- try:
- return str(job.id)
- except:
- return job.CustomId
+ return str(job.id)
class JobGroupParameter(Parameter):
def do_unmarshal(self, string):