[rhmessaging-commits] rhmessaging commits: r4340 - in mgmt/newdata/cumin/python/cumin: grid and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Sep 24 11:31:21 EDT 2010


Author: eallen
Date: 2010-09-24 11:31:20 -0400 (Fri, 24 Sep 2010)
New Revision: 4340

Modified:
   mgmt/newdata/cumin/python/cumin/grid/negotiator.py
   mgmt/newdata/cumin/python/cumin/grid/negotiator.strings
   mgmt/newdata/cumin/python/cumin/model.py
Log:
Fix BZ 636618: Periodically update the quotas cache
Also handle exceptions better:
- Missing Negotiator
- Agent not found
- Request timeout

Modified: mgmt/newdata/cumin/python/cumin/grid/negotiator.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/grid/negotiator.py	2010-09-23 21:52:14 UTC (rev 4339)
+++ mgmt/newdata/cumin/python/cumin/grid/negotiator.py	2010-09-24 15:31:20 UTC (rev 4340)
@@ -86,8 +86,8 @@
     def __init__(self, app, name, negotiator, frame):
         super(NegotiatorOverview, self).__init__(app, name)
 
-        self.update_enabled = False
-        self.defer_enabled = True
+        self.update_enabled = True
+        #self.defer_enabled = True
 
         self.group_helper = GroupHelper(app, "groups", negotiator)
         self.add_child(self.group_helper)
@@ -112,6 +112,9 @@
         col = self.SpacerColumn(app, "spacer")
         self.add_column(col)
 
+        self.error_tmpl = WidgetTemplate(self, "error_html")
+        self.loading_tmpl = WidgetTemplate(self, "deferred_html")
+
     def render_title(self, session):
         return "Quotas"
 
@@ -121,6 +124,33 @@
     def render_deferred_content(self, session):
         return "Loading..."
 
+    def do_render(self, session):
+        tmpl = self.error_tmpl
+        msg = None
+
+        try:
+            info = self.group_helper.get_config_info(session)
+        except AssertionError:
+            msg = "Missing negotiator"
+        except Exception, e:
+            msg = e.message
+            if msg == "Loading":
+                tmpl = self.loading_tmpl
+            else:
+                log.exception(e)
+        else:
+            return super(NegotiatorOverview, self).do_render(session)
+
+        writer = Writer()
+        if "is unknown" in msg:
+            msg = "Please try again later"
+        err_msg = "The data is not available at this time. <br/>%s" % msg
+        tmpl.render(writer, session, err_msg)
+        return writer.to_string()
+
+    def render_error_msg(self, session, msg):
+        return msg
+
     def do_get_items(self, session):
         info = self.group_helper.get_config_info(session)
 
@@ -215,9 +245,12 @@
         def render_content(self, session, group):
             value = self.parent.group_helper.get_config_value(session, group, "GROUP_QUOTA_DYNAMIC")
             self.task.form.group_leader.set(session, group)
-            href = self.task.get_href(session)
-            content = "%s%%" % str(round(float(value) * 100.0, 2))
-            return fmt_link(href, content, "", "", self.fmt_hover(""))
+            if not "loading" in value:
+                href = self.task.get_href(session)
+                content = "%s%%" % str(round(float(value) * 100.0, 2))
+                return fmt_link(href, content, "", "", self.fmt_hover(""))
+            else:
+                return value
 
     class SpacerColumn(ItemTableColumn):
         def render_title(self, session, *args):
@@ -230,68 +263,44 @@
     def __init__(self, app, name, negotiator):
         super(GroupHelper, self).__init__(app, name)
 
-        self.groups = self.GroupAttribute(app, "groups")
-        self.add_attribute(self.groups)
-
-        self.group_dyn_quotas = self.GroupAttribute(app, "group_dynamic_quotas")
-        self.add_attribute(self.group_dyn_quotas)
-
-        self.group_static_quotas = self.GroupAttribute(app, "group_static_quotas")
-        self.add_attribute(self.group_static_quotas)
-
-        self.group_autoregroups = self.GroupAttribute(app, "group_autoregroups")
-        self.add_attribute(self.group_autoregroups)
-
-        self.user_autoregroups = self.GroupAttribute(app, "user_autoregroups")
-        self.add_attribute(self.user_autoregroups)
-
-        self.autoregroup = self.GroupAttribute(app, "autoregroup")
-        self.add_attribute(self.autoregroup)
-
-        self.group_factors = self.GroupAttribute(app, "group_factors")
-        self.add_attribute(self.group_factors)
-
-        self.user_factors = self.GroupAttribute(app, "user_factors")
-        self.add_attribute(self.user_factors)
-
         self.negotiator = negotiator
 
