rhmessaging commits: r2807 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-15 17:24:26 -0500 (Sat, 15 Nov 2008)
New Revision: 2807
Modified:
mgmt/trunk/cumin/python/cumin/widgets.py
Log:
Made StateSwitch param public so the default can be changed
Modified: mgmt/trunk/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/widgets.py 2008-11-15 00:38:08 UTC (rev 2806)
+++ mgmt/trunk/cumin/python/cumin/widgets.py 2008-11-15 22:24:26 UTC (rev 2807)
@@ -519,8 +519,8 @@
def __init__(self, app, name):
super(StateSwitch, self).__init__(app, name)
- self.__param = Parameter(app, "param")
- self.add_parameter(self.__param)
+ self.param = Parameter(app, "param")
+ self.add_parameter(self.param)
self.__states = list()
self.__titles = dict()
@@ -533,14 +533,14 @@
self.__hover[state] = hover
self.__bookmark[state] = bm
- if self.__param.default is None:
- self.__param.default = state
+ if self.param.default is None:
+ self.param.default = state
def get(self, session):
- return self.__param.get(session)
+ return self.param.get(session)
def set(self, session, value):
- foo = self.__param.set(session, value)
+ foo = self.param.set(session, value)
return foo
def get_items(self, session):
@@ -559,7 +559,7 @@
""" needed because the SubmitSwitch class uses
a hidden input to set the param value instead
of a link name/value pair """
- return self.__param.path
+ return self.param.path
def render_item_link(self, session, state):
branch = session.branch()
16 years, 1 month
rhmessaging commits: r2806 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-14 19:38:08 -0500 (Fri, 14 Nov 2008)
New Revision: 2806
Modified:
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/pool.py
mgmt/trunk/cumin/python/cumin/pool.strings
mgmt/trunk/cumin/python/cumin/slot.py
mgmt/trunk/cumin/python/cumin/slot.strings
mgmt/trunk/cumin/python/cumin/stat.py
mgmt/trunk/cumin/python/cumin/stat.strings
mgmt/trunk/cumin/python/cumin/system.py
mgmt/trunk/cumin/python/cumin/system.strings
Log:
Removed Job vis on Pool statistics page.
Added Machine vis.
Added Slot vis.
Show all slots by default if they will fit.
Show slots for 1st machine if all don't fit.
Show slots for machine when that machine is selected.
Made vis elements lighter weight. Using divs instead of buttons.
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-15 00:38:08 UTC (rev 2806)
@@ -1841,8 +1841,45 @@
return Pool(coll)
get = classmethod(get)
+
+
+class Visualization(CuminAction):
+ def __init__(self, cls, name, itemset):
+ super(Visualization, self).__init__(cls, name)
+
+ assert itemset
+ self.itemset = itemset
+
+ def get_xml_response(self, session, object, *args):
+ boxes = self.get_boxes(session, object, *args)
+ fields = self.get_field_tuples(session)
+
+ writer = Writer()
+ writer.write("<%ss>" % self.itemset.name)
+ for box in boxes:
+ writer.write("<%s id='%s'" % (self.itemset.name, str(box["id"])))
+ for field, desc in fields:
+ writer.write(" %s='%s'" % (field, box[field]))
+ if fields:
+ writer.write(" color='%s'/>" % self.get_color(box))
+ writer.write("</%ss>" %self.itemset.name)
+ return writer.to_string()
+
+ def get_boxes(self, session, object, *args):
+ if args:
+ self.itemset.set_args(session, *args)
+ cursor = self.itemset.get_items(session, object)
+ box_list = self.itemset.cursor_to_rows(cursor)
+ return box_list
+
+ def get_color(self, box):
+ return ""
+
+ def get_colors(self):
+ return list()
+
from job import JobSet
-from pool import PoolSlotSet
+from pool import PoolSlotSet, PoolMachineSet
class CuminPool(CuminClass):
def __init__(self, model):
@@ -1858,22 +1895,22 @@
stat = self.PercentStat(self, "Completed")
stat.title = "Completed Jobs"
- stat = self.PercentStat(self, "Idle")
+ stat = self.IdlePercentStat(self, "Idle")
stat.title = "Idle Jobs"
stat = self.PercentStat(self, "Held")
stat.title = "Held Jobs"
- stat = self.PercentStat(self, "Removed")
+ stat = self.RemovedPercentStat(self, "Removed")
stat.title = "Removed Jobs"
- stat = self.PercentStat(self, "Jobs")
+ stat = self.JobsPercentStat(self, "Jobs")
stat.title = "Total Jobs"
- action = self.VisJobs(self, "jobs")
+ action = self.VisSlots(self, "slots")
action.navigable = False
- action = self.VisSlots(self, "slots")
+ action = self.VisMachine(self, "machines")
action.navigable = False
def init(self):
@@ -1893,30 +1930,40 @@
class PercentStat(CuminStat):
def value_text(self, pool):
state = self.name
-
- if state == "Jobs":
- return str(Job.select().count())
value = self.get_value(pool, state)
return str(value)
+ def get_sql_and_elem(self):
+ pass
+
+ def get_sql_or_elem(self):
+ pass
+
+ def get_status_elem(self, state):
+ istate = JobStatusInfo.get_status_int(state)
+ return "job_status = %i" % istate
+
def get_value(self, pool, state):
elems = list()
- istate = JobStatusInfo.get_status_int(state)
- elems.append("job_status = %i" % istate)
- elems.append("s.pool = '%s'" % pool.id)
+ status_elem = self.get_status_elem(state)
+ if status_elem:
+ elems.append(status_elem)
- # manually removed jobs will have a state of Idle
- # with a deletion_time
- if state == "Idle":
- elems.append("job.deletion_time is null")
- where = " and ".join(elems)
-
- # manually removed jobs will have a state of Idle
- # with a deletion_time
- if state == "Removed":
- removed = "(job.deletion_time is not null and job_status = %i)" % JobStatusInfo.get_status_int("Idle")
- where = " or ".join((where, removed))
-
+ and_elem = self.get_sql_and_elem()
+ if and_elem:
+ elems.append(and_elem)
+
+ or_elem = self.get_sql_or_elem()
+
+ return self.do_select(pool, elems, or_elem)
+
+ def do_select(self, pool, and_elems, or_elem):
+ and_elems.append("s.pool = '%s'" % pool.id)
+ where = " and ".join(and_elems)
+
+ if or_elem:
+ where = " or ".join((where, or_elem))
+
jn = "inner join scheduler as s on s.id = scheduler_id"
return Job.select(where, join=jn).count()
@@ -1925,51 +1972,86 @@
return self.get_item_rate(pool, state)
def get_item_rate(self, pool, state):
- jobs = Job.select().count()
+ jobs = self.do_select(pool, list(), None)
- if state == "Jobs":
- value = jobs
- else:
- value = self.get_value(pool, state)
+ value = self.get_value(pool, state)
if jobs:
- percent = (value*1.0) / (jobs*1.0) * 100.0
+ percent = float(value) / float(jobs) * 100.0
return jobs and "%2.2f" % percent or "-"
- class VisSlots(CuminAction):
+
+ class JobsPercentStat(PercentStat):
+ def get_status_elem(self, state):
+ return None
+
+ def rate_text(self, pool):
+ return "100.00"
+
+ class IdlePercentStat(PercentStat):
+ def get_sql_and_elem(self):
+ # manually removed jobs will have a state of Idle
+ # with a deletion_time
+ return "job.deletion_time is null"
+
+ class RemovedPercentStat(PercentStat):
+ def get_sql_or_elem(self):
+ # if a job is idle, but has a deletion_time, it's actually Removed
+ return "(job.deletion_time is not null and job_status = %i)" % \
+ JobStatusInfo.get_status_int("Idle")
+
+ class VisSlots(Visualization):
# list of status/colors in the order we want them displayed
# in the legend
load_colors = [("Idle", "clear"),
("Busy", "green"),
("Suspended", "red"),
- ("Vacating", "red"),
+ ("Vacating", "orange"),
("Killing", "blue"),
("Benchmarking", "yellow")]
def __init__(self, cls, name):
- super(CuminPool.VisSlots, self).__init__(cls, name)
-
- self.slot_set = self.ModelPoolSlotSet(cls.model.app, name)
-
- def get_xml_response(self, session, pool, *args):
- slots = self.get_slots(session, pool)
- writer = Writer()
- writer.write("<slots>")
- for slot in slots:
- writer.write("<slot id='%i' name='%s' machine='%s' job='%s' color='%s'/>" % \
- (slot["id"],
- slot["name"],
- slot["machine"],
- slot["job_id"],
- self.get_color(slot)))
- writer.write("</slots>")
- return writer.to_string()
+ self.slot_set = self.ModelPoolSlotSet(cls.model.app, "slot")
+ super(CuminPool.VisSlots, self).__init__(cls, name, self.slot_set)
- def get_slots(self, session, pool):
- cursor = self.slot_set.do_get_items(session, pool)
- slot_list = self.slot_set.cursor_to_rows(cursor)
- return slot_list
+ def get_field_tuples(self, session):
+ return [("name", "Name"), ("machine", "Machine"), ("job_id", "Job")]
+ def set_machine(self, session, machine):
+ self.slot_set.set_machine(session, machine)
+
+ def get_machine(self, session):
+ return self.slot_set.get_machine(session)
+
class ModelPoolSlotSet(PoolSlotSet):
+ def __init__(self, app, name):
+ super(CuminPool.VisSlots.ModelPoolSlotSet, self).__init__(app, name)
+
+ self.__machine = None
+
+ def set_args(self, session, machine):
+ self.set_machine(session, machine)
+
+ def set_machine(self, session, machine):
+ self.__machine= machine
+
+ def get_machine(self, session):
+ return self.__machine
+
+ def render_sql_where(self, session, pool):
+ elems = list()
+ elems.append("s.pool = %(pool)s")
+ if self.__machine:
+ elems.append("machine = %(machine)s")
+
+ return "where %s" % " and ".join(elems)
+
+ def get_sql_values(self, session, pool):
+ values = {"pool": pool.id}
+ machine = self.__machine
+ if machine:
+ values["machine"] = machine
+ return values
+
def render_sql_limit(self, session, *args):
pass
@@ -1986,54 +2068,31 @@
def get_colors(self):
return self.load_colors
- class VisJobs(CuminAction):
+ class VisMachine(VisSlots):
+ load_colors = [("Idle", "clear"),
+ ("> 0%", "green"),
+ ("> 25%", "yellow"),
+ ("> 50%", "blue"),
+ ("> 75%", "green3")]
+
def __init__(self, cls, name):
- super(CuminPool.VisJobs, self).__init__(cls, name)
-
- self.job_set = self.ModelSystemJobSet(cls.model.app, name)
-
- def get_xml_response(self, session, pool, *args):
- jobs = self.get_jobs(session, pool)
- writer = Writer()
- writer.write("<jobs>")
- for job in jobs:
- status = self.get_status(job)
- writer.write("<job id='%i' name='%s' submitter='%s' status='%s' color='%s'/>" % \
- (job["id"],
- job["custom_id"],
- job["submitter"],
- JobStatusInfo.get_status_string(status),
- self.get_color(job)))
- writer.write("</jobs>")
- return writer.to_string()
+ machine_set = PoolMachineSet(cls.model.app, "machine")
+ super(CuminPool.VisSlots, self).__init__(cls, name, machine_set)
- def get_jobs(self, session, pool):
- cursor = self.job_set.do_get_items(session, pool)
- return self.job_set.cursor_to_rows(cursor)
-
- class ModelSystemJobSet(JobSet):
- def render_sql_limit(self, session, *args):
- pass
-
- def render_sql_orderby(self, session, *args):
- return "order by custom_id asc"
-
- def get_status(self, job):
- status = job["job_status"]
- if status != JobStatusInfo.get_status_int("Completed") and job["deletion_time"]:
- status = JobStatusInfo.get_status_int("Removed")
- return status
+ def get_field_tuples(self, session):
+ return [("machine", "Machine"), ("busy", "Busy Slots"), ("idle", "Idle Slots")]
- def get_color(self, job):
- status = self.get_status(job)
- color = JobStatusInfo.get_status_color(status)
- if "Idle" == JobStatusInfo.get_status_string(status):
- color = "clear" # instead of green
- return color
+ def get_color(self, machine):
+ total = float(machine["total"])
+ busy = float(machine["busy"])
+ work = total > 0.0 and busy / total or 0.0
+
+ percents = [0.0, 0.25, 0.5, 0.75, 1.0]
+ for i in range(len(percents)):
+ if work <= percents[i]:
+ return self.load_colors[i][1]
+ return "black"
- def get_colors(self):
- return JobStatusInfo.get_zipped_colors()
-
class CuminLimit(CuminClass):
def __init__(self, model):
super(CuminLimit, self).__init__ \
@@ -2829,12 +2888,8 @@
xargs = self.__args.get(session)
if cls:
- try:
- object = cls.mint_class.get(id)
- except:
- object = None
+ object = cls.mint_class.get(id)
method = self.__method.get(session)
- #self.app.model.write_xml(writer, objects)
for action in cls.actions:
if action.name == method:
args = xargs and xargs.split("&") or list()
Modified: mgmt/trunk/cumin/python/cumin/pool.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/pool.py 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/pool.py 2008-11-15 00:38:08 UTC (rev 2806)
@@ -17,7 +17,7 @@
from collector import CollectorSet, CollectorFrame, CollectorStart, CollectorStop
from negotiator import NegotiatorSet, NegotiatorFrame, NegStart, NegStop
from limits import LimitsSet, LimitsFrame
-from slot import SlotSet
+from slot import SlotSet, MachineSet
strings = StringCatalog(__file__)
log = logging.getLogger("cumin.pool")
@@ -265,6 +265,19 @@
count = self.get_item_count(session, pool)
return "Slots %s" % fmt_count(count)
+class PoolMachineSet(MachineSet):
+ def get_args(self, session):
+ return self.frame.get_args(session)
+
+ def render_sql_where(self, session, pool):
+ return "where s.pool = %(pool)s"
+
+ def get_sql_values(self, session, pool):
+ return {"pool": pool.id}
+
+ def render_sql_orderby(self, session, pool):
+ return "order by machine asc"
+
class PoolStats(Widget):
def __init__(self, app, name):
super(PoolStats, self).__init__(app, name)
@@ -272,81 +285,146 @@
stats = PoolStatSet(app, "general", "general")
self.add_child(stats)
- #self.grid = self.JobUtilizationGrid(app, "job_grid")
- #self.add_child(self.grid)
-
self.slot_grid = self.SlotVisualization(app, "slot_grid")
self.add_child(self.slot_grid)
+ self.machine_grid = self.MachineVisualization(app, "machine_grid")
+ self.add_child(self.machine_grid)
+
+ self.all_fits = Attribute(app, "can_show_all")
+ self.add_attribute(self.all_fits)
+
+ self.machine_param = Parameter(app, "machine_param")
+ self.add_parameter(self.machine_param)
+
+ self.show_all_slots = self.ShowAllSlots(app, "show_all_slots")
+ self.add_child(self.show_all_slots)
+
+ class ShowAllSlots(Widget):
+ def render(self, session):
+ if self.parent.slot_grid.get_machine(session):
+ if self.parent.all_fits.get(session):
+ return super(PoolStats.ShowAllSlots, self).render(session)
+
+ def render_href(self, session):
+ branch = session.branch()
+ self.parent.machine_param.set(branch, None)
+ return branch.marshal()
+
+ def process(self, session):
+ super(PoolStats, self).process(session)
+ machine = self.machine_param.get(session)
+ all_fits = True
+
+ if not machine:
+ self.slot_grid.set_machine(session, None)
+ all_fits = self.slot_grid.will_it_fit(session)
+ if not all_fits:
+ machine = self.machine_grid.get_1st_machine(session)
+ else:
+ machine = ""
+
+ self.all_fits.set(session, all_fits)
+ self.slot_grid.set_machine(session, machine)
+
def render_title(self, session):
return "Statistics"
- class JobUtilizationGrid(StatUtilizationGrid):
+ def render_slot_title(self, session):
+ machine = self.slot_grid.get_machine(session)
+ return machine and "Slots on (%s)" % machine or "Slot Utilization"
+
+ def render_machine_title(self, session):
+ pool = self.frame.get_args(session)[0]
+ return "Machines on %s" % pool.name
+
+ class SlotVisualization(StatUtilizationGrid):
+ def __init__(self, app, name):
+ super(PoolStats.SlotVisualization, self).__init__(app, name)
+
def get_cells(self, session):
pool = self.frame.get_args(session)[0]
- action = self.app.model.pool.jobs
- return action.get_jobs(session, pool)
+ action = self.app.model.pool.slots
+ return action.get_boxes(session, pool)
def render_title(self, session):
return ""
+ def set_machine(self, session, machine):
+ action = self.app.model.pool.slots
+ action.set_machine(session, machine)
+
+ def get_machine(self, session):
+ action = self.app.model.pool.slots
+ return action.get_machine(session)
+
def get_colors(self, session):
- action = self.app.model.pool.jobs
+ action = self.app.model.pool.slots
return action.get_colors()
def get_color(self, session, job):
- action = self.app.model.pool.jobs
+ action = self.app.model.pool.slots
return action.get_color(job)
- def get_contents(self, session, job):
+ def get_contents(self, session, slot):
return ""
- def get_href(self, session, job):
+ def get_href(self, session, slot):
branch = session.branch()
- ojob = Job.get(job["id"])
- return self.page.main.pool.job.get_href(branch, ojob)
+ try:
+ job = Job.select("custom_id = '%s'" % slot.JobId)[0]
+ except Exception, e:
+ return "#"
+ return self.page.main.pool.job.get_href(branch, job)
def get_url(self, session):
pool = self.parent.frame.get_args(session)[0]
- return "call.xml?class=pool;id=%s;method=jobs" % pool.id
+ machine = self.get_machine(session)
+ return "call.xml?class=pool;id=%s;method=slots;xargs=%s" % (pool.id, machine)
def get_sticky_info(self, session):
- return [("name", "ID"), ("submitter", "Submitter"), ("status", "Status")]
+ action = self.app.model.pool.slots
+ return action.get_field_tuples(session)
- class SlotVisualization(StatUtilizationGrid):
+ class MachineVisualization(StatUtilizationGrid):
def get_cells(self, session):
pool = self.frame.get_args(session)[0]
- action = self.app.model.pool.slots
- return action.get_slots(session, pool)
+ action = self.app.model.pool.machines
+ return action.get_boxes(session, pool)
def render_title(self, session):
return ""
+ def get_1st_machine(self, session):
+ pool = self.frame.get_args(session)[0]
+ action = self.app.model.pool.machines
+ boxes = action.get_boxes(session, pool)
+ return boxes[0]["machine"]
+
def get_colors(self, session):
- action = self.app.model.pool.slots
+ action = self.app.model.pool.machines
return action.get_colors()
- def get_color(self, session, job):
- action = self.app.model.pool.slots
- return action.get_color(job)
+ def get_color(self, session, machine):
+ action = self.app.model.pool.machines
+ return action.get_color(machine)
- def get_contents(self, session, slot):
+ def get_contents(self, session, machine):
return ""
- def get_href(self, session, slot):
+ def get_href(self, session, data):
+ machine = data["machine"]
branch = session.branch()
- try:
- job = Job.select("custom_id = '%s'" % slot.JobId)[0]
- except Exception, e:
- return "#"
- return self.page.main.pool.job.get_href(branch, job)
+ self.parent.machine_param.set(branch, machine)
+ return branch.marshal()
def get_url(self, session):
pool = self.parent.frame.get_args(session)[0]
- return "call.xml?class=pool;id=%s;method=slots" % pool.id
+ return "call.xml?class=pool;id=%s;method=machines" % pool.id
def get_sticky_info(self, session):
- return [("name", "Name"), ("machine", "Machine"), ("job", "Job")]
+ action = self.app.model.pool.machines
+ return action.get_field_tuples(session)
class PoolStatSet(StatSet):
def render_rate_text(self, session, args):
Modified: mgmt/trunk/cumin/python/cumin/pool.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/pool.strings 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/pool.strings 2008-11-15 00:38:08 UTC (rev 2806)
@@ -53,35 +53,47 @@
</div>
</div>
<div style="float: left; margin-left: 4em;">
- <div class="vistats">
- <h2>Slot Utilization</h2>
- {slot_grid}
- </div>
+ <div class="vistats">
+ <h2>{machine_title}</h2>
+ {machine_grid}
+ </div>
+ <div class="vistats">
+ <h2>{slot_title}</h2>
+ {show_all_slots}
+ {slot_grid}
+ </div>
</div>
<div style="clear:left;"><!-- --></div>
+[ShowAllSlots.html]
+ <ul class="actions" style="margin: 1em;">
+ <li><a class="nav" href="{href}">Show All Slots</a></li>
+ </ul>
-[JobUtilizationGrid.javascript]
-function got_job_grid(obj, id) {
- for (var cell in obj.jobs.job) {
- var job = obj.jobs.job[cell]
- var ojob_Button = document.getElementById("button_"+cell);
- if (ojob_Button) {
- ojob_Button.className = "btn "+job.color;
+
+[MachineVisualization.javascript]
+function got_machine_grid(obj, id) {
+ for (var cell in obj.machines.machine) {
+ var machine = obj.machines.machine[cell]
+ var o_Button = document.getElementById("button_"+cell);
+ if (o_Button) {
+ o_Button.className = "btn "+machine.color;
+ o_Button.onmouseover = over_cell;
+ o_Button.onmouseout = out_cell
}
- var ojob_Name = document.getElementById("cell_name_"+cell);
- if (ojob_Name) {
- ojob_Name.innerHTML = job.name;
+ var o_Machine = document.getElementById("cell_machine_"+cell);
+ if (o_Machine) {
+ o_Machine.innerHTML = machine.machine;
}
- var ojob_Machine = document.getElementById("cell_submitter_"+cell);
- if (ojob_Machine) {
- ojob_Machine.innerHTML = job.submitter;
+ var o_Busy = document.getElementById("cell_busy_"+cell);
+ if (o_Busy) {
+ o_Busy.innerHTML = machine.busy;
}
- var ojob_Job = document.getElementById("cell_status_"+cell);
- if (ojob_Job) {
- ojob_Job.innerHTML = job.status;
+ var o_Idle = document.getElementById("cell_idle_"+cell);
+ if (o_Idle) {
+ o_Idle.innerHTML = machine.idle;
}
}
- setTimeout("get_job_grid()", 1000);
+ setTimeout("get_machine_grid()", 1000);
}
Modified: mgmt/trunk/cumin/python/cumin/slot.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/slot.py 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/slot.py 2008-11-15 00:38:08 UTC (rev 2806)
@@ -22,3 +22,6 @@
def render_title(self, session, data):
return "Name"
+class MachineSet(CuminTable):
+ pass
+
Modified: mgmt/trunk/cumin/python/cumin/slot.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/slot.strings 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/slot.strings 2008-11-15 00:38:08 UTC (rev 2806)
@@ -5,7 +5,7 @@
s.machine,
s.system,
s.job_id,
- c.activity as activity
+ c.activity
from slot as s
left outer join slot_stats as c on c.id = s.stats_curr_id
left outer join slot_stats as p on p.id = s.stats_prev_id
@@ -17,3 +17,21 @@
select count(*)
from slot as s
{sql_where}
+
+[MachineSet.sql]
+select
+ machine as id,
+ machine,
+ sum(case c.activity when 'Busy' then 1 else 0 end) as busy,
+ sum(case c.activity when 'Idle' then 1 else 0 end) as idle,
+ sum(1) as total
+from slot as s
+left outer join slot_stats as c on c.id = s.stats_curr_id
+{sql_where}
+group by machine
+{sql_orderby}
+
+[MachineSet.count_sql]
+select count(*)
+from
+ (select 1 from slot group by machine) as l
Modified: mgmt/trunk/cumin/python/cumin/stat.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/stat.py 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/stat.py 2008-11-15 00:38:08 UTC (rev 2806)
@@ -144,17 +144,44 @@
def __init__(self, app, name):
super(StatUtilizationGrid, self).__init__(app, name)
- cells = self.GridCells(app, "grid")
- self.add_child(cells)
+ self.cells = self.GridCells(app, "grid")
+ self.add_child(self.cells)
legend = self.Legend(app, "grid_legend")
self.add_child(legend)
ajax = self.Updater(app, "grid_updater")
self.add_child(ajax)
+
+ self.ems = 480.0
+ self.max_item_em = 28.0
+ self.max_columns = 20.0
def render_title(self, session):
return "Utilization"
+
+ def render_name(self, session):
+ return self.name
+
+ def will_it_fit(self, session):
+ count = len(self.cells.get_items(session))
+ columns = self.cells.calculate_columns(count)
+ return columns <= self.max_columns
+
+ def get_grid_width(self, session):
+ count = len(self.cells.get_items(session))
+ columns = self.cells.calculate_columns(count)
+ # we have approx 15em to work with
+ # we'd like each column to be between 1em and 2em
+ width = float(columns) * self.max_item_em
+ if width > self.ems:
+ width = self.ems
+ extra = 3.0 * columns # left right border plus 1 right margin
+ return width + extra
+
+ def render_grid_width(self, session):
+ width = self.get_grid_width(session)
+ return "%fpx" % width
class GridCells(ItemSet):
def __init__(self, app, name):
@@ -167,7 +194,8 @@
self.add_child(self.sticky)
def do_get_items(self, session):
- cells = self.parent.get_cells(session)
+ cells = self.parent.get_cells(session)
+ self.items.set(session, cells)
self.width.set(session,
self.calculate_cell_width(len(cells)))
return cells
@@ -178,16 +206,20 @@
def render_cell_width(self, session, cell):
return self.width.get(session)
- def calculate_cell_width(self, count):
+ def calculate_columns(self, count):
sq = sqrt(count)
isq = int(sq)
if sq > isq:
isq = isq + 1
-
- if count > 0:
- return int(100 / isq)
- else:
- return 100
+ return isq
+
+ def calculate_cell_width(self, count):
+ columns = self.calculate_columns(count)
+ width = self.parent.max_item_em
+ if columns:
+ min = self.parent.ems / float(columns)
+ width = min > self.parent.max_item_em and self.parent.max_item_em or min
+ return "%fpx" % width
def render_href(self, session, cell):
return self.parent.get_href(session, cell)
Modified: mgmt/trunk/cumin/python/cumin/stat.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/stat.strings 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/stat.strings 2008-11-15 00:38:08 UTC (rev 2806)
@@ -166,31 +166,7 @@
</li>
[StatUtilizationGrid.javascript]
-/* called on page resize to make sure
- the grid popup notes don't go
- off the edge of the page */
-function ensure_notes_visible() {
- // see if the current rule will show the note off the page
-
- this.find_rule = function() {
- if (document.styleSheets) {
- for (var i=0; i<document.styleSheets.length; i++) {
- var sheet = document.styleSheets[i];
- if (sheet.href.indexOf("cumin.css") != -1) {
- if (sheet.cssRules) {
- var rules = sheet.cssRules;
- for (var j=0; j<rules.length; j++) {
- if (rules[j].cssText.indexOf("div.sticky_note") != -1) {
- return rules[j];
- }
- }
- }
- }
- }
- }
- return null;
- }
-
+function over_cell() {
this.get_left = function (o) {
var p = o.offsetParent;
var true_left = o.offsetLeft;
@@ -208,62 +184,60 @@
return true_left;
}
- var oGrid = document.getElementById("StatGrid");
- if (oGrid) {
- var left = this.get_left(oGrid);
- var right = left + oGrid.offsetWidth;
- var wwidth = 9999;
- if (window.innerWidth)
- wwidth = window.innerWidth;
- if (right + 310 > wwidth) {
- var rule = this.find_rule();
- if (rule) {
- rule.style.left = "";
- rule.style.right = "50%";
+ var id = this.id;
+ var onote = document.getElementById("note_"+id);
+ if (onote) {
+ if (document.all) {
+ this.title = onote.innerText;
+ } else {
+ onote.style.visibility = "hidden";
+ onote.style.display = "block";
+ if (onote.style.left.indexOf("px") == -1) {
+ var left = this.get_left(onote);
+ var right = left + onote.offsetWidth;
+ if (window.innerWidth) {
+ wwidth = window.innerWidth;
+ if (right > wwidth) {
+ oparent = onote.offsetParent;
+ pleft = this.get_left(oparent);
+ onote.style.left = "-" + (onote.offsetWidth - (wwidth - pleft) + 4) + "px";
+ } else {
+ onote.style.left = "50%";
+ }
+ }
}
+ onote.style.visibility = "visible";
}
}
-
-
}
-
-function over_cell(me) {
- var id = me.id;
+function out_cell() {
+ var id = this.id;
var onote = document.getElementById("note_"+id);
- onote.style.display = "block";
- if (document.all) {
- me.title = onote.innerText;
+ if (onote) {
+ onote.style.display = "none";
}
-
}
-function out_cell(me) {
- var id = me.id;
- document.getElementById("note_"+id).style.display = "none";
-}
-addEvent(window, "resize", ensure_notes_visible);
-addEvent(window, "load", ensure_notes_visible);
[StatUtilizationGrid.css]
-div.StatGrid{
- width: 10em;
- height: 10em;
-}
-div.grid_cell {
+div.StatGrid a {
float:left;
+ border:1px solid #EAEAEA;
+ margin: 0 1px 1px 0;
position: relative;
+ color: black;
}
-div.grid_cell button {
- width:95%;
- height:95%;
-}
+div.StatGrid a:hover { border: 1px solid #a00; }
.btn.yellow { background: #ffc; }
.btn.green { background: #cfc; }
.btn.blue { background: #ccf; }
.btn.red { background: #fcc; }
+.btn.orange { background: #ffbb5e; }
.btn.black { background: #444; }
.btn.clear { background: white; }
-.btn:hover { border: 1px solid #a00; }
-.btn:active { background-color: #444; }
+.btn.green1 { background: #9f9; }
+.btn.green2 { background: #6c6; }
+.btn.green3 { background: #393; }
+
.btn[class] { background-image: url(resource?name=shade1.png); background-position: bottom; }
div.visualization {
@@ -275,7 +249,7 @@
margin-left: 1em;
}
-div.grid_cell div.sticky_note {
+div.sticky_note {
display: none;
font-size: 0.8em;
position: absolute;
@@ -300,12 +274,13 @@
td.sticky_values {
line-height: 1em;
}
-div#cell_legend {
- margin: 1em;
+div.cell_legend {
+ margin: 1em 0;
font-size: 0.8em;
color: #000;
+ position: relative;
}
-div#cell_legend button {
+div.cell_legend button {
position: relative;
top: -2px;
}
@@ -314,33 +289,32 @@
<div class="StatUtilizationGrid" id="{id}">
<div class="visualization">
<h2>{title}</h2>
- <div id="StatGrid" class="StatGrid">
+ <div id="StatGrid" class="StatGrid" style="width:{grid_width};">
{grid}
+ <div style="clear:left;"><!-- --></div>
</div>
- <div style="clear:left;"><!-- --></div>
</div>
+ <div id="legend_{name}" class="cell_legend">
+ {grid_legend}
+ </div>
</div>
-<div id="cell_legend">
- {grid_legend}
-</div>
{grid_updater}
[GridCells.html]
{items}
[GridCells.item_html]
- <div id="{cell_id}" class="grid_cell"
- style="width:{cell_width}%; height:{cell_width}%;"
- onmouseover="over_cell(this)"
- onmouseout="out_cell(this)" >
- <a href="{href}"><button id="button_{cell_id}" class="btn {color}" >{contents}</button></a>
- <div id="note_{cell_id}" class="sticky_note">
+ <a href="{href}">
+ <div id="button_{cell_id}" class="btn {color}" style="width:{cell_width}; height:{cell_width};">
+ {contents}
+ </div>
+ <div id="note_button_{cell_id}" class="sticky_note">
<table class="sticky_table">
{sticky_rows}
</table>
</div>
- </div>
-
+ </a>
+
[Sticky.item_html]
<tr>
<td class="sticky_names" nowrap="nowrap">{sticky_title}: </td>
Modified: mgmt/trunk/cumin/python/cumin/system.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/system.py 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/system.py 2008-11-15 00:38:08 UTC (rev 2806)
@@ -91,7 +91,8 @@
return "call.xml?class=system;id=%i;method=slots" % system.id
def get_sticky_info(self, session):
- return [("name", "Name"), ("machine", "Machine"), ("job", "Job")]
+ action = self.app.model.system.slots
+ return action.get_field_tuples()
class SystemView(CuminView):
def __init__(self, app, name):
@@ -119,7 +120,7 @@
def render_sql_where(self, session, system):
elems = list()
- #elems.append("s.system = %(nodeName)s")
+ elems.append("s.system = %(nodeName)s")
elems.append(self.get_phase_sql(session))
return "where %s" % " and ".join(elems)
@@ -137,7 +138,7 @@
def render_sql_where(self, session, system):
elems = list()
- #elems.append("machine = %(nodeName)s")
+ elems.append("machine = %(nodeName)s")
elems.append("deletion_time is null")
return "where %s" % " and ".join(elems)
Modified: mgmt/trunk/cumin/python/cumin/system.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/system.strings 2008-11-14 20:12:00 UTC (rev 2805)
+++ mgmt/trunk/cumin/python/cumin/system.strings 2008-11-15 00:38:08 UTC (rev 2806)
@@ -31,6 +31,8 @@
var oslot_Button = document.getElementById("button_"+cell);
if (oslot_Button) {
oslot_Button.className = "btn "+slot.color;
+ oslot_Button.onmouseover = over_cell;
+ oslot_Button.onmouseout = out_cell
}
var oslot_Name = document.getElementById("cell_name_"+cell);
if (oslot_Name) {
16 years, 1 month
rhmessaging commits: r2805 - in mgmt/trunk: cumin/python/wooly and 3 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-11-14 15:12:00 -0500 (Fri, 14 Nov 2008)
New Revision: 2805
Modified:
mgmt/trunk/cumin/python/cumin/__init__.py
mgmt/trunk/cumin/python/cumin/broker.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/page.py
mgmt/trunk/cumin/python/cumin/page.strings
mgmt/trunk/cumin/python/cumin/tools.py
mgmt/trunk/cumin/python/wooly/forms.py
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/schema.py
mgmt/trunk/mint/python/mint/schemaparser.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/mint/sql/schema.sql
mgmt/trunk/parsley/python/parsley/command.py
Log:
Nuno's adaptation of mint to the qmfconsole api, which replaces the
older "management.py" api.
Modified: mgmt/trunk/cumin/python/cumin/__init__.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/__init__.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/__init__.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -18,6 +18,7 @@
from action import ActionPage
from user import LoginPage, UserSession
from datetime import timedelta
+from qpid.util import URL
from wooly import Session
@@ -121,8 +122,7 @@
def do_run(self):
while True:
for reg in BrokerRegistration.select():
- if reg.broker is None or reg.broker.managedBroker not in \
- self.model.data.connections:
+ if reg.broker is None or reg.getBrokerId() not in self.model.data.managedBrokers:
attempts = self.attempts.get(reg, 0)
attempts += 1
self.attempts[reg] = attempts
@@ -133,7 +133,6 @@
reg.connect(self.model.data)
elif attempts % 100 == 0:
reg.connect(self.model.data)
-
self.event.wait(10)
class CuminServer(WebServer):
Modified: mgmt/trunk/cumin/python/cumin/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/broker.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/broker.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -658,14 +658,9 @@
if addr:
name = names[i]
- host, port = parse_broker_addr(addr)
+ url = "amqp://%s:%i" % parse_broker_addr(addr)
- args = {
- "name": name,
- "host": host,
- "port": port
- }
-
+ args = {"name": name, "url": url}
reg = action.invoke(None, args)
if len(groups) > i:
@@ -701,13 +696,13 @@
errs = aerrs.setdefault(i, list())
errs.append("The address field is empty; it is required")
else:
- host, port = parse_broker_addr(addr)
+ #host, port = parse_broker_addr(addr)
count = BrokerRegistration.selectBy \
- (host=host, port=port).count()
+ (url=addr).count()
if count:
errs = aerrs.setdefault(i, list())
- errs.append("The broker at %s:%i " % (host, port) +
+ errs.append("The broker at %s " % (url) +
"is already registered")
return not len(nerrs) and not len(aerrs)
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -1668,14 +1668,10 @@
super(CuminBrokerRegistration, self).__init__ \
(model, "broker_registration", BrokerRegistration)
- prop = CuminProperty(self, "host")
- prop.title = "Host"
+ prop = CuminProperty(self, "url")
+ prop.title = "URL"
prop.summary = True
- prop = CuminProperty(self, "port")
- prop.title = "Port"
- prop.summary = True
-
action = self.Add(self, "add")
action.title = "Add"
action.navigable = False
Modified: mgmt/trunk/cumin/python/cumin/page.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/page.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/page.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -209,6 +209,9 @@
heading = self.Heading(app, "heading")
self.add_child(heading)
+ self.add_tab(self.OverviewTab(app, "over"))
+ self.add_tab(self.AccountTab(app, "acct"))
+
def render_change_password_href(self, session):
branch = session.branch()
self.frame.change_password.show(branch)
@@ -218,6 +221,22 @@
def render_title(self, session):
return "MRG Management"
+ class OverviewTab(Widget):
+ def render_title(self, session):
+ return "Overview"
+
+ def render_content(self, session):
+ pass
+
+ class AccountTab(ActionSet):
+ def render_title(self, session):
+ return "Your Account"
+
+ def render_change_password_href(self, session):
+ branch = session.branch()
+ self.frame.change_password.show(branch)
+ return branch.marshal()
+
class MessagingView(TabbedModeSet):
def __init__(self, app, name):
super(MessagingView, self).__init__(app, name)
Modified: mgmt/trunk/cumin/python/cumin/page.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/page.strings 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/page.strings 2008-11-14 20:12:00 UTC (rev 2805)
@@ -180,11 +180,15 @@
<div class="oblock">
{heading}
- <ul class="actions">
- <a class="nav" href="{change_password_href}">Change Password</a>
- </ul>
+ <ul class="TabbedModeSet tabs">{tabs}</ul>
+ <div class="TabbedModeSet mode">{mode}</div>
</div>
+[AccountTab.html]
+<ul class="actions">
+ <a class="nav" href="{change_password_href}">Change Password</a>
+</ul>
+
[MessagingView.html]
<script type="text/javascript">
<![CDATA[
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -31,9 +31,6 @@
self.config = CuminConfig()
- opt = CommandOption(self, "help", "h")
- opt.description = "Print this message"
-
opt = CommandOption(self, "data")
opt.argument = "URI"
opt.description = "Connect to database at URI"
@@ -84,6 +81,20 @@
opt = CommandOption(command, "force")
opt.description = "Don't complain and just do it"
+ command = self.AddBroker(self, "add-broker")
+ command.arguments = ("NAME", "URL")
+ command.description = "Add a new broker called NAME at URL"
+
+ command = self.RemoveBroker(self, "remove-broker")
+ command.arguments = ("URL",)
+ command.description = "Remove broker called NAME; requires --force"
+
+ opt = CommandOption(command, "force")
+ opt.description = "Don't complain and just do it"
+
+ command = self.ListBrokers(self, "list-brokers")
+ command.description = "List existing broker registrations"
+
command = self.AddUser(self, "add-user")
command.arguments = ("NAME",)
command.description = "Add a new user called NAME"
@@ -179,6 +190,46 @@
def run(self, opts, args):
self.parent.database.checkSchema()
+ class AddBroker(Command):
+ def run(self, opts, args):
+ try:
+ name, url = args[1:]
+ except IndexError:
+ raise CommandException(self, "NAME and URL are required")
+
+ for reg in BrokerRegistration.selectBy(name=name):
+ print "Error: a broker called '%s' already exists" % name
+ sys.exit(1)
+
+ for reg in BrokerRegistration.selectBy(url=url):
+ print "Error: a broker at %s already exists" % url
+ sys.exit(1)
+
+ reg = BrokerRegistration(name=name, url=url)
+ reg.syncUpdate()
+
+ class RemoveBroker(Command):
+ def run(self, opts, args):
+ try:
+ name = args[1]
+ except IndexError:
+ raise CommandException(self, "NAME is required")
+
+ for reg in BrokerRegistration.selectBy(name=name):
+ break
+
+ if reg:
+ reg.destroySelf()
+ reg.syncUpdate()
+ else:
+ raise CommandException \
+ (self, "Broker '%s' is unknown", reg.name)
+
+ class ListBrokers(Command):
+ def run(self, opts, args):
+ for reg in BrokerRegistration.select():
+ print reg.name, reg.url
+
class AddUser(Command):
def run(self, opts, args):
try:
Modified: mgmt/trunk/cumin/python/wooly/forms.py
===================================================================
--- mgmt/trunk/cumin/python/wooly/forms.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/cumin/python/wooly/forms.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -184,6 +184,9 @@
class PasswordInput(StringInput):
pass
+# XXX Why does this have a boolean param? Shouldn't the name suggest
+# that somehow? Would some folks want a hidden input with a different
+# param? I think this needs to take a param
class HiddenInput(ScalarInput):
def __init__(self, app, name):
super(HiddenInput, self).__init__(app, name, None)
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,14 +1,12 @@
-import sys, os, socket, qpid, logging
-from qpid.datatypes import uuid4
-from qpid.connection import Connection as QpidConnection
-from qpid.util import connect
-from qpid.management import managementChannel, managementClient
-from datetime import *
-from sqlobject import *
+import logging
+import qpid.qmfconsole
+import struct
from threading import Lock, RLock
-from traceback import print_exc, print_stack
-
-from mint import schema
+from sqlobject import *
+from traceback import print_exc
+from qpid.util import URL
+from qpid.datatypes import UUID
+from mint.schema import *
from mint import update
log = logging.getLogger("mint")
@@ -28,95 +26,118 @@
cascade="null", default=None,
name="registration"))
-class MintInfo(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+class MintDatabase(object):
+ def __init__(self, uri):
+ self.uri = uri
- version = StringCol(length=1000, default="0.1", notNone=True)
+ def getConnection(self):
+ return connectionForURI(self.uri).getConnection()
-class BrokerRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ def check(self):
+ self.checkConnection()
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- host = StringCol(length=1000, default=None, notNone=True)
- port = IntCol(default=None, notNone=True)
- broker = ForeignKey("Broker", cascade="null", default=None)
- groups = SQLRelatedJoin("BrokerGroup",
- intermediateTable="broker_group_mapping",
- createRelatedTable=False)
- cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
- profile = ForeignKey("BrokerProfile", cascade="null", default=None)
+ def init(self):
+ sqlhub.processConnection = connectionForURI(self.uri)
- host_port_unique = index.DatabaseIndex(host, port, unique=True)
+ def checkConnection(self):
+ conn = self.getConnection()
- def connect(self, model):
- log.info("Connecting to broker '%s' at %s:%i" % \
- (self.name, self.host, self.port or 5672))
+ try:
+ cursor = conn.cursor()
+ cursor.execute("select now()")
+ finally:
+ conn.close()
- conn = BrokerConnection(model, self.host, self.port or 5672)
+ def checkSchema(self):
+ pass
+ def dropSchema(self):
+ conn = self.getConnection()
try:
- conn.open()
- log.info("Connection succeeded")
- except:
- log.info("Connection failed: " + str(conn.exception))
+ cursor = conn.cursor()
+
+ cursor.execute("drop schema public cascade")
- def getDefaultVhost(self):
- if self.broker:
+ conn.commit()
+ finally:
+ conn.close()
+
+ def createSchema(self, file_paths):
+ conn = self.getConnection()
+
+ scripts = list()
+
+ for path in file_paths:
+ file = open(path, "r")
try:
- return Vhost.selectBy(broker=self.broker, name="/")[0]
- except IndexError:
+ scripts.append((path, file.read()))
+ finally:
+ file.close()
+
+ try:
+ cursor = conn.cursor()
+
+ try:
+ cursor.execute("create schema public");
+ except:
+ conn.rollback()
pass
-class BrokerGroup(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ for path, text in scripts:
+ stmts = text.split(";")
+ count = 0
- name = StringCol(length=1000, default=None, unique=True, notNone=True)
- brokers = SQLRelatedJoin("BrokerRegistration",
- intermediateTable="broker_group_mapping",
- createRelatedTable=False)
+ for stmt in stmts:
+ stmt = stmt.strip()
-class BrokerGroupMapping(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ if stmt:
+ try:
+ cursor.execute(stmt)
+ except Exception, e:
+ print "Failed executing statement:"
+ print stmt
- brokerRegistration = ForeignKey("BrokerRegistration", notNull=True,
- cascade=True)
- brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
- unique = index.DatabaseIndex(brokerRegistration, brokerGroup, unique=True)
+ raise e
-class BrokerCluster(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ count += 1
- name = StringCol(length=1000, default=None)
- brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="cluster_id")
+ print "Executed %i statements from file '%s'" % (count, path)
-class BrokerProfile(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ conn.commit()
- name = StringCol(length=1000, default=None)
- brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
- properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
+ info = MintInfo(version="0.1")
+ info.sync()
-class ConfigProperty(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ # Standard roles
- name = StringCol(length=1000, default=None)
- value = StringCol(length=1000, default=None)
- type = StringCol(length=1, default="s")
+ user = Role(name="user")
+ user.sync()
-class CollectorRegistration(SQLObject):
- class sqlmeta:
- lazyUpdate = True
+ admin = Role(name="admin")
+ admin.sync()
+ finally:
+ conn.close()
- name = StringCol(length=1000, default=None)
- collectorId = StringCol(length=1000, default=None)
+ def checkSchema(self):
+ conn = self.getConnection()
+ try:
+ cursor = conn.cursor()
+
+ try:
+ cursor.execute("select version from mint_info");
+ except Exception, e:
+ print "No schema present"
+ return
+
+ for rec in cursor:
+ print "OK (version %s)" % rec[0]
+ return;
+
+ print "No schema present"
+ finally:
+ conn.close()
+
class Subject(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -165,139 +186,101 @@
class ObjectNotFound(Exception):
pass
-# Not thread safe
-class BrokerConnection(object):
- def __init__(self, model, host, port):
- self.model = model
- self.host = host
- self.port = port
- self.id = "%s:%i" % (host, port)
- self.objectsById = dict()
+class MintInfo(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- # Set upon receiving a broker info control
- self.sessionId = None
- self.brokerId = None
+ version = StringCol(length=1000, default="0.1", notNone=True)
- # state in (None, "opening", "opened", "closing", "closed")
- self.state = None
- self.exception = None
+class CollectorRegistration(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- self.mconn = None
- self.mclient = None
- self.mchan = None
+ name = StringCol(length=1000, default=None)
+ collectorId = StringCol(length=1000, default=None)
- def getObject(self, cls, id):
- compositeId = "%s:%s" % (id.first, id.second)
+class BrokerRegistration(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- try:
- obj = self.objectsById[compositeId]
- except KeyError:
- try:
- obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second, managedBroker=self.id)[0]
- self.objectsById[compositeId] = obj
- except IndexError:
- raise ObjectNotFound()
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ url = StringCol(length=1000, default=None)
+ qmfBroker = None
+ broker = ForeignKey("Broker", cascade="null", default=None)
+ groups = SQLRelatedJoin("BrokerGroup",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
+ cluster = ForeignKey("BrokerCluster", cascade="null", default=None)
+ profile = ForeignKey("BrokerProfile", cascade="null", default=None)
- return obj
+ url_unique = index.DatabaseIndex(url, unique=True)
- def isOpen(self):
- return self.state == "opened"
-
- def open(self):
- assert self.mconn is None
- assert self.mclient is None
- assert self.mchan is None
-
- self.state = "opening"
-
+ def connect(self, model):
+ log.info("Connecting to broker '%s' at %s" % (self.name, self.url))
try:
- spec = qpid.spec.load(self.model.specPath)
- sock = connect(self.host, self.port)
+ self.qmfBroker = model.getSession().addBroker(self.url)
+ log.info("Connection succeeded")
except Exception, e:
- self.exception = e
- raise e
+ log.info("Connection failed: %s ", e.message)
+ print_exc()
- self.mconn = QpidConnection(sock, spec)
- self.mclient = managementClient(spec,
- self.model.controlCallback,
- self.model.propsCallback,
- self.model.statsCallback,
- self.model.methodCallback,
- self.model.closeCallback)
- self.mclient.schemaListener(self.model.schemaCallback)
+ def getBrokerId(self):
+ if self.qmfBroker is not None:
+ return str(self.qmfBroker.getBrokerId())
+ else:
+ return None
- try:
- self.mconn.start()
- self.mchan = self.mclient.addChannel \
- (self.mconn.session(str(uuid4())), self)
+ def getDefaultVhost(self):
+ if self.broker:
+ try:
+ return Vhost.selectBy(broker=self.broker, name="/")[0]
+ except IndexError:
+ return None
- self.state = "opened"
- except Exception, e:
- self.exception = e
- raise e
+class BrokerGroup(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- self.model.lock()
- try:
- self.model.connections[self.id] = self
- finally:
- self.model.unlock()
+ name = StringCol(length=1000, default=None, unique=True, notNone=True)
+ brokers = SQLRelatedJoin("BrokerRegistration",
+ intermediateTable="broker_group_mapping",
+ createRelatedTable=False)
- def getSessionId(self):
- if not self.isOpen():
- raise Exception("Connection not open")
+class BrokerGroupMapping(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- return self.mchan.sessionId
+ brokerRegistration = ForeignKey("BrokerRegistration", notNull=True,
+ cascade=True)
+ brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
+ unique = index.DatabaseIndex(brokerRegistration, brokerGroup, unique=True)
- def callMethod(self, objId, className, methodName, callback, args):
- if not self.isOpen():
- raise Exception("Connection not open")
+class BrokerCluster(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- self.model.lock()
- try:
- self.model.currentMethodId += 1
- seq = self.model.currentMethodId
- self.model.outstandingMethodCalls[seq] = callback
- finally:
- self.model.unlock()
+ name = StringCol(length=1000, default=None)
+ brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="cluster_id")
- self.mclient.callMethod(self.mchan, seq, objId,
- className, methodName, args)
+class BrokerProfile(SQLObject):
+ class sqlmeta:
+ lazyUpdate = True
- def close(self):
- self.state = "closing"
+ name = StringCol(length=1000, default=None)
+ brokers = SQLMultipleJoin("BrokerRegistration", joinColumn="profile_id")
+ properties = SQLMultipleJoin("ConfigProperty", joinColumn="profile_id")
- self.model.lock()
- try:
- del self.model.connections[self.id]
- finally:
- self.model.unlock()
-
- try:
- self.mclient.removeChannel(self.mchan)
-
- self.state = "closed"
- except Exception, e:
- self.exception = e
- raise e
-
- self.mconn.close()
- # XXX What else do I need to try to shutdown here?
-
- self.mconn = None
- self.mclient = None
- self.mchan = None
-
-class MintModel:
+
+class MintModel(qpid.qmfconsole.Console):
staticInstance = None
- def __init__(self, dataUri, specPath, debug=False):
+ def __init__(self, dataUri, specPath="", debug=False):
self.dataUri = dataUri
- self.specPath = specPath
self.debug = debug
- self.currentMethodId = 1
- self.outstandingMethodCalls = dict()
- self.connections = dict()
+ assert MintModel.staticInstance is None
+ MintModel.staticInstance = self
+
self.connCloseListener = None
self.__lock = RLock()
@@ -306,15 +289,14 @@
self.orphanObjectMap = dict()
self.updateThread = update.ModelUpdateThread(self)
+ self.mgmtSession = qpid.qmfconsole.Session(self)
+ self.outstandingMethodCalls = dict()
+ self.managedBrokers = dict()
- assert MintModel.staticInstance is None
- MintModel.staticInstance = self
-
if self.debug:
log.setLevel(logging.DEBUG)
def lock(self):
- #print_stack()
self.__lock.acquire()
def unlock(self):
@@ -326,12 +308,10 @@
except Exception, e:
if hasattr(e, "message") and e.message.find("does not exist"):
print "Database not found; run cumin-database-init"
-
raise e
def init(self):
- conn = connectionForURI(self.dataUri)
- sqlhub.processConnection = conn
+ sqlhub.processConnection = connectionForURI(self.dataUri)
def start(self):
self.updateThread.start()
@@ -342,156 +322,99 @@
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def schemaCallback(self, conn, classInfo, props, stats, methods, events):
- up = update.SchemaUpdate(conn, classInfo, props, stats, methods, events)
- self.updateThread.enqueue(up)
+ def getObject(self, cls, id, broker):
+ obj = None
+ if isinstance(id, qpid.qmfconsole.ObjectId):
+ compositeId = "%s:%s" % (id.first, id.second)
+ try:
+ obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second)[0]
+ except IndexError:
+ raise ObjectNotFound()
+ elif cls.__name__.endswith("Pool"):
+ try:
+ obj = cls.selectBy(sourceId=id)[0]
+ except IndexError:
+ raise ObjectNotFound()
+
+ return obj
- def propsCallback(self, conn, classInfo, props, timestamps):
- up = update.PropertyUpdate(conn, classInfo, props, timestamps)
- self.updateThread.enqueue(up)
+ def getSession(self):
+ return self.mgmtSession
- def statsCallback(self, conn, classInfo, stats, timestamps):
- up = update.StatisticUpdate(conn, classInfo, stats, timestamps)
- self.updateThread.enqueue(up)
-
- def methodCallback(self, conn, methodId, errorId, errorText, args):
- up = update.MethodUpdate(conn, methodId, errorId, errorText, args)
- self.updateThread.enqueue(up)
-
- def closeCallback(self, conn, data):
- up = update.CloseUpdate(conn, data)
- self.updateThread.enqueue(up)
-
- def controlCallback(self, conn, type, data):
- up = update.ControlUpdate(conn, type, data)
- self.updateThread.enqueue(up)
-
- def registerCallback(self, callback):
+ def callMethod(self, managedBroker, objId, classKey, methodName, callback, args):
self.lock()
try:
- self.currentMethodId += 1
- methodId = self.currentMethodId
- self.outstandingMethodCalls[methodId] = callback
- return methodId
+ broker = self.managedBrokers[managedBroker]
finally:
self.unlock()
- def getConnectionByRegistration(self, reg):
+ pname, cname, hash = classKey.split(", ")
+ seq = self.mgmtSession._sendMethodRequest(broker, (pname, cname, hash), objId, methodName, args)
+
+ if seq is not None:
+ self.lock()
+ try:
+ self.outstandingMethodCalls[seq] = callback
+ finally:
+ self.unlock()
+ return seq
+
+ def brokerConnected(self, broker):
+ """ Invoked when a connection is established to a broker """
self.lock()
try:
- return self.connections.get("%s:%i" % (reg.host, reg.port))
+ self.managedBrokers[str(broker.getBrokerId())] = broker
finally:
self.unlock()
-class MintDatabase(object):
- def __init__(self, uri):
- self.uri = uri
-
- def getConnection(self):
- return connectionForURI(self.uri).getConnection()
-
- def check(self):
- self.checkConnection()
-
- def init(self):
- conn = connectionForURI(self.uri)
- sqlhub.processConnection = conn
-
- def checkConnection(self):
- conn = self.getConnection()
-
+ def brokerDisconnected(self, broker):
+ """ Invoked when the connection to a broker is lost """
+ self.lock()
try:
- cursor = conn.cursor()
- cursor.execute("select now()")
+ del self.managedBrokers[str(broker.getBrokerId())]
finally:
- conn.close()
+ self.unlock()
+ if (self.connCloseListener != None):
+ self.connCloseListener(broker)
- def checkSchema(self):
+ def newPackage(self, name):
+ """ Invoked when a QMF package is discovered. """
pass
- def dropSchema(self):
- conn = self.getConnection()
- try:
- cursor = conn.cursor()
-
- cursor.execute("drop schema public cascade")
+ def newClass(self, kind, classKey):
+ """ Invoked when a new class is discovered. Session.getSchema can be
+ used to obtain details about the class."""
+ pass
- conn.commit()
- finally:
- conn.close()
+ def newAgent(self, agent):
+ """ Invoked when a QMF agent is discovered. """
+ pass
- def createSchema(self, file_paths):
- conn = self.getConnection()
+ def delAgent(self, agent):
+ """ Invoked when a QMF agent disconnects. """
+ pass
- scripts = list()
+ def objectProps(self, broker, record):
+ """ Invoked when an object is updated. """
+ self.updateThread.enqueue(update.PropertyUpdate(broker, record))
- for path in file_paths:
- file = open(path, "r")
- try:
- scripts.append((path, file.read()))
- finally:
- file.close()
+ def objectStats(self, broker, record):
+ """ Invoked when an object is updated. """
+ self.updateThread.enqueue(update.StatisticUpdate(broker, record))
- try:
- cursor = conn.cursor()
+ def event(self, broker, event):
+ """ Invoked when an event is raised. """
+ pass
- try:
- cursor.execute("create schema public");
- except:
- conn.rollback()
- pass
+ def heartbeat(self, agent, timestamp):
+ pass
- for path, text in scripts:
- stmts = text.split(";")
- count = 0
-
- for stmt in stmts:
- stmt = stmt.strip()
-
- if stmt:
- try:
- cursor.execute(stmt)
- except Exception, e:
- print "Failed executing statement:"
- print stmt
-
- raise e
-
- count += 1
-
- print "Executed %i statements from file '%s'" % (count, path)
-
- conn.commit()
-
- info = MintInfo(version="0.1")
- info.sync()
-
- # Standard roles
-
- user = Role(name="user")
- user.sync()
-
- admin = Role(name="admin")
- admin.sync()
- finally:
- conn.close()
-
- def checkSchema(self):
- conn = self.getConnection()
-
+ def brokerInfo(self, broker):
+ self.lock()
try:
- cursor = conn.cursor()
-
- try:
- cursor.execute("select version from mint_info");
- except Exception, e:
- print "No schema present"
- return
-
- for rec in cursor:
- print "OK (version %s)" % rec[0]
- return;
-
- print "No schema present"
+ self.managedBrokers[str(broker.getBrokerId())] = broker
finally:
- conn.close()
+ self.unlock()
+
+ def methodResponse(self, broker, seq, response):
+ self.updateThread.enqueue(update.MethodUpdate(broker, seq, response))
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/schema.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,7 +1,7 @@
import mint
from sqlobject import *
from datetime import datetime
-from qpid.management import objectId
+from qpid.qmfconsole import ObjectId
class Pool(SQLObject):
class sqlmeta:
@@ -16,6 +16,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -135,6 +136,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -172,61 +174,49 @@
def GetAd(self, model, callback, JobAd):
- actualArgs = dict()
- actualArgs["JobAd"] = JobAd
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "GetAd",
+ actualArgs = list()
+ actualArgs.append(JobAd)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "GetAd",
callback, args=actualArgs)
def SetAttribute(self, model, callback, Name, Value):
- actualArgs = dict()
- actualArgs["Name"] = Name
- actualArgs["Value"] = Value
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "SetAttribute",
+ actualArgs = list()
+ actualArgs.append(Name)
+ actualArgs.append(Value)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "SetAttribute",
callback, args=actualArgs)
def Hold(self, model, callback, Reason):
- actualArgs = dict()
- actualArgs["Reason"] = Reason
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Hold",
+ actualArgs = list()
+ actualArgs.append(Reason)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Hold",
callback, args=actualArgs)
def Release(self, model, callback, Reason):
- actualArgs = dict()
- actualArgs["Reason"] = Reason
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Release",
+ actualArgs = list()
+ actualArgs.append(Reason)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Release",
callback, args=actualArgs)
def Remove(self, model, callback, Reason):
- actualArgs = dict()
- actualArgs["Reason"] = Reason
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Remove",
+ actualArgs = list()
+ actualArgs.append(Reason)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Remove",
callback, args=actualArgs)
def Fetch(self, model, callback, File, Start, End, Data):
- actualArgs = dict()
- actualArgs["File"] = File
- actualArgs["Start"] = Start
- actualArgs["End"] = End
- actualArgs["Data"] = Data
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Fetch",
+ actualArgs = list()
+ actualArgs.append(File)
+ actualArgs.append(Start)
+ actualArgs.append(End)
+ actualArgs.append(Data)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Fetch",
callback, args=actualArgs)
class JobStats(SQLObject):
@@ -247,6 +237,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -298,6 +289,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -332,6 +324,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -350,30 +343,24 @@
def GetLimits(self, model, callback, Limits):
- actualArgs = dict()
- actualArgs["Limits"] = Limits
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "GetLimits",
+ actualArgs = list()
+ actualArgs.append(Limits)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "GetLimits",
callback, args=actualArgs)
def SetLimit(self, model, callback, Name, Max):
- actualArgs = dict()
- actualArgs["Name"] = Name
- actualArgs["Max"] = Max
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "SetLimit",
+ actualArgs = list()
+ actualArgs.append(Name)
+ actualArgs.append(Max)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "SetLimit",
callback, args=actualArgs)
def Reconfig(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Reconfig",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Reconfig",
callback, args=actualArgs)
class NegotiatorStats(SQLObject):
@@ -401,6 +388,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -433,6 +421,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -453,21 +442,17 @@
def Start(self, model, callback, Subsystem):
- actualArgs = dict()
- actualArgs["Subsystem"] = Subsystem
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Start",
+ actualArgs = list()
+ actualArgs.append(Subsystem)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Start",
callback, args=actualArgs)
def Stop(self, model, callback, Subsystem):
- actualArgs = dict()
- actualArgs["Subsystem"] = Subsystem
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "Stop",
+ actualArgs = list()
+ actualArgs.append(Subsystem)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Stop",
callback, args=actualArgs)
class MasterStats(SQLObject):
@@ -495,6 +480,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -510,11 +496,9 @@
def reloadACLFile(self, model, callback):
"""Reload the ACL file"""
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "reloadACLFile",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "reloadACLFile",
callback, args=actualArgs)
class AclStats(SQLObject):
@@ -536,6 +520,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -552,19 +537,15 @@
def stopClusterNode(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "stopClusterNode",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "stopClusterNode",
callback, args=actualArgs)
def stopFullCluster(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "stopFullCluster",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "stopFullCluster",
callback, args=actualArgs)
class ClusterStats(SQLObject):
@@ -585,6 +566,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -631,6 +613,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -652,12 +635,10 @@
def expand(self, model, callback, by):
"""Increase number of files allocated for this journal"""
- actualArgs = dict()
- actualArgs["by"] = by
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "expand",
+ actualArgs = list()
+ actualArgs.append(by)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "expand",
callback, args=actualArgs)
class JournalStats(SQLObject):
@@ -706,6 +687,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -738,6 +720,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -757,41 +740,35 @@
def echo(self, model, callback, sequence, body):
"""Request a response to test the path to the management broker"""
- actualArgs = dict()
- actualArgs["sequence"] = sequence
- actualArgs["body"] = body
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "echo",
+ actualArgs = list()
+ actualArgs.append(sequence)
+ actualArgs.append(body)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "echo",
callback, args=actualArgs)
def connect(self, model, callback, host, port, durable, authMechanism, username, password, transport):
"""Establish a connection to another broker"""
- actualArgs = dict()
- actualArgs["host"] = host
- actualArgs["port"] = port
- actualArgs["durable"] = durable
- actualArgs["authMechanism"] = authMechanism
- actualArgs["username"] = username
- actualArgs["password"] = password
- actualArgs["transport"] = transport
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "connect",
+ actualArgs = list()
+ actualArgs.append(host)
+ actualArgs.append(port)
+ actualArgs.append(durable)
+ actualArgs.append(authMechanism)
+ actualArgs.append(username)
+ actualArgs.append(password)
+ actualArgs.append(transport)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "connect",
callback, args=actualArgs)
def queueMoveMessages(self, model, callback, srcQueue, destQueue, qty):
"""Move messages from one queue to another"""
- actualArgs = dict()
- actualArgs["srcQueue"] = srcQueue
- actualArgs["destQueue"] = destQueue
- actualArgs["qty"] = qty
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "queueMoveMessages",
+ actualArgs = list()
+ actualArgs.append(srcQueue)
+ actualArgs.append(destQueue)
+ actualArgs.append(qty)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "queueMoveMessages",
callback, args=actualArgs)
class BrokerStats(SQLObject):
@@ -812,6 +789,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -844,6 +822,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -873,6 +852,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -889,12 +869,10 @@
def purge(self, model, callback, request):
"""Discard all or some messages on a queue"""
- actualArgs = dict()
- actualArgs["request"] = request
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "purge",
+ actualArgs = list()
+ actualArgs.append(request)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "purge",
callback, args=actualArgs)
class QueueStats(SQLObject):
@@ -942,6 +920,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -985,6 +964,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1017,6 +997,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1032,11 +1013,9 @@
def close(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "close",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "close",
callback, args=actualArgs)
class ClientConnectionStats(SQLObject):
@@ -1062,6 +1041,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1076,29 +1056,25 @@
def close(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "close",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "close",
callback, args=actualArgs)
def bridge(self, model, callback, durable, src, dest, key, tag, excludes, srcIsQueue, srcIsLocal, dynamic):
"""Bridge messages over the link"""
- actualArgs = dict()
- actualArgs["durable"] = durable
- actualArgs["src"] = src
- actualArgs["dest"] = dest
- actualArgs["key"] = key
- actualArgs["tag"] = tag
- actualArgs["excludes"] = excludes
- actualArgs["srcIsQueue"] = srcIsQueue
- actualArgs["srcIsLocal"] = srcIsLocal
- actualArgs["dynamic"] = dynamic
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "bridge",
+ actualArgs = list()
+ actualArgs.append(durable)
+ actualArgs.append(src)
+ actualArgs.append(dest)
+ actualArgs.append(key)
+ actualArgs.append(tag)
+ actualArgs.append(excludes)
+ actualArgs.append(srcIsQueue)
+ actualArgs.append(srcIsLocal)
+ actualArgs.append(dynamic)
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "bridge",
callback, args=actualArgs)
class LinkStats(SQLObject):
@@ -1121,6 +1097,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1141,11 +1118,9 @@
def close(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "close",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "close",
callback, args=actualArgs)
class BridgeStats(SQLObject):
@@ -1166,6 +1141,7 @@
recTime = TimestampCol(default=None)
sourceScopeId = BigIntCol(default=None)
sourceObjectId = BigIntCol(default=None)
+ qmfClassKey = BLOBCol(default=None)
creationTime = TimestampCol(default=None)
deletionTime = TimestampCol(default=None)
managedBroker = StringCol(length=1000, default=None)
@@ -1182,35 +1158,27 @@
def solicitAck(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "solicitAck",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "solicitAck",
callback, args=actualArgs)
def detach(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "detach",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "detach",
callback, args=actualArgs)
def resetLifespan(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "resetLifespan",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "resetLifespan",
callback, args=actualArgs)
def close(self, model, callback):
- actualArgs = dict()
- conn = model.connections[self.managedBroker]
- classInfo = self.classInfos[self.managedBroker]
- originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)
- conn.callMethod(originalId, classInfo, "close",
+ actualArgs = list()
+ originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
+ model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "close",
callback, args=actualArgs)
class SessionStats(SQLObject):
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -139,6 +139,7 @@
else:
self.generateAttrib("sourceScopeId", "BigIntCol")
self.generateAttrib("sourceObjectId", "BigIntCol")
+ self.generateAttrib("qmfClassKey", "BLOBCol")
self.generateTimestampAttrib("creation")
self.generateTimestampAttrib("deletion")
self.generateAttrib("managedBroker", "StringCol", "length=1000")
@@ -154,10 +155,11 @@
else:
comment = ""
formalArgs = ", "
- actualArgs = " actualArgs = dict()\n"
+ actualArgs = " actualArgs = list()\n"
for arg in elem.query["arg"]:
formalArgs += "%s, " % (arg["@name"])
- actualArgs += " actualArgs[\"%s\"] = %s\n" % (arg["@name"], arg["@name"])
+ actualArgs += " actualArgs.append(%s)\n" % (arg["@name"])
+
if (formalArgs != ", "):
formalArgs = formalArgs[:-2]
else:
@@ -165,10 +167,8 @@
self.pythonOutput += "\n def %s(self, model, callback%s):\n" % (elem["@name"], formalArgs)
self.pythonOutput += comment
self.pythonOutput += actualArgs
- self.pythonOutput += " conn = model.connections[self.managedBroker]\n"
- self.pythonOutput += " classInfo = self.classInfos[self.managedBroker]\n"
- self.pythonOutput += " originalId = objectId(None, self.sourceScopeId, self.sourceObjectId)\n"
- self.pythonOutput += " conn.callMethod(originalId, classInfo, \"%s\",\n" % elem["@name"]
+ self.pythonOutput += " originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)\n"
+ self.pythonOutput += " model.callMethod(self.managedBroker, originalId, self.qmfClassKey, \"%s\",\n" % elem["@name"]
self.pythonOutput += " callback, args=actualArgs)\n"
def endClass(self):
@@ -183,7 +183,7 @@
self.pythonOutput += "import mint\n"
self.pythonOutput += "from sqlobject import *\n"
self.pythonOutput += "from datetime import datetime\n"
- self.pythonOutput += "from qpid.management import objectId\n\n"
+ self.pythonOutput += "from qpid.qmfconsole import ObjectId\n\n"
self.pythonOutput += "class Pool(SQLObject):\n"
self.pythonOutput += " class sqlmeta:\n"
self.pythonOutput += " lazyUpdate = True\n"
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/python/mint/update.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -1,16 +1,13 @@
import logging
-
+import datetime
+import types
+import struct
from Queue import Queue as ConcurrentQueue, Full, Empty
from threading import Thread
-from datetime import datetime
-from qpid.management import managementClient
-from struct import unpack
-from schema import schemaReservedWordsMap
-from sqlobject import DateTimeCol, TimestampCol, Col
+from traceback import print_exc
+from qpid.datatypes import UUID
+from mint.schema import *
-import types
-import mint
-
log = logging.getLogger("mint.update")
def time_unwarp(t):
@@ -33,14 +30,10 @@
self.updates.put(update)
except Full:
log.exception("Queue is full")
+ pass
def run(self):
while True:
- #size = self.updates.qsize()
- #
- #if size > 1:
- # log.debug("Queue depth is %i", self.updates.qsize())
-
try:
update = self.updates.get(True, 1)
except Empty:
@@ -54,303 +47,202 @@
update.process(self.model)
except:
log.exception("Update failed")
+ pass
def stop(self):
self.stopRequested = True
-class UnknownClassException(Exception):
- pass
+class ModelUpdate(object):
+ def __init__(self, broker, obj):
+ self.broker = broker
+ self.qmfObj = obj
-def unmarshalClassInfo(classInfo):
- package = classInfo[0]
- name = classInfo[1].capitalize()
+ def getStatsClass(self, cls):
+ return getattr(mint, cls.__name__ + "Stats")
- if name == "Connection":
- name = "ClientConnection"
+ def process(self):
+ pass
- cls = getattr(mint, name)
+ def processAttributes(self, attrs, cls, model):
+ # translate keys into their string representation
+ for key in attrs.keys():
+ attrs[key.__repr__()] = attrs.pop(key)
- if cls is None:
- raise UnknownClassException("Class '%s' is unknown" % name)
+ orphan = False
+ for name in attrs.keys():
+ rename = ""
+ orig_name = name
+ if name in mint.schema.schemaReservedWordsMap:
+ rename = mint.schema.schemaReservedWordsMap.get(name)
+ attrs[rename] = attrs.pop(name)
+ name = rename
- return package, cls
+ if len(name) > 3 and name.endswith("Ref"):
+ # Navigate to referenced objects
+ clsname = name[0].upper() + name[1:-3]
+ id = attrs.pop(name)
-def processAttributes(conn, attrs, cls, model, updateObj):
- attrs.pop("id")
+ othercls = getattr(mint, clsname, None)
- if "connectionRef" in attrs:
- attrs["clientConnectionRef"] = attrs.pop("connectionRef")
+ if othercls:
+ attrname = name[0:-3]
- orphan = False
- for name in attrs.keys():
- rename = schemaReservedWordsMap.get(name)
+ try:
+ attrs[attrname] = model.getObject(othercls, id, self.broker)
+ except KeyError:
+ log.info("Referenced object %s '%s' not found by key '%s'" % (clsname, id, attrname))
+ except mint.ObjectNotFound:
+ if not orphan:
+ log.info("Referenced object %s '%s' not found, deferring creation of orphan object" % (clsname, id))
+ # store object in orphan map, will be picked up later when parent info is received
+ if (clsname, id.first, id.second) not in model.orphanObjectMap:
+ model.orphanObjectMap[(clsname, id.first, id.second)] = set()
+ model.orphanObjectMap[(clsname, id.first, id.second)].add(self)
+ orphan = True
+ else:
+ log.error("Class '%s' not found" % clsname)
+ elif not hasattr(cls, orig_name):
+ # Remove attrs that we don't have in our schema
+ log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
+ del attrs[name]
+ #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
+ elif name in ("DaemonStartTime", "EnteredCurrentActivity", "EnteredCurrentState", "JobStart",
+ "LastBenchmark", "LastFetchWorkCompleted", "LastFetchWorkSpawned", "LastPeriodicCheckpoint",
+ "MyCurrentTime", "QDate", "JobQueueBirthdate", "MonitorSelfTime") \
+ and (type(attrs[name]) is types.LongType or type(attrs[name]) is types.IntType or attrs[name] == 0):
+ attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
+ elif name.endswith("Time") and type(attrs[name]) is types.IntType and attrs[name] == 0:
+ attrs[name] = datetime.fromtimestamp(attrs[name])
+ #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
+ elif isinstance(attrs[name], UUID):
+ # convert UUIDs into their string representation, to be handled by sqlobject
+ attrs[name] = str(attrs[name])
+ if orphan:
+ return None
+ else:
+ return attrs
- if rename:
- attrs[rename] = attrs.pop(name)
- name = rename
- if len(name) > 3 and name.endswith("Ref"):
- # Navigate to referenced objects
+class PropertyUpdate(ModelUpdate):
+ def __init__(self, broker, obj):
+ ModelUpdate.__init__(self, broker, obj)
- clsname = name[0].upper() + name[1:-3]
- id = attrs.pop(name)
-
- othercls = getattr(mint, clsname, None)
-
- if othercls:
- attrname = name[0:-3]
-
- try:
- attrs[attrname] = conn.getObject(othercls, id)
- except KeyError:
- log.info("Referenced object %s '%s' not found by key '%s'" % (clsname, id, attrname))
- except mint.ObjectNotFound:
- if not orphan:
- log.info("Referenced object %s '%s' not found, deferring creation of orphan object" % (clsname, id))
- # store object in orphan map, will be picked up later when parent info is received
- if (clsname, id.first, id.second) not in model.orphanObjectMap:
- model.orphanObjectMap[(clsname, id.first, id.second)] = set()
- model.orphanObjectMap[(clsname, id.first, id.second)].add(updateObj)
- orphan = True
- else:
- log.error("Class '%s' not found" % clsname)
- elif not hasattr(cls, name):
- # Remove attrs that we don't have in our schema
- log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
- del attrs[name]
- #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
- elif name in ("DaemonStartTime", "EnteredCurrentActivity", "EnteredCurrentState", "JobStart",
- "LastBenchmark", "LastFetchWorkCompleted", "LastFetchWorkSpawned", "LastPeriodicCheckpoint",
- "MyCurrentTime", "QDate", "JobQueueBirthdate", "MonitorSelfTime") \
- and (type(attrs[name]) is types.LongType or type(attrs[name]) is types.IntType or attrs[name] == 0):
- attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
- elif name.endswith("Time") and type(attrs[name]) is types.IntType and attrs[name] == 0:
- attrs[name] = datetime.fromtimestamp(attrs[name])
- #XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
- if orphan:
- return None
- else:
- return attrs
-
-def getStatsClass(cls):
- return getattr(mint, cls.__name__ + "Stats")
-
-class SchemaUpdate(object):
- def __init__(self, conn, classInfo, props, stats, methods, events):
- self.conn = conn
- self.classInfo = classInfo
- self.props = props
- self.stats = stats
- self.methods = methods
- self.events = events
-
def process(self, model):
- cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
- log.info("Processing %-8s %-16s %-16s" % ("schema", self.conn.id, cls))
-
try:
- pkg, cls = unmarshalClassInfo(self.classInfo)
- cls.classInfos[self.conn.id] = self.classInfo
- except UnknownClassException, e:
- log.warn(e)
- return
+ cls = self.qmfObj.getSchema().getKey()[1]
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = eval(cls[0].upper()+cls[1:])
+ attrs = dict(self.qmfObj.getProperties())
+ timestamps = self.qmfObj.getTimestamps()
+ id = self.qmfObj.getObjectId()
- # XXX do more schema checking
+ if self.processAttributes(attrs, cls, model) == None:
+ # object is orphan, a parent dependency was not found;
+ # insertion in db is deferred until parent info is received
+ return
-class PropertyUpdate(object):
- def __init__(self, conn, classInfo, props, timestamps):
- self.conn = conn
- self.classInfo = classInfo
- self.props = props
- self.timestamps = timestamps
+ obj = None
+ try:
+ obj = model.getObject(cls, id, self.broker)
+ except mint.ObjectNotFound:
+ obj = cls()
+ log.debug("%s(%i) created", cls.__name__, obj.id)
+ attrs["sourceScopeId"] = id.first
+ attrs["sourceObjectId"] = id.second
+ pkg, cls, hash = self.qmfObj.getClassKey()
+ attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, cls, hash)
+ attrs["managedBroker"] = str(self.broker.getBrokerId())
+ except Exception, e:
+ print e
- def process(self, model):
- cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
- args = ("props", self.conn.id, cls, len(self.props))
- log.info("Processing %-8s %-16s %-16s %3i" % args)
+ attrs["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
+ attrs["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
+ if timestamps[2] != 0:
+ attrs["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
+ log.debug("%s(%i) marked deleted", cls, obj.id)
- try:
- pkg, cls = unmarshalClassInfo(self.classInfo)
- except UnknownClassException, e:
- log.warn(e)
- return
+ obj.set(**attrs)
+ obj.syncUpdate()
- attrs = dict(self.props)
+ if (cls, id.first, id.second) in model.orphanObjectMap:
+ # this object is the parent of orphan objects in the map, re-enqueue for insertion
+ orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
+ for orphanObj in orphanObjects:
+ model.updateThread.enqueue(orphanObj)
+ log.info("Inserted %d orphan objects whose creation had been deferred" % (len(orphanObjects)))
- id = attrs["id"]
- if processAttributes(self.conn, attrs, cls, model, self) == None:
- # object is orphan, a parent dependency was not found;
- # insertion in db is deferred until parent info is received
- return
+ if cls == "broker":
+ if str(self.broker.getBrokerId()) in model.managedBrokers:
+ model.managedBrokers[str(self.broker.getBrokerId())].broker = self.broker
+ reg = mint.BrokerRegistration.selectBy(url=self.broker.getFullUrl())[0]
+ reg.broker = obj
+ reg.syncUpdate()
- # XXX move these down to the try/except
+ except:
+ print_exc()
- attrs["managedBroker"] = self.conn.id
+ attrs["managedBroker"] = str(self.broker.getBrokerId())
attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
- (self.timestamps[0]/1000000000))
+ (timestamps[0]/1000000000))
attrs["creationTime"] = datetime.fromtimestamp \
- (self.timestamps[1]/1000000000)
+ (timestamps[1]/1000000000)
- if self.timestamps[2] != 0:
- attrs["deletionTime"] = datetime.fromtimestamp \
- (self.timestamps[2]/1000000000)
- try:
- obj = self.conn.getObject(cls, id)
- except mint.ObjectNotFound:
- obj = cls()
+class StatisticUpdate(ModelUpdate):
+ def __init__(self, broker, obj):
+ ModelUpdate.__init__(self, broker, obj)
- log.debug("%s(%i) created", cls.__name__, obj.id)
-
- attrs["sourceScopeId"] = id.first
- attrs["sourceObjectId"] = id.second
-
- #hash = "%08x%08x%08x%08x" % unpack("!LLLL", self.classInfo[2])
- #classInfo = (self.classInfo[0], self.classInfo[1], hash)
- #obj.sourceClassInfo = ",".join(classInfo)
-
- obj.set(**attrs)
- obj.syncUpdate()
-
- if obj.deletionTime:
- log.debug("%s(%i) marked deleted", cls.__name__, obj.id)
-
- if (cls.__name__, id.first, id.second) in model.orphanObjectMap:
- # this object is the parent of orphan objects in the map, re-enqueue for insertion
- orphanObjects = model.orphanObjectMap.pop((cls.__name__, id.first, id.second))
- for orphanObj in orphanObjects:
- model.updateThread.enqueue(orphanObj)
- log.info("Inserted %d orphan objects whose creation had been deferred" % (len(orphanObjects)))
-
- # XXX refactor this to take advantage of the get/create logic
- # above
- if isinstance(obj, mint.Broker) and obj.managedBroker:
- host, port = obj.managedBroker.split(":")
- port = int(port)
-
- if not obj.registration:
- try:
- reg = mint.BrokerRegistration.selectBy(host=host, port=port)[0]
- except IndexError:
- reg = None
-
- if reg:
- reg.broker = obj
- obj.registration = reg
-
- reg.syncUpdate()
- obj.syncUpdate()
-
-class StatisticUpdate(object):
- def __init__(self, conn, classInfo, stats, timestamps):
- self.conn = conn
- self.classInfo = classInfo
- self.stats = stats
- self.timestamps = timestamps
-
def process(self, model):
- cls = "%s.%s" % (self.classInfo[0], self.classInfo[1])
- args = ("stats", self.conn.id, cls, len(self.stats))
- log.info("Processing %-8s %-16s %-16s %3i" % args)
-
try:
- pkg, cls = unmarshalClassInfo(self.classInfo)
- except UnknownClassException, e:
- log.warn(e)
- return
+ cls = self.qmfObj.getSchema().getKey()[1]
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = eval(cls[0].upper()+cls[1:])
+ attrs = dict(self.qmfObj.getStatistics())
+ timestamps = self.qmfObj.getTimestamps()
+ id = self.qmfObj.getObjectId()
- attrs = dict(self.stats)
+ obj = None
+ try:
+ obj = model.getObject(cls, id, self.broker)
+ except mint.ObjectNotFound:
+ # handle this
+ raise mint.ObjectNotFound
- id = attrs["id"]
- obj = self.conn.getObject(cls, id)
- statscls = getStatsClass(cls)
- if processAttributes(self.conn, attrs, statscls, model, self) == None:
- # object is orphan, a parent dependency was not found;
- # insertion in db is deferred until parent info is received
- return
+ statscls = self.getStatsClass(cls)
+ if self.processAttributes(attrs, statscls, model) == None:
+ # object is orphan, a parent dependency was not found;
+ # insertion in db is deferred until parent info is received
+ return
- attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
- (self.timestamps[0]/1000000000))
- # Set the stat->obj reference
- attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+ attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
+ (timestamps[0]/1000000000))
- statsobj = statscls()
- statsobj.set(**attrs)
- statsobj.syncUpdate()
+ # Set the stat->obj reference
+ attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
+ statsobj = statscls()
+ statsobj.set(**attrs)
+ statsobj.syncUpdate()
- # XXX not sure if this should happen here. makes more sense in
- # prop update
- if self.timestamps[2] != 0:
- obj.deletionTime = datetime.fromtimestamp(self.timestamps[2]/1000000000)
+ obj.statsPrev = obj.statsCurr
+ obj.statsCurr = statsobj
+ obj.syncUpdate()
- obj.statsPrev = obj.statsCurr
- obj.statsCurr = statsobj
- obj.syncUpdate()
+ except:
+ print_exc()
-class MethodUpdate(object):
- def __init__(self, conn, methodId, errorCode, errorText, args):
- self.conn = conn
- self.methodId = methodId
- self.errorCode = errorCode
- self.errorText = errorText
- self.args = args
- def process(self, model):
- logArgs = ("method", self.conn.id, self.methodId, self.errorCode,
- self.errorText)
- log.info("Processing %-8s %-16s %-12s %-12s %s" % logArgs)
+class MethodUpdate(ModelUpdate):
+ def __init__(self, broker, seq, response):
+ ModelUpdate.__init__(self, broker, response)
+ self.seq = seq
- model.lock()
- try:
- method = model.outstandingMethodCalls.pop(self.methodId)
- method(self.errorText, self.args)
- finally:
- model.unlock()
-
-class CloseUpdate(object):
- def __init__(self, conn, data):
- self.conn = conn
- self.data = data
-
def process(self, model):
- log.info("Processing %-8s %-16s" % ("close", self.conn.id))
-
model.lock()
try:
- del model.connections[self.conn.id]
-
- if model.connCloseListener:
- model.connCloseListener(self.conn, self.data)
+ methodCallback = model.outstandingMethodCalls.pop(self.seq)
+ methodCallback(self.qmfObj.text, self.qmfObj.outArgs)
finally:
model.unlock()
-
-class ControlUpdate(object):
- __types = {
- managementClient.CTRL_BROKER_INFO: "broker_info",
- managementClient.CTRL_SCHEMA_LOADED: "schema_loaded",
- managementClient.CTRL_USER: "user",
- managementClient.CTRL_HEARTBEAT: "heartbeat"
- }
-
- def __init__(self, conn, typeCode, data):
- self.conn = conn
- self.typeCode = typeCode
- self.data = data
-
- def process(self, model):
- type = self.__types.get(self.typeCode, "[unknown]")
- args = ("control", self.conn.id, type, self.data)
- log.info("Processing %-8s %-16s %-16s %s" % args)
-
- if self.typeCode == managementClient.CTRL_BROKER_INFO:
- uuid = "%08x-%04x-%04x-%04x-%04x%08x" % unpack \
- ("!LHHHHL", self.data.brokerId)
-
- log.info("Broker ID from %s is '%s'" % (self.conn.id, uuid))
- log.info("Session ID from %s is '%s'" % \
- (self.conn.id, self.data.sessionId))
-
- self.conn.brokerId = uuid
- self.conn.sessionId = self.data.sessionId
-
Modified: mgmt/trunk/mint/sql/schema.sql
===================================================================
--- mgmt/trunk/mint/sql/schema.sql 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/mint/sql/schema.sql 2008-11-14 20:12:00 UTC (rev 2805)
@@ -23,13 +23,12 @@
CREATE TABLE broker_registration (
id SERIAL PRIMARY KEY,
name VARCHAR(1000) NOT NULL UNIQUE,
- host VARCHAR(1000) NOT NULL,
- port INT NOT NULL,
+ url VARCHAR(1000),
broker_id INT,
cluster_id INT,
profile_id INT
);
-CREATE UNIQUE INDEX broker_registration_host_port_unique ON broker_registration (host, port);
+CREATE UNIQUE INDEX broker_registration_url_unique ON broker_registration (url);
CREATE TABLE collector_registration (
id SERIAL PRIMARY KEY,
@@ -37,13 +36,6 @@
collector_id VARCHAR(1000)
);
-CREATE TABLE config_property (
- id SERIAL PRIMARY KEY,
- name VARCHAR(1000),
- value VARCHAR(1000),
- type VARCHAR(1)
-);
-
CREATE TABLE mint_info (
id SERIAL PRIMARY KEY,
version VARCHAR(1000) NOT NULL
@@ -75,6 +67,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -99,6 +92,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -123,6 +117,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -147,6 +142,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -176,6 +172,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -204,6 +201,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -233,6 +231,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -258,6 +257,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -282,6 +282,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -317,6 +318,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -363,6 +365,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -420,6 +423,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -445,6 +449,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -479,6 +484,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -516,6 +522,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -567,6 +574,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -608,6 +616,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -638,6 +647,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -747,6 +757,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -785,6 +796,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -811,6 +823,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
@@ -835,6 +848,7 @@
rec_time TIMESTAMP,
source_scope_id BIGINT,
source_object_id BIGINT,
+ qmf_class_key BYTEA,
creation_time TIMESTAMP,
deletion_time TIMESTAMP,
managed_broker VARCHAR(1000),
Modified: mgmt/trunk/parsley/python/parsley/command.py
===================================================================
--- mgmt/trunk/parsley/python/parsley/command.py 2008-11-14 19:30:06 UTC (rev 2804)
+++ mgmt/trunk/parsley/python/parsley/command.py 2008-11-14 20:12:00 UTC (rev 2805)
@@ -12,6 +12,9 @@
self.commands = list()
self.commands_by_name = dict()
+ opt = CommandOption(self, "help", "h")
+ opt.description = "Print this message"
+
if self.parent:
self.parent.commands.append(self)
self.parent.commands_by_name[self.name] = self
16 years, 2 months
rhmessaging commits: r2804 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-11-14 14:30:06 -0500 (Fri, 14 Nov 2008)
New Revision: 2804
Modified:
store/trunk/cpp/lib/jrnl/jcntl.cpp
Log:
Fix for BZ470228 - "Abort on starting journal with changed sizing params"
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-11-14 17:52:18 UTC (rev 2803)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2008-11-14 19:30:06 UTC (rev 2804)
@@ -142,12 +142,8 @@
_lfmgr.finalize();
- // Set new file geometry parameters
assert(num_jfiles >= JRNL_MIN_NUM_FILES);
assert(num_jfiles <= JRNL_MAX_NUM_FILES);
- _emap.set_num_jfiles(num_jfiles);
- _tmap.set_num_jfiles(num_jfiles);
-
assert(jfsize_sblks >= JRNL_MIN_FILE_SIZE);
assert(jfsize_sblks <= JRNL_MAX_FILE_SIZE);
_jfsize_sblks = jfsize_sblks;
@@ -559,6 +555,8 @@
rd._njf = ji.num_jfiles();
_rcvdat._enq_cnt_list.resize(rd._njf);
}
+ _emap.set_num_jfiles(rd._njf);
+ _tmap.set_num_jfiles(rd._njf);
if (_jfsize_sblks != ji.jfsize_sblks())
{
std::ostringstream oss;
16 years, 2 months
rhmessaging commits: r2803 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-11-14 12:52:18 -0500 (Fri, 14 Nov 2008)
New Revision: 2803
Modified:
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/jrnl/jinf.cpp
Log:
Fix for BZ471612 - "Journal auto expand options and status visible when not yet enabled in source"
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-11-14 17:33:09 UTC (rev 2802)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2008-11-14 17:52:18 UTC (rev 2803)
@@ -326,8 +326,9 @@
isInit = true;
QPID_LOG(notice, "Store module initialized; dir=" << dir);
QPID_LOG(info, "> Default files per journal: " << jfiles);
- QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
- if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
+// TODO: Uncomment these lines when auto-expand is enabled.
+// QPID_LOG(info, "> Auto-expand " << (autoJrnlExpand ? "enabled" : "disabled"));
+// if (autoJrnlExpand) QPID_LOG(info, "> Max auto-expand journal files: " << autoJrnlExpandMaxFiles);
QPID_LOG(info, "> Default journal file size: " << jfileSizePgs << " (wpgs)");
QPID_LOG(info, "> Default write cache page size: " << wCachePageSizeKib << " (Kib)");
QPID_LOG(info, "> Default number of write cache pages: " << wCacheNumPages);
@@ -1759,11 +1760,12 @@
"Required if --no-data-dir is also used.")
("num-jfiles", qpid::optValue(numJrnlFiles, "N"),
"Default number of files for each journal instance (queue).")
- ("auto-expand", qpid::optValue(autoJrnlExpand, "yes|no"),
- "If yes|true|1, allows journal to auto-expand by adding additional journal files as needed. "
- "If no|false|0, the number of journal files will remain fixed (num-jfiles).")
- ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"),
- "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.")
+// TODO: Uncomment these lines when auto-expand is enabled.
+// ("auto-expand", qpid::optValue(autoJrnlExpand, "yes|no"),
+// "If yes|true|1, allows journal to auto-expand by adding additional journal files as needed. "
+// "If no|false|0, the number of journal files will remain fixed (num-jfiles).")
+// ("max-auto-expand-jfiles", qpid::optValue(autoJrnlExpandMaxFiles, "N"),
+// "Maximum number of journal files allowed from auto-expanding; must be greater than --num-jfiles parameter.")
("jfile-size-pgs", qpid::optValue(jrnlFsizePgs, "N"),
"Default size for each journal file in multiples of read pages (1 read page = 64kiB)")
("wcache-page-size", qpid::optValue(wCachePageSizeKib, "N"),
Modified: store/trunk/cpp/lib/jrnl/jinf.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jinf.cpp 2008-11-14 17:33:09 UTC (rev 2802)
+++ store/trunk/cpp/lib/jrnl/jinf.cpp 2008-11-14 17:52:18 UTC (rev 2803)
@@ -303,8 +303,9 @@
oss << " Journal base filename: \"" << _base_filename << "\"" << std::endl;
oss << " Journal version: " << (unsigned)_jver << std::endl;
oss << " Number of journal files: " << _num_jfiles << std::endl;
- oss << " Auto-expand mode: " << (_ae ? "enabled" : "disabled") << std::endl;
- if (_ae) oss << " Max. number of journal files (in auto-expand mode): " << _ae_max_jfiles << std::endl;
+// TODO: Uncomment these lines when auto-expand is enabled.
+// oss << " Auto-expand mode: " << (_ae ? "enabled" : "disabled") << std::endl;
+// if (_ae) oss << " Max. number of journal files (in auto-expand mode): " << _ae_max_jfiles << std::endl;
oss << " Journal file size: " << _jfsize_sblks << " sblks" << std::endl;
oss << " Softblock size (JRNL_SBLK_SIZE): " << _sblk_size_dblks << " dblks" << std::endl;
oss << " Datablock size (JRNL_DBLK_SIZE): " << _dblk_size << " bytes" << std::endl;
16 years, 2 months
rhmessaging commits: r2802 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-14 12:33:09 -0500 (Fri, 14 Nov 2008)
New Revision: 2802
Modified:
mgmt/trunk/cumin/python/cumin/parameters.py
Log:
Changed the do_marshal for CuminClassParameters. This was preventing model updates after a session timeout.
Modified: mgmt/trunk/cumin/python/cumin/parameters.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/parameters.py 2008-11-14 16:23:43 UTC (rev 2801)
+++ mgmt/trunk/cumin/python/cumin/parameters.py 2008-11-14 17:33:09 UTC (rev 2802)
@@ -17,7 +17,7 @@
return getattr(self.app.model, string, None)
def do_marshal(self, cls):
- return cls.name
+ return cls.cumin_name
class BrokerClusterParameter(Parameter):
def do_unmarshal(self, string):
16 years, 2 months
rhmessaging commits: r2801 - in store/trunk/cpp: lib/jrnl and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2008-11-14 11:23:43 -0500 (Fri, 14 Nov 2008)
New Revision: 2801
Modified:
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/jcntl.hpp
store/trunk/cpp/lib/jrnl/rmgr.cpp
store/trunk/cpp/tests/jrnl/_st_read.cpp
Log:
Fix for BZ471601 "Read tests may fail with RHM_IORES_EMPTY" and BZ471606 "TwoPhaseCommitTest fails intermittently with valgrind illegal read/write error"
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2008-11-14 16:23:43 UTC (rev 2801)
@@ -25,7 +25,6 @@
#include "jrnl/jerrno.hpp"
#include "jrnl/jexception.hpp"
-#include "jrnl/slock.hpp"
#include "qpid/log/Statement.h"
#include "qpid/agent/ManagementAgent.h"
#include "qmf/com/redhat/rhm/store/ArgsJournalExpand.h"
@@ -40,12 +39,10 @@
qpid::broker::Timer* JournalImpl::journalTimerPtr = 0;
u_int32_t JournalImpl::cnt = 0;
-void InactivityFireEvent::fire() { if (parent) parent->flushFire(); }
-void GetEventsFireEvent::fire() {
- if (parent) parent->getEventsFire();
- release();
-}
+void InactivityFireEvent::fire() { slock s(&_ife_mutex); if (parent) parent->flushFire(); }
+void GetEventsFireEvent::fire() { slock s(&_gefe_mutex); if (parent) parent->getEventsFire(); }
+
JournalImpl::JournalImpl(const std::string& journalId,
const std::string& journalDirectory,
const std::string& journalBaseFilename,
@@ -453,9 +450,7 @@
{
slock s(&_getf_mutex);
getEventsTimerSetFlag = false;
- if (_wmgr.get_aio_evt_rem()) {
- jcntl::get_wr_events();
- }
+ if (_wmgr.get_aio_evt_rem()) { jcntl::get_wr_events(); }
if (_wmgr.get_aio_evt_rem()) { setGetEventTimer(); }
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/JournalImpl.h 2008-11-14 16:23:43 UTC (rev 2801)
@@ -26,6 +26,7 @@
#include <set>
#include "jrnl/jcntl.hpp"
+#include "jrnl/slock.hpp"
#include "DataTokenImpl.h"
#include "PreparedTransaction.h"
#include <qpid/broker/Timer.h>
@@ -44,25 +45,27 @@
class InactivityFireEvent : public virtual qpid::broker::TimerTask
{
JournalImpl* parent;
+ pthread_mutex_t _ife_mutex;
public:
InactivityFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::broker::TimerTask(timeout), parent(p) {}
- virtual ~InactivityFireEvent() {}
+ qpid::broker::TimerTask(timeout), parent(p) { ::pthread_mutex_init(&_ife_mutex, 0); }
+ virtual ~InactivityFireEvent() { ::pthread_mutex_destroy(&_ife_mutex); }
void fire();
- inline void cancel() { parent=0; }
+ inline void cancel() { mrg::journal::slock s(&_ife_mutex); parent = 0; }
};
class GetEventsFireEvent : public virtual qpid::broker::TimerTask
{
JournalImpl* parent;
+ pthread_mutex_t _gefe_mutex;
public:
GetEventsFireEvent(JournalImpl* p, const qpid::sys::Duration timeout):
- qpid::broker::TimerTask(timeout), parent(p) {}
- virtual ~GetEventsFireEvent() {}
+ qpid::broker::TimerTask(timeout), parent(p) { ::pthread_mutex_init(&_gefe_mutex, 0); }
+ virtual ~GetEventsFireEvent() { ::pthread_mutex_destroy(&_gefe_mutex); }
void fire();
- inline void cancel() { parent=0; }
+ inline void cancel() { mrg::journal::slock s(&_gefe_mutex); parent = 0; }
};
class JournalImpl : public qpid::broker::ExternalQueueStore, public journal::jcntl
@@ -73,7 +76,7 @@
bool getEventsTimerSetFlag;
boost::intrusive_ptr<qpid::broker::TimerTask> getEventsFireEventsPtr;
- pthread_mutex_t _getf_mutex; // getEventsTimerSetFlag mutex
+ pthread_mutex_t _getf_mutex;
u_int64_t lastReadRid; // rid of last read msg for loadMsgContent()
@@ -197,7 +200,6 @@
inline void setGetEventTimer()
{
- getEventsFireEventsPtr->addRef();
assert(journalTimerPtr != 0);
journalTimerPtr->add(getEventsFireEventsPtr);
getEventsTimerSetFlag = true;
@@ -236,7 +238,7 @@
return JournalImpl::read_data_record(datapp, dsize, xidpp, xidsize, transient, external, dtokp, true);
}
inline void read_reset() { _rmgr.invalidate(); }
- };
+ }; // class TplJournalImpl
} // namespace msgstore
} // namespace mrg
Modified: store/trunk/cpp/lib/jrnl/jcntl.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/jrnl/jcntl.hpp 2008-11-14 16:23:43 UTC (rev 2801)
@@ -110,6 +110,9 @@
* journal will no longer accept messages until either initialize() or recover() is called.
* There is no way other than through initialization to reset this flag.
*/
+ // TODO: It would be helpful to distinguish between states stopping and stopped. If stop(true) is called,
+ // then we are stopping, but must wait for all outstanding aios to return before being finally stopped. During
+ // this period, however, no new enqueue/dequeue/read requests may be accepted.
bool _stop_flag;
/**
Modified: store/trunk/cpp/lib/jrnl/rmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/lib/jrnl/rmgr.cpp 2008-11-14 16:23:43 UTC (rev 2801)
@@ -132,10 +132,13 @@
if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if rd AIOs returned; initiate new reads if possible
- if (_jc->unflushed_dblks() > 0)
- _jc->flush();
- else if (!_aio_evt_rem)
- return RHM_IORES_EMPTY;
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+ {
+ if (_jc->unflushed_dblks() > 0)
+ _jc->flush();
+ else if (!_aio_evt_rem)
+ return RHM_IORES_EMPTY;
+ }
}
if (_page_cb_arr[_pg_index]._state != AIO_COMPLETE)
{
@@ -359,10 +362,13 @@
if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
{
aio_cycle(); // check if any AIOs have returned
- if (_jc->unflushed_dblks() > 0)
- _jc->flush();
- else if (!_aio_evt_rem)
- return RHM_IORES_EMPTY;
+ if(dblks_rem() == 0 && _rrfc.is_compl() && !_rrfc.is_wr_aio_outstanding())
+ {
+ if (_jc->unflushed_dblks() > 0)
+ _jc->flush();
+ else if (!_aio_evt_rem)
+ return RHM_IORES_EMPTY;
+ }
}
// Check write state of this token is ENQ - required for read
Modified: store/trunk/cpp/tests/jrnl/_st_read.cpp
===================================================================
--- store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-11-13 15:56:34 UTC (rev 2800)
+++ store/trunk/cpp/tests/jrnl/_st_read.cpp 2008-11-14 16:23:43 UTC (rev 2801)
@@ -107,21 +107,24 @@
test_jrnl jc(test_name, test_dir, test_name);
jc.initialize(2*NUM_TEST_JFILES, false, 0, 10*TEST_JFSIZE_SBLKS);
- for (int m=0; m<NUM_MSGS*125; m++)
- enq_msg(jc, m, create_msg(msg, m, 16*MSG_SIZE), false);
- jc.flush();
- for (int m=0; m<NUM_MSGS*125; m++)
+ for (int i=0; i<10; i++)
{
- read_msg(jc, rmsg, xid, transientFlag, externalFlag);
- BOOST_CHECK_EQUAL(create_msg(msg, m, 16*MSG_SIZE), rmsg);
- BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
- BOOST_CHECK_EQUAL(transientFlag, false);
- BOOST_CHECK_EQUAL(externalFlag, false);
+ for (int m=0; m<NUM_MSGS*125; m++)
+ enq_msg(jc, m, create_msg(msg, m, 16*MSG_SIZE), false);
+ jc.flush();
+ for (int m=0; m<NUM_MSGS*125; m++)
+ {
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag);
+ BOOST_CHECK_EQUAL(create_msg(msg, m, 16*MSG_SIZE), rmsg);
+ BOOST_CHECK_EQUAL(xid.size(), std::size_t(0));
+ BOOST_CHECK_EQUAL(transientFlag, false);
+ BOOST_CHECK_EQUAL(externalFlag, false);
+ }
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
+ for (int m=0; m<NUM_MSGS*125; m++)
+ deq_msg(jc, m, m+NUM_MSGS*125);
+ read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
}
- read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
- for (int m=0; m<NUM_MSGS*125; m++)
- deq_msg(jc, m, m+NUM_MSGS*125);
- read_msg(jc, rmsg, xid, transientFlag, externalFlag, RHM_IORES_EMPTY);
}
catch(const exception& e) { BOOST_FAIL(e.what()); }
cout << "ok" << endl;
@@ -140,7 +143,7 @@
test_jrnl jc(test_name, test_dir, test_name);
jc.initialize(NUM_TEST_JFILES, false, 0, TEST_JFSIZE_SBLKS);
- for (int m=0; m<2*NUM_MSGS; m+=2)
+ for (int m=0; m<500*NUM_MSGS; m+=2)
{
enq_msg(jc, m, create_msg(msg, m, MSG_SIZE), false);
jc.flush();
16 years, 2 months
rhmessaging commits: r2800 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-13 10:56:34 -0500 (Thu, 13 Nov 2008)
New Revision: 2800
Modified:
mgmt/trunk/cumin/python/cumin/pool.py
Log:
Pass pool id to ajax update methods.
Modified: mgmt/trunk/cumin/python/cumin/pool.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/pool.py 2008-11-13 15:56:24 UTC (rev 2799)
+++ mgmt/trunk/cumin/python/cumin/pool.py 2008-11-13 15:56:34 UTC (rev 2800)
@@ -308,7 +308,7 @@
def get_url(self, session):
pool = self.parent.frame.get_args(session)[0]
- return "call.xml?class=pool;method=jobs"
+ return "call.xml?class=pool;id=%s;method=jobs" % pool.id
def get_sticky_info(self, session):
return [("name", "ID"), ("submitter", "Submitter"), ("status", "Status")]
@@ -343,7 +343,7 @@
def get_url(self, session):
pool = self.parent.frame.get_args(session)[0]
- return "call.xml?class=pool;method=slots"
+ return "call.xml?class=pool;id=%s;method=slots" % pool.id
def get_sticky_info(self, session):
return [("name", "Name"), ("machine", "Machine"), ("job", "Job")]
16 years, 2 months
rhmessaging commits: r2799 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-13 10:56:24 -0500 (Thu, 13 Nov 2008)
New Revision: 2799
Modified:
mgmt/trunk/cumin/python/cumin/model.py
Log:
Add classmethod "get(id)" to Pool object so it can function similarly to SQLTable objects.
Add pool slot visualization. Needs to be combined with system slot visualization eventually.
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-13 15:51:43 UTC (rev 2798)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-13 15:56:24 UTC (rev 2799)
@@ -495,8 +495,8 @@
stat.write_xml(writer, object)
def write_xml(self, writer, object):
- writer.write("<%s id=\"%i\" name=\"%s\">" % \
- (self.cumin_name, object.id,
+ writer.write("<%s id=\"%s\" name=\"%s\">" % \
+ (self.cumin_name, str(object.id),
self.get_object_name(object)))
self.write_event_xml(writer, object)
@@ -1840,7 +1840,14 @@
self.id = self.collector.Pool
self.name = self.collector.Name
+ def get(cls, id):
+ for coll in Collector.select("pool='%s'" % id):
+ return Pool(coll)
+ get = classmethod(get)
+
from job import JobSet
+from pool import PoolSlotSet
+
class CuminPool(CuminClass):
def __init__(self, model):
super(CuminPool, self).__init__(model, "pool", Pool)
@@ -1849,27 +1856,30 @@
prop.title = "Collector ID"
prop.summary = True
- stat = CuminStat(self, "Running")
+ stat = self.PercentStat(self, "Running")
stat.title = "Running Jobs"
- stat = CuminStat(self, "Completed")
+ stat = self.PercentStat(self, "Completed")
stat.title = "Completed Jobs"
- stat = CuminStat(self, "Idle")
+ stat = self.PercentStat(self, "Idle")
stat.title = "Idle Jobs"
- stat = CuminStat(self, "Held")
+ stat = self.PercentStat(self, "Held")
stat.title = "Held Jobs"
- stat = CuminStat(self, "Removed")
+ stat = self.PercentStat(self, "Removed")
stat.title = "Removed Jobs"
- stat = CuminStat(self, "Jobs")
+ stat = self.PercentStat(self, "Jobs")
stat.title = "Total Jobs"
action = self.VisJobs(self, "jobs")
action.navigable = False
+ action = self.VisSlots(self, "slots")
+ action.navigable = False
+
def init(self):
self.frame = self.model.frame.pool
@@ -1884,6 +1894,102 @@
return title
+ class PercentStat(CuminStat):
+ def value_text(self, pool):
+ state = self.name
+
+ if state == "Jobs":
+ return str(Job.select().count())
+ value = self.get_value(pool, state)
+ return str(value)
+
+ def get_value(self, pool, state):
+ elems = list()
+ istate = JobStatusInfo.get_status_int(state)
+ elems.append("job_status = %i" % istate)
+ elems.append("s.pool = '%s'" % pool.id)
+
+ # manually removed jobs will have a state of Idle
+ # with a deletion_time
+ if state == "Idle":
+ elems.append("job.deletion_time is null")
+ where = " and ".join(elems)
+
+ # manually removed jobs will have a state of Idle
+ # with a deletion_time
+ if state == "Removed":
+ removed = "(job.deletion_time is not null and job_status = %i)" % JobStatusInfo.get_status_int("Idle")
+ where = " or ".join((where, removed))
+
+ jn = "inner join scheduler as s on s.id = scheduler_id"
+ return Job.select(where, join=jn).count()
+
+ def rate_text(self, pool):
+ state = self.name
+ return self.get_item_rate(pool, state)
+
+ def get_item_rate(self, pool, state):
+ jobs = Job.select().count()
+
+ if state == "Jobs":
+ value = jobs
+ else:
+ value = self.get_value(pool, state)
+ if jobs:
+ percent = (value*1.0) / (jobs*1.0) * 100.0
+ return jobs and "%2.2f" % percent or "-"
+
+ class VisSlots(CuminAction):
+ # list of status/colors in the order we want them displayed
+ # in the legend
+ load_colors = [("Idle", "clear"),
+ ("Busy", "green"),
+ ("Suspended", "red"),
+ ("Vacating", "red"),
+ ("Killing", "blue"),
+ ("Benchmarking", "yellow")]
+
+ def __init__(self, cls, name):
+ super(CuminPool.VisSlots, self).__init__(cls, name)
+
+ self.slot_set = self.ModelPoolSlotSet(cls.model.app, name)
+
+ def get_xml_response(self, session, pool, *args):
+ slots = self.get_slots(session, pool)
+ writer = Writer()
+ writer.write("<slots>")
+ for slot in slots:
+ writer.write("<slot id='%i' name='%s' machine='%s' job='%s' color='%s'/>" % \
+ (slot["id"],
+ slot["name"],
+ slot["machine"],
+ slot["job_id"],
+ self.get_color(slot)))
+ writer.write("</slots>")
+ return writer.to_string()
+
+ def get_slots(self, session, pool):
+ cursor = self.slot_set.do_get_items(session, pool)
+ slot_list = self.slot_set.cursor_to_rows(cursor)
+ return slot_list
+
+ class ModelPoolSlotSet(PoolSlotSet):
+ def render_sql_limit(self, session, *args):
+ pass
+
+ def render_sql_orderby(self, session, *args):
+ return "order by name asc"
+
+ def get_color(self, slot):
+ activity = slot["activity"]
+ for status, color in self.load_colors:
+ if status == activity:
+ return color
+ return "black"
+
+ def get_colors(self):
+ return self.load_colors
+
class VisJobs(CuminAction):
def __init__(self, cls, name):
super(CuminPool.VisJobs, self).__init__(cls, name)
@@ -1891,7 +1997,6 @@
self.job_set = self.ModelSystemJobSet(cls.model.app, name)
def get_xml_response(self, session, pool, *args):
- pool = self.model.get_main_pool()
jobs = self.get_jobs(session, pool)
writer = Writer()
writer.write("<jobs>")
16 years, 2 months
rhmessaging commits: r2798 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-11-13 10:51:43 -0500 (Thu, 13 Nov 2008)
New Revision: 2798
Modified:
mgmt/trunk/cumin/python/cumin/model.py
Log:
A temporary workaround for a data problem
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-13 15:17:12 UTC (rev 2797)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-13 15:51:43 UTC (rev 2798)
@@ -2228,6 +2228,10 @@
class DateAdProperty(AdProperty):
def render_datetime(self, session, value):
+ # XXX
+ if type(value) is unicode:
+ return value
+
return fmt_datetime(value)
class JobStatusProperty(AdProperty):
16 years, 2 months