[rhmessaging-commits] rhmessaging commits: r3744 - in mgmt/trunk/cumin/python/cumin: grid and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue Dec 15 13:35:03 EST 2009


Author: eallen
Date: 2009-12-15 13:35:00 -0500 (Tue, 15 Dec 2009)
New Revision: 3744

Modified:
   mgmt/trunk/cumin/python/cumin/grid/job.py
   mgmt/trunk/cumin/python/cumin/grid/negotiator.py
   mgmt/trunk/cumin/python/cumin/model.py
   mgmt/trunk/cumin/python/cumin/parameters.py
Log:
Added QmfCall and QmfCallSet classes for synchronous qmf calls.

Modified: mgmt/trunk/cumin/python/cumin/grid/job.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/job.py	2009-12-14 20:36:08 UTC (rev 3743)
+++ mgmt/trunk/cumin/python/cumin/grid/job.py	2009-12-15 18:35:00 UTC (rev 3744)
@@ -11,7 +11,7 @@
 from cumin.stat import *
 from cumin.formats import *
 from cumin.util import *
-
+from cumin.model import FetchJobAd, FetchJobOutput
 import main
 
 strings = StringCatalog(__file__)
@@ -429,12 +429,9 @@
                         scheduler = sched
                         break
 
-        action = self.app.model.job.GetAd
-        return action.qmfcall(job)
-
         if scheduler:
-            action = self.app.model.scheduler.GetAd
-            return action.qmfcall(scheduler, job.id)
+            action = FetchJobAd(self.app)
+            return action.execute(scheduler, job.id).data['JobAd']
         else:
             return () # XXX
 
@@ -689,9 +686,9 @@
         job = self.frame.get_object(session)
         file, start, end = self.get_file_args(session)
         if file:
-            action = self.app.model.scheduler.Fetch
-            data = action.qmfcall(scheduler, job, file, start, end)
-            return escape_entity(data)
+            action = FetchJobOutput(self.app)
+            result = action.execute(scheduler, job.id, file, start, end)
+            return escape_entity(result.data['Data'])
         return self.err_msg
 
     def get_file_args(self, session):

Modified: mgmt/trunk/cumin/python/cumin/grid/negotiator.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/negotiator.py	2009-12-14 20:36:08 UTC (rev 3743)
+++ mgmt/trunk/cumin/python/cumin/grid/negotiator.py	2009-12-15 18:35:00 UTC (rev 3744)
@@ -11,6 +11,7 @@
 from cumin.util import *
 
 from submitter import SubmitterSet
+from cumin.model import FetchRawConfig, FetchRawConfigSet
 import main
 
 strings = StringCatalog(__file__)
@@ -314,8 +315,9 @@
         groups = self.groups.get(session)
         if len(groups) == 0:
             negotiator = self.negotiator.get(session)
-            action = self.app.model.negotiator.GetRawConfig
-            groups = action.do_invoke(negotiator, "GROUP_NAMES", 10)
+            action = FetchRawConfig(self.app)
+            results = action.execute(negotiator, "GROUP_NAMES", timeout=10)
+            groups = results.data
             try:
                 groups = self.split_group_names(groups['Value'])
             except Exception, e:
@@ -342,11 +344,14 @@
             if groups is None:
                 groups = self.get_group_names(session)
             negotiator = self.negotiator.get(session)
-            action = self.app.model.negotiator.GetConfigSet
+            action = FetchRawConfigSet(self.app)
+            raw_configs = action.execute(negotiator, groups, config, timeout=15)
 
-            raw_configs = action.do_invoke(negotiator, groups, config, "GetRawConfig", timeout=15)
-            for [group, data, status] in raw_configs:
-                configs.append([group, data['Value'], status])
+            for group in sorted(raw_configs):
+                res = raw_configs[group]
+                configs.append([group, res.data['Value'], 
+                                (res.status, res.error, res.got_data)])
+
             param.set(session, configs)
 
         return configs

Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py	2009-12-14 20:36:08 UTC (rev 3743)
+++ mgmt/trunk/cumin/python/cumin/model.py	2009-12-15 18:35:00 UTC (rev 3744)
@@ -2068,9 +2068,6 @@
         prop = CuminProperty(self, "HoldReason")
         prop.title = "Hold Reason"
 
-        action = self.GetAd(self, "GetAd")
-        action.navigable = False
-
     def init(self):
         super(CuminJob, self).init()
 
@@ -2079,7 +2076,6 @@
     def get_title(self, session):
         return "Job"
 