-        self.users = self.Users(app, "users")
-        self.add_attribute(self.users)
+        self.info = Attribute(app, "info")
+        self.add_attribute(self.info)
 
     def get_config_info(self, session):
         negotiator = self.negotiator.get(session)
-        try:
-            info = self.app.model.configs_by_negotiator \
-            [negotiator._qmf_agent_id]
-        except KeyError:
-            info = dict()
 
-        if len(info) == 0:
-            default =  {'Value': None}
-            action = QmfCall(self.app, default=default, timeout=10)
-            results = action.execute(negotiator, "GetRawConfig", "GROUP_NAMES")
+        info = self.info.get(session)
+
+        if not info:
+            results = self.app.model.get_negotiator_group_names(negotiator)
             groups = results.data
-            try:
-                groups = self.split_group_names(groups['Value'])
-            except Exception, e:
-                if not groups:
-                    groups = ""
-                msg = "Unable to parse groups %s %s" % (groups, e.message)
-                log.debug(msg)
-                groups = []
 
-            info = dict()
-            for group in groups:
-                info[group] = dict()
+            if not groups:
+                if (not results.status) and (not results.exception):
+                    raise QmfException("Loading")
+                    #results.exception = QmfException("Request timed out. Please refresh the page to try again.")
+            if results.exception:
+                raise results.exception
+            else:
+                try:
+                    groups = self.split_group_names(groups)
+                except Exception, e:
+                    log.exception(e)
+                    groups = []
 
-            self.app.model.configs_by_negotiator[negotiator._qmf_agent_id] = info
+                info = dict()
+                for group in groups:
+                    info[group] = dict()
 
+            self.info.set(session, info)
+
         return info
 
     def has_child(self, session, group):
         negotiator = self.negotiator.get(session)
-        info = self.app.model.configs_by_negotiator[negotiator._qmf_agent_id]
+        info = self.get_config_info(session)
 
         try:
             return info[group]['has_child']
@@ -302,12 +311,12 @@
                     info[group]['has_child'] = True
                     break
 
-        self.app.model.configs_by_negotiator[negotiator._qmf_agent_id] = info
+        self.info.set(session, info)
         return info[group]['has_child']
 
     def get_parent(self, session, group):
         negotiator = self.negotiator.get(session)
-        info = self.app.model.configs_by_negotiator[negotiator._qmf_agent_id]
+        info = self.get_config_info(session)
 
         try:
             return info[group]['parent']
@@ -320,12 +329,12 @@
 
             info[group]['parent'] = parent
 
-        self.app.model.configs_by_negotiator[negotiator._qmf_agent_id] = info
+        self.info.set(session, info)
         return info[group]['parent']
 
     def get_siblings(self, session, node):
         negotiator = self.negotiator.get(session)
-        info = self.app.model.configs_by_negotiator[negotiator._qmf_agent_id]
+        info = self.get_config_info(session)
 
         siblings = list()
         (ng, s, nn) = rpartition(node, ".")
@@ -352,63 +361,31 @@
 
         if len(needed_groups) > 0:
             negotiator = self.negotiator.get(session)
-            action = FetchRawConfigSet(self.app)
-            raw_configs = action.execute(negotiator, needed_groups, config+"_")
 
-            for group in raw_configs:
-                res = raw_configs[group]
-                info[group][config] = res.data['Value']
+            results = self.app.model.get_negotiator_config_values(negotiator, needed_groups, config)
+            raw_configs = results.data
 
+            try:
+                for config in raw_configs:
+                    for group in raw_configs[config]:
+                        info[group][config] = raw_configs[config][group].data['Value']
+            except Exception, e:
+                log.exception(e)
+
+            self.info.set(session, info)
+        return info
+
     def get_config_value(self, session, group, config):
         info = self.get_config_info(session)
