[rhmessaging-commits] rhmessaging commits: r4042 - in mgmt/newdata/cumin: python/cumin and 1 other directory.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Jun 21 10:41:11 EDT 2010
Author: justi9
Date: 2010-06-21 10:41:11 -0400 (Mon, 21 Jun 2010)
New Revision: 4042
Modified:
mgmt/newdata/cumin/bin/cumin-smoke-test
mgmt/newdata/cumin/python/cumin/model.py
Log:
Restore job summary background update code; demonstrate its use in cumin-smoke-test
Modified: mgmt/newdata/cumin/bin/cumin-smoke-test
===================================================================
--- mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-18 17:47:08 UTC (rev 4041)
+++ mgmt/newdata/cumin/bin/cumin-smoke-test 2010-06-21 14:41:11 UTC (rev 4042)
@@ -38,25 +38,31 @@
cumin.start()
+ sleep(2)
+
try:
conn = cumin.database.get_connection()
cursor = conn.cursor()
# cls = cumin.model.org_apache_qpid_broker.Broker XXX fails
- cls = cumin.model.com_redhat_grid.Scheduler
+ cls = cumin.model.com_redhat_grid.Submission
obj = cls.get_object(cursor)
- print "Calling echo on", obj
+ print "Calling method on", obj
- completed = Event()
+ summs = cumin.model.get_submission_job_summaries(obj)
+
+ pprint(summs)
+
+ #completed = Event()
- def completion(x, y):
- print x, y
- completed.set()
+ #def completion(x, y):
+ # print x, y
+ # completed.set()
- cumin.session.call_method(completion, obj, "echo", (1, "Hello!"))
+ #cumin.session.call_method(completion, obj, "GetJobSummaries", ())
- completed.wait()
+ #completed.wait()
finally:
cumin.stop()
Modified: mgmt/newdata/cumin/python/cumin/model.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/model.py 2010-06-18 17:47:08 UTC (rev 4041)
+++ mgmt/newdata/cumin/python/cumin/model.py 2010-06-21 14:41:11 UTC (rev 4042)
@@ -29,8 +29,10 @@
self.task_invocations = list()
self.limits_by_negotiator = dict()
- self.jobs_by_submission = dict()
+ self.job_summaries_by_submission = dict()
+ self.lock = Lock()
+
def check(self):
log.info("Checking %s", self)
@@ -84,22 +86,26 @@
finally:
self.lock.release()
- def get_submission_jobs(self, submission):
+ def get_submission_job_summaries(self, submission):
assert submission
self.lock.acquire()
try:
try:
- store = self.jobs_by_submission[submission]
+ store = self.job_summaries_by_submission[submission]
except KeyError:
- store = SubmissionJobStore(self, submission)
+ store = SubmissionJobSummaryStore(self, submission)
store.start_updates()
- self.jobs_by_submission[submission] = store
+ self.job_summaries_by_submission[submission] = store
- sleep(1)
+ for i in range(5):
+ if store.data:
+ break
+ sleep(1)
+
return store.data
finally:
self.lock.release()
@@ -1959,14 +1965,20 @@
self.setDaemon(True)
def run(self):
- for i in range(20):
- try:
- self.store.update()
- except Exception, e:
- log.exception(e)
+ conn = self.store.model.app.database.get_connection()
+ cursor = conn.cursor()
- sleep(30)
+ try:
+ for i in range(20):
+ try:
+ self.store.update(cursor)
+ except Exception, e:
+ log.exception(e)
+ sleep(30)
+ finally:
+ conn.close()
+
self.store.delete()
class UpdateTimedOut(Exception):
@@ -1978,7 +1990,7 @@
self.negotiator = negotiator
- def update(self):
+ def update(self, cursor):
def completion(status, data):
self.data = data["Limits"]
@@ -1989,21 +2001,20 @@
super(NegotiatorLimitStore, self).delete()
-class SubmissionJobStore(ObjectStore):
+class SubmissionJobSummaryStore(ObjectStore):
def __init__(self, model, submission):
- super(SubmissionJobStore, self).__init__(model)
+ super(SubmissionJobSummaryStore, self).__init__(model)
self.submission = submission
- def update(self):
+ def update(self, cursor):
def completion(status, data):
self.data = data["Jobs"]
- scheduler = self.submission.scheduler
+ self.model.app.session.call_method \
+ (completion, self.submission, "GetJobSummaries", ())
- scheduler.GetJobs(completion, self.submission.Name, None)
-
def delete(self):
- del self.model.jobs_by_submission[self.submission]
+ del self.model.job_summaries_by_submission[self.submission]
- super(SubmissionJobStore, self).delete()
+ super(SubmissionJobSummaryStore, self).delete()
More information about the rhmessaging-commits
mailing list