-    def get_object_name(self, job):
         if isinstance(job, basestring):
             return job
         else:
@@ -2092,16 +2088,6 @@
         def render_status(self, session, status):
             return JobStatusInfo.get_status_string(status)
 
-    class GetAd(CuminAction):
-        def qmfcall(self, job):
-
-            default = {'JobAd': {"":{"VALUE":"", "TYPE":0}}}
-            qmf = QMFCaller(default)
-            job.GetAd(self.mint.model, qmf.get_completion(), None)
-            wait(qmf.done, timeout=10)
-
-            return qmf.data['JobAd']
-
 class GetStartedAction(CuminAction):
     def get_xml_response(self, session, object, *args):
         updateTime = object.statsCurr and object.statsCurr.qmfUpdateTime \
@@ -2112,13 +2098,13 @@
 
         return "%s%s" % (conf, rect)
 
-# XXX no, not this way
-class QMFCaller(object):
-    def __init__(self, default):
+class QmfCall(object):
+    def __init__(self, app, default=None):
         self.data = default
         self.got_data = False
         self.error = False
         self.status = None
+        self.model = app.model.mint.model
 
     def get_completion(self):
         def completion(status, data):
@@ -2131,9 +2117,70 @@
 
         return completion
 
+    def do_wait(self, timeout=5):  # NOTE(review): wait() is defined elsewhere; presumably blocks until self.done() is true or timeout passes -- confirm
+        wait(self.done, timeout=timeout)
+        return self  # return the call object itself so callers can read .data / .status / .error
+
     def done(self):
         return self.got_data or self.error
 