-        return info[group][config]
+        try:
+            return info[group][config]
+        except KeyError:
+            try:
+                info = self.get_config_for_groups(session, config, [group])
+                return info[group][config]
+            except:
+                return "loading"
 
-    def get_group_raw_config(self, session, config, param, groups=None):
-        configs = param.get(session)
-
-        if len(configs) == 0:
-            if groups is None:
-                groups = self.get_group_names(session)
-            negotiator = self.negotiator.get(session)
-            action = FetchRawConfigSet(self.app)
-            raw_configs = action.execute(negotiator, groups, config)
-
-            for group in sorted(raw_configs):
-                res = raw_configs[group]
-                configs.append([group, res.data['Value'], 
-                                (res.status, res.error, res.got_data)])
-
-            param.set(session, configs)
-
-        return configs
-
-    def get_static_quota(self, session):
-        return self.get_group_raw_config(session, 
-                                         "GROUP_QUOTA_", 
-                                         self.group_static_quotas)
-
-    def get_dyn_quota(self, session):
-        return self.get_group_raw_config(session, 
-                                         "GROUP_QUOTA_DYNAMIC_", 
-                                         self.group_dyn_quotas)
-
-    def get_priority_factor(self, session, groups=None, user=False):
-        param = user and self.user_factors or self.group_factors
-        return self.get_group_raw_config(session, 
-                                         "GROUP_PRIO_FACTOR_", 
-                                         param, groups)
-
-    def get_regroups(self, session, groups=None, user=False):
-        param = user and self.user_autoregroups or self.group_autoregroups
-        return self.get_group_raw_config(session, 
-                                         "GROUP_AUTOREGROUP_", 
-                                         param, groups)
-
-    def get_autoregroup(self, session):
-        return self.get_group_raw_config(session, 
-                                         "GROUP_AUTOREGROUP", 
-                                         self.autoregroup, [""])
-
     def get_unclaimed_dyn_quota(self, session, groups):
         info = self.get_config_info(session)
         total = 0.0
@@ -424,14 +401,6 @@
         val = min(1.0, val)
         return val
 
-    class GroupAttribute(Attribute):
-        def get_default(self, session):
-            return list()
-
-    class Users(Attribute):
-        def get_default(self, session):
-            return dict()
-
 class GroupAddForm(ObjectFrameTaskForm):
     def __init__(self, app, name, task):
         super(GroupAddForm, self).__init__(app, name, task)
@@ -689,6 +658,8 @@
 
         if changed:
             self.task.reconfig(negotiator)
+            self.app.model.update_negotiator_config_value(negotiator)
+
         self.task.exit_with_redirect(session)
 
     def check_quota(self, quota, original):
@@ -937,6 +908,12 @@
             raise result.error
 
         invoc.status_code = invoc.status
+        try:
+            group = group.split("_")[-1]
+            invoc.description = "Set %s to %s" % (group, value)
+        except:
+            pass
+
         invoc.end()
 
     def reconfig(self, negotiator):

Modified: mgmt/newdata/cumin/python/cumin/grid/negotiator.strings
===================================================================
--- mgmt/newdata/cumin/python/cumin/grid/negotiator.strings	2010-09-23 21:52:14 UTC (rev 4339)
+++ mgmt/newdata/cumin/python/cumin/grid/negotiator.strings	2010-09-24 15:31:20 UTC (rev 4340)
@@ -41,6 +41,29 @@
   </table>
 </div>
 
