[rhmessaging-commits] rhmessaging commits: r4042 - in mgmt/newdata/cumin: python/cumin and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jun 21 10:41:11 EDT 2010


Author: justi9
Date: 2010-06-21 10:41:11 -0400 (Mon, 21 Jun 2010)
New Revision: 4042

Modified:
   mgmt/newdata/cumin/bin/cumin-smoke-test
   mgmt/newdata/cumin/python/cumin/model.py
Log:
Restore job summary background update code; demonstrate its use in cumin-smoke-test

Modified: mgmt/newdata/cumin/bin/cumin-smoke-test
===================================================================
--- mgmt/newdata/cumin/bin/cumin-smoke-test	2010-06-18 17:47:08 UTC (rev 4041)
+++ mgmt/newdata/cumin/bin/cumin-smoke-test	2010-06-21 14:41:11 UTC (rev 4042)
@@ -38,25 +38,31 @@
 
     cumin.start()
 
+    sleep(2)
+
     try:
         conn = cumin.database.get_connection()
         cursor = conn.cursor()
 
         # cls = cumin.model.org_apache_qpid_broker.Broker XXX fails
-        cls = cumin.model.com_redhat_grid.Scheduler
+        cls = cumin.model.com_redhat_grid.Submission
         obj = cls.get_object(cursor)
 
-        print "Calling echo on", obj
+        print "Calling method on", obj
 
-        completed = Event()
+        summs = cumin.model.get_submission_job_summaries(obj)
+
+        pprint(summs)
+
+        #completed = Event()
         
-        def completion(x, y):
-            print x, y
-            completed.set()
+        #def completion(x, y):
+        #    print x, y
+        #    completed.set()
 
-        cumin.session.call_method(completion, obj, "echo", (1, "Hello!"))
+        #cumin.session.call_method(completion, obj, "GetJobSummaries", ())
 
-        completed.wait()
+        #completed.wait()
     finally:
         cumin.stop()
 

Modified: mgmt/newdata/cumin/python/cumin/model.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/model.py	2010-06-18 17:47:08 UTC (rev 4041)
+++ mgmt/newdata/cumin/python/cumin/model.py	2010-06-21 14:41:11 UTC (rev 4042)
@@ -29,8 +29,10 @@
         self.task_invocations = list()
 
         self.limits_by_negotiator = dict()
-        self.jobs_by_submission = dict()
+        self.job_summaries_by_submission = dict()
 
+        self.lock = Lock()
+
     def check(self):
         log.info("Checking %s", self)
 
@@ -84,22 +86,26 @@
         finally:
             self.lock.release()
 
-    def get_submission_jobs(self, submission):
+    def get_submission_job_summaries(self, submission):
         assert submission
 
         self.lock.acquire()
 
         try:
             try:
-                store = self.jobs_by_submission[submission]
+                store = self.job_summaries_by_submission[submission]
             except KeyError:
-                store = SubmissionJobStore(self, submission)
+                store = SubmissionJobSummaryStore(self, submission)
                 store.start_updates()
 
-                self.jobs_by_submission[submission] = store
+                self.job_summaries_by_submission[submission] = store
 
-                sleep(1)
+                for i in range(5):
+                    if store.data:
+                        break
 
+                    sleep(1)
+
             return store.data
         finally:
             self.lock.release()
@@ -1959,14 +1965,20 @@
             self.setDaemon(True)
 
         def run(self):
-            for i in range(20):
-                try:
-                    self.store.update()
-                except Exception, e:
-                    log.exception(e)
+            conn = self.store.model.app.database.get_connection()
+            cursor = conn.cursor()
 
-                sleep(30)
+            try:
+                for i in range(20):
+                    try:
+                        self.store.update(cursor)
+                    except Exception, e:
+                        log.exception(e)
 
+                    sleep(30)
+            finally:
+                conn.close()
+
             self.store.delete()
 
 class UpdateTimedOut(Exception):
@@ -1978,7 +1990,7 @@
 
         self.negotiator = negotiator
 
-    def update(self):
+    def update(self, cursor):
         def completion(status, data):
             self.data = data["Limits"]
 
@@ -1989,21 +2001,20 @@
 
         super(NegotiatorLimitStore, self).delete()
 
-class SubmissionJobStore(ObjectStore):
+class SubmissionJobSummaryStore(ObjectStore):
     def __init__(self, model, submission):
-        super(SubmissionJobStore, self).__init__(model)
+        super(SubmissionJobSummaryStore, self).__init__(model)
 
         self.submission = submission
 
-    def update(self):
+    def update(self, cursor):
         def completion(status, data):
             self.data = data["Jobs"]
 
-        scheduler = self.submission.scheduler
+        self.model.app.session.call_method \
+            (completion, self.submission, "GetJobSummaries", ())
 
-        scheduler.GetJobs(completion, self.submission.Name, None)
-
     def delete(self):
-        del self.model.jobs_by_submission[self.submission]
+        del self.model.job_summaries_by_submission[self.submission]
 
-        super(SubmissionJobStore, self).delete()
+        super(SubmissionJobSummaryStore, self).delete()



More information about the rhmessaging-commits mailing list