+class QmfCallSet(object):  # a keyed group of QmfCall objects that are waited on together
+    def __init__(self, app):
+        self.app = app  # kept so add_call() can construct QmfCall instances
+        self.calls = dict()  # maps key -> QmfCall
+
+    def add_call(self, key, default=None):  # create a QmfCall, register it under key, and return it
+        call = QmfCall(self.app, default)
+        self.calls[key] = call  # a previous entry with the same key is replaced
+        return call
+
+    def do_wait(self, timeout=5):  # returns the key -> QmfCall dict after waiting
+        def predicate(calls):  # true once every registered call reports done()
+            done = 0
+            for call in calls:  # iterating the dict yields keys, hence calls[call] below
+                if calls[call].done():
+                    done += 1
+            return done == len(calls)
+
+        wait(predicate, timeout=timeout, args=self.calls)  # NOTE(review): wait() is defined elsewhere; presumably evaluates predicate(args) until true or timeout -- confirm
+        return self.calls
+
+class FetchRawConfig(QmfCall):  # synchronous GetRawConfig call against a single negotiator
+    def __init__(self, app):  # install the default result before any call is issued
+        super(FetchRawConfig, self).__init__(app)
+        self.data = {'Value': None}  # initial result; presumably overwritten by the completion callback on success -- confirm
+
+    def execute(self, negotiator, config_name, timeout=5):  # returns self after waiting; callers read .data['Value']
+        negotiator.GetRawConfig(self.model, self.get_completion(), config_name, None)
+        return self.do_wait(timeout)
+
+class FetchRawConfigSet(QmfCallSet):  # issues one GetRawConfig call per group, then waits for all of them
+    def execute(self, negotiator, groups, prepend="", timeout=5):  # returns dict: group -> QmfCall
+        default = {'Value': 0}  # NOTE: this single dict object becomes the initial .data of every call below
+        for group in groups:
+            call = self.add_call(group, default)
+            negotiator.GetRawConfig(call.model, call.get_completion(), prepend+group, None)  # requested config name is prepend+group
+
+        return self.do_wait(timeout)
+
+class FetchJobAd(QmfCall):  # synchronous scheduler.GetAd call for one job id
+    def __init__(self, app):
+        super(FetchJobAd, self).__init__(app)
+        self.data = {'JobAd': {"":{"VALUE":"", "TYPE":0}}}  # initial result; callers index data['JobAd']
+
+    def execute(self, scheduler, jobId, timeout=10):  # returns self once do_wait() comes back
+        scheduler.GetAd(self.model, self.get_completion(), jobId, None)
+        return self.do_wait(timeout)
+
+class FetchJobOutput(QmfCall):  # synchronous scheduler.Fetch call for part of a job's file
+    def __init__(self, app):
+        super(FetchJobOutput, self).__init__(app)
+        self.data = {'Data': ""}  # initial result; callers index data['Data']
+
+    def execute(self, scheduler, jobId, file, start, end, timeout=10):  # returns self once do_wait() comes back
+        scheduler.Fetch(self.model, self.get_completion(), jobId, file, start, end, None)
+        return self.do_wait(timeout)
+
 class CuminScheduler(RemoteClass):
     def __init__(self, model):
         super(CuminScheduler, self).__init__(model, "scheduler",
@@ -2182,12 +2229,6 @@
         action = GetStartedAction(self, "GetStarted")
         action.navigable = False
 
-        action = self.GetAd(self, "GetAd")
-        action.navigable = False
-
-        action = self.Fetch(self, "Fetch")
-        action.navigable = False
-
     def init(self):
         super(CuminScheduler, self).init()
 
@@ -2199,25 +2240,6 @@
     def get_object_name(self, sched):
         return sched.Name
 
-    class Fetch(CuminAction):
-        def qmfcall(self, scheduler, job, file, start, end):
-            default = {'Data': ""}
-            qmf = QMFCaller(default)
-            scheduler.Fetch(self.mint.model, qmf.get_completion(), job.id, file, start, end, None)
-            wait(qmf.done, timeout=10)
-
-            return qmf.data['Data']
-
-    class GetAd(CuminAction):
-        def qmfcall(self, scheduler, job):
-
-            default = {'JobAd': {"":{"VALUE":"", "TYPE":0}}}
-            qmf = QMFCaller(default)
-            scheduler.GetAd(self.mint.model, qmf.get_completion(), job, None)
-            wait(qmf.done, timeout=10)
-
-            return qmf.data['JobAd']
-
 class CuminSubmission(RemoteClass):
     def __init__(self, model):
         super(CuminSubmission, self).__init__(model, "submission",
@@ -2367,12 +2389,6 @@
         action = GetStartedAction(self, "GetStarted")
         action.navigable = False
 
-        action = self.GetConfigSet(self, "GetConfigSet")
-        action.navigable = False
-
-        action = self.GetRawConfig(self, "GetRawConfig")
-        action.navigable = False
-
     def init(self):
         super(CuminNegotiator, self).init()
 
@@ -2384,40 +2400,6 @@
     def get_object_name(self, neg):
         return neg.Name
 
-    class GetRawConfig(CuminAction):
-        def do_invoke(self, negotiator, config_name, timeout=5):
-            default = {'Value': None}
-            qmf = QMFCaller(default)
-            negotiator.GetRawConfig(self.mint.model, qmf.get_completion(), config_name, None)
-            wait(qmf.done, timeout=timeout)
-            return qmf.data
-
-    class GetConfigSet(CuminAction):
-        """ send a set of qmf requests at the same time and wait for
-        them all to complete or timeout """
-        def do_invoke(self, negotiator, groups, prepend="", method="GetStats", timeout=5):
-            def predicate(calls):
-                done = 0
-                for call in calls:
-                    if call.done():
-                        done = done + 1
-                return done == len(calls)
-
-            calls = list()
-            default = {'Value': 0}
-            for group in groups:
-                call = QMFCaller(default)
-                calls.append(call)
-                if method == "GetStats":
-                    negotiator.GetStats(self.mint.model, call.get_completion(), group, None)
-                elif method == "GetRawConfig":
-                    negotiator.GetRawConfig(self.mint.model, call.get_completion(), prepend+group, None)
-
-            wait(predicate, timeout=timeout, args=calls)
-
-            return  [(group, call.data, (call.status, call.error, call.got_data))
-                       for call, group in zip(calls, groups)]
-
 class CuminSubject(CuminClass):
     def __init__(self, model):
         super(CuminSubject, self).__init__(model, "subject", Subject)

Modified: mgmt/trunk/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/parameters.py	2009-12-14 20:36:08 UTC (rev 3743)
+++ mgmt/trunk/cumin/python/cumin/parameters.py	2009-12-15 18:35:00 UTC (rev 3744)
@@ -94,16 +94,10 @@
 
 class JobParameter(Parameter):
     def do_unmarshal(self, id):
-        try:
-            return Job.select("custom_id='%s'" % id)[0]
-        except:
-            return Job.get(id)
+        return Job.get(int(id))
 
     def do_marshal(self, job):
-        try:
-            return str(job.id)
-        except:
-            return job.CustomId
+        return str(job.id)
 
 class JobGroupParameter(Parameter):
     def do_unmarshal(self, string):



More information about the rhmessaging-commits mailing list