+[NegotiatorOverview.deferred_html]
+<div id="{id}">
+    <div class="deferredSpacer">Loading...</div>
+</div>
+<script type="text/javascript">
+<![CDATA[
+(function() {
+    window.addEvent("domready", function () { 
+        setTimeout( function () { window.location.reload(); }, 2000 );
+    });
+}())
+]]>
+</script>
+
+[NegotiatorOverview.error_html]
+<div id="{id}">
+    <div class="TaskInvocationSet">
+        <ul>
+            <li>{error_msg}</li>
+        </ul>
+    </div>
+</div>
+
 [GroupForm.css]
 div.deferredSpacer {
     height: 10em;

Modified: mgmt/newdata/cumin/python/cumin/model.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/model.py	2010-09-23 21:52:14 UTC (rev 4339)
+++ mgmt/newdata/cumin/python/cumin/model.py	2010-09-24 15:31:20 UTC (rev 4340)
@@ -30,7 +30,8 @@
 
         self.limits_by_negotiator = dict()
         self.job_summaries_by_submission = dict()
-        self.configs_by_negotiator = dict()
+        self.group_names_by_negotiator = dict()
+        self.group_config_values_by_negotiator = dict()
 
         self.lock = Lock()
 
@@ -73,6 +74,63 @@
 
         return session
 
+    def get_negotiator_group_names(self, negotiator):
+        assert negotiator
+
+        self.lock.acquire()
+
+        try:
+            try:
+                store = self.group_names_by_negotiator[negotiator._qmf_agent_id]
+            except KeyError:
+                store = NegotiatorGroupNamesStore(self, negotiator)
+                store.start_updates()
+
+                self.group_names_by_negotiator[negotiator._qmf_agent_id] = store
+                #sleep(1)
+
+            return store
+        finally:
+            self.lock.release()
+
+    def get_negotiator_config_values(self, negotiator, needed_groups, config):
+        assert negotiator
+
+        self.lock.acquire()
+
+        try:
+            try:
+                store = self.group_config_values_by_negotiator[negotiator._qmf_agent_id]
+                added = 0
+                for group in needed_groups:
+                    added += store.add_group_config(group, config)
+                if added > 0:
+                    store.update(None)
+                    sleep(1)
+            except KeyError:
+                store = NegotiatorGroupConfigValuesStore(self, negotiator, needed_groups, config)
+                store.start_updates()
+
+                self.group_config_values_by_negotiator[negotiator._qmf_agent_id] = store
+
+                sleep(1)
+
+            return store
+        finally:
+            self.lock.release()
+
+    def update_negotiator_config_value(self, negotiator):
+        assert negotiator
+
+        self.lock.acquire()
+
+        try:
+            store = self.group_config_values_by_negotiator[negotiator._qmf_agent_id]
+            store.update(None)
+            sleep(1)
+        finally:
+            self.lock.release()
+
     def get_negotiator_limits(self, negotiator):
         assert negotiator
 
@@ -1720,6 +1778,7 @@
             session.call_method(self.get_completion(), job_server, "Fetch", (jobId, file, start, end))
         except Exception, e:
             self.error = e
+            log.exception(e)
         return self.do_wait()
 
 class CuminScheduler(RemoteClass):
@@ -2054,3 +2113,60 @@
         del self.model.job_summaries_by_submission[self.submission._id]
 
         super(SubmissionJobSummaryStore, self).delete()
+
+class NegotiatorGroupNamesStore(ObjectStore):
+    def __init__(self, model, negotiator):
+        super(NegotiatorGroupNamesStore, self).__init__(model)
+
+        self.negotiator = negotiator
+
+    def update(self, cursor):
+        def completion(status, data):
+            self.status = status
+            try:
+                self.data = data["Value"]
+            except KeyError:
+                pass
+
+        self.model.app.session.call_method \
+            (completion, self.negotiator, "GetRawConfig", ("GROUP_NAMES",))
+
+    def delete(self):
+        del self.model.group_names_by_negotiator[self.negotiator._qmf_agent_id]
+
+        super(NegotiatorGroupNamesStore, self).delete()
+
+class NegotiatorGroupConfigValuesStore(ObjectStore):
+    def __init__(self, model, negotiator, groups, config):
+        super(NegotiatorGroupConfigValuesStore, self).__init__(model)
+
+        self.negotiator = negotiator
+        self.configs = dict()
+
+        self.configs[config] = list(groups)
+        self.data = dict()
+
+    def add_group_config(self, group, config):
+        added = 0
+
+        try:
+            groups = self.configs[config]
+            if not group in groups:
+                groups.append(group)
+                added = 1
+        except KeyError:
+            self.configs[config] = [group]
+            added = 1
+
+        return added
+
+    def update(self, cursor):
+        for config in self.configs:
+            action = FetchRawConfigSet(self.model.app)
+            raw_configs = action.execute(self.negotiator, self.configs[config], config+"_")
+            self.data[config] = raw_configs
+
+    def delete(self):
+        del self.model.group_config_values_by_negotiator[self.negotiator._qmf_agent_id]
+
+        super(NegotiatorGroupConfigValuesStore, self).delete()



More information about the rhmessaging-commits mailing list