Author: eallen
Date: 2008-11-04 15:04:09 -0500 (Tue, 04 Nov 2008)
New Revision: 2737
Modified:
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/cumin/python/cumin/queue.py
Log:
Added new "Add Queue" options
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2008-11-04 20:00:14 UTC (rev 2736)
+++ mgmt/trunk/cumin/python/cumin/model.py 2008-11-04 20:04:09 UTC (rev 2737)
@@ -798,12 +798,38 @@
return frame.queue_add.show(session)
def do_invoke(self, queue, args, completion):
+ FILECOUNT = "qpid.file_count"
+ FILESIZE = "qpid.file_size"
+ MAX_QUEUE_SIZE = "qpid.max_size"
+ MAX_QUEUE_COUNT = "qpid.max_count"
+ POLICY_TYPE = "qpid.policy_type"
+ CLUSTER_DURABLE = "qpid.persist_last_node"
+ LVQ = "qpid.last_value_queue"
+ OPTIMISTIC_CONSUME = "qpid.optimistic_consume"
+
reg = args["reg"]
+ declArgs = {}
+ if queue.durable:
+ declArgs[FILECOUNT] = args["file_count"]
+ declArgs[FILESIZE] = args["file_size"]
+
+ if args["q_size"]:
+ declArgs[MAX_QUEUE_SIZE] = args["q_size"]
+ if args["q_count"]:
+ declArgs[MAX_QUEUE_COUNT] = args["q_count"]
+ if args["policy"]:
+ declArgs[POLICY_TYPE] = args["policy"]
+ if args["cluster_durable"]:
+ declArgs[CLUSTER_DURABLE] = 1
+ if args["lvq"]:
+ declArgs[LVQ] = 1
+ if args["optimistic"]:
+ declArgs[OPTIMISTIC_CONSUME] = 1
+
session = self.getSessionFromRegistration(reg)
session.queue_declare(queue=queue.name,
- exclusive=queue.exclusive,
durable=queue.durable,
- auto_delete=queue.autoDelete)
+ arguments=declArgs)
# optionally bind to exchanges
binding_info = args['exchange_keys']
Modified: mgmt/trunk/cumin/python/cumin/queue.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/queue.py 2008-11-04 20:00:14 UTC (rev 2736)
+++ mgmt/trunk/cumin/python/cumin/queue.py 2008-11-04 20:04:09 UTC (rev 2737)
@@ -324,15 +324,38 @@
self.namef = NameField(app, "name")
self.add_field(self.namef)
- self.durable = DurabilityField(app, "durable")
+ self.durable = self.QueueDurabilityField(app, "durable")
self.add_field(self.durable)
- #self.exclusive = ExclusivityField(app, "exclusive")
- #self.add_field(self.exclusive)
+ self.more = MoreFieldSet(app, "more")
+ self.add_field(self.more)
- self.autodelete = AutoDeleteField(app, "autodelete")
- self.add_field(self.autodelete)
+ self.cluster_durable = self.ClusterDurabilityField(app, "cluster_durable")
+ self.more.add_field(self.cluster_durable)
+
+ self.lvq = self.LVQField(app, "lvq")
+ self.more.add_field(self.lvq)
+
+ self.optimistic = self.OptimisticField(app, "optimistic")
+ self.more.add_field(self.optimistic)
+ self.policy = self.PolicyField(app, "policy")
+ self.more.add_field(self.policy)
+
+ self.file_count = self.FileCountField(app, "file_count")
+ self.file_count.input.param.default = 8
+ self.more.add_field(self.file_count)
+
+ self.file_size = self.FileSizeField(app, "file_size")
+ self.file_size.input.param.default = 24
+ self.more.add_field(self.file_size)
+
+ self.q_size = self.QSizeField(app, "q_size")
+ self.more.add_field(self.q_size)
+
+ self.q_count = self.QCountField(app, "q_count")
+ self.more.add_field(self.q_count)
+
self.bindings = ExchangeKeysField(app, "bindings")
self.add_field(self.bindings)
@@ -341,12 +364,144 @@
(errors, form_binding_info) = self.bindings.get_binding_errors(session,
queue_name)
return (errors or super_error, form_binding_info)
+ class QCountField(IntegerField):
+ def render_title(self, session):
+ return "Max Queue Count"
+
+ def render_field_help(self, session):
+ return "(Maximum in-memory queue size as a number of messages)"
+
+ class QSizeField(IntegerField):
+ def render_title(self, session):
+ return "Max Queue Size"
+
+ def render_field_help(self, session):
+ return "(Maximum in-memory queue size as bytes)"
+
+ class FileCountField(IntegerField):
+ def render_title(self, session):
+ return "File Count"
+
+ def render_field_help(self, session):
+ return "(Number of files in queue's persistence journal)"
+
+ class FileSizeField(IntegerField):
+ def render_title(self, session):
+ return "File Size"
+
+ def render_field_help(self, session):
+ return "(File size in pages - 64Kb/page)"
+
+ class QueueDurabilityField(TwoOptionRadioField):
+ def __init__(self, app, name, option1="durable", option2="transient"):
+ super(QueueForm.QueueDurabilityField, self).__init__(app, name, option1, option2)
+
+ self.option1_title = "Durable"
+ self.option2_title = "Transient"
+
+ def render_title(self, session):
+ return "Durable?"
+
+ def render_field_help(self, session):
+ return "(Queue is durable)"
+
+ class ClusterDurabilityField(TwoOptionRadioField):
+ def __init__(self, app, name, option1="durable", option2="transient"):
+ super(QueueForm.ClusterDurabilityField, self).__init__(app, name, option1, option2)
+
+ self.option1_title = "Cluster Durable"
+ self.option2_title = "Not Cluster Durable"
+
+ def render_title(self, session):
+ return "Cluster Durable?"
+
+ def render_field_help(self, session):
+ return "(Queue becomes durable if there is only one functioning cluster node)"
+
+ class LVQField(TwoOptionRadioField):
+ def __init__(self, app, name, option1="enable", option2="disable"):
+ super(QueueForm.LVQField, self).__init__(app, name, option1, option2)
+
+ self.option1_title = "Enabled"
+ self.option2_title = "Not Enabled"
+
+ def render_title(self, session):
+ return "Enable Last Value Queue?"
+
+ def render_field_help(self, session):
+ return "(Enable LVQ behavior on the queue)"
+
+ class OptimisticField(TwoOptionRadioField):
+ def __init__(self, app, name, option1="enable", option2="disable"):
+ super(QueueForm.OptimisticField, self).__init__(app, name, option1, option2)
+
+ self.option1_title = "Enabled"
+ self.option2_title = "Not Enabled"
+
+ def render_title(self, session):
+ return "Enable Optimistic Consume?"
+
+ def render_field_help(self, session):
+ return "(Enable optimistic consume on the queue)"
+
+ class PolicyField(RadioField):
+ def __init__(self, app, name):
+ super(QueueForm.PolicyField, self).__init__(app, name, None)
+
+ self.param = Parameter(app, "param")
+ self.param.default = "reject"
+ self.add_parameter(self.param)
+
+ option = self.Reject(app, "reject", self.param)
+ self.add_option(option)
+
+ option = self.Flow(app, "flow", self.param)
+ self.add_option(option)
+
+ option = self.Ring(app, "ring", self.param)
+ self.add_option(option)
+
+ option = self.RingStrict(app, "ring_strict", self.param)
+ self.add_option(option)
+
+ def render_title(self, session):
+ return "Policy-type"
+
+ def render_field_help(self, session):
+ return "(Action taken when queue limit is reached)"
+
+ class Reject(RadioFieldOption):
+ def render_value(self, session):
+ return "reject"
+
+ def render_title(self, session):
+ return "Reject"
+
+ class Flow(RadioFieldOption):
+ def render_value(self, session):
+ return "flow"
+
+ def render_title(self, session):
+ return "Flow to disc"
+
+ class Ring(RadioFieldOption):
+ def render_value(self, session):
+ return "ring"
+
+ def render_title(self, session):
+ return "Ring"
+
+ class RingStrict(RadioFieldOption):
+ def render_value(self, session):
+ return "ring_strict"
+
+ def render_title(self, session):
+ return "Ring Strict"
+
class QueueAdd(QueueForm):
def process_submit(self, session):
queue_name = self.namef.get(session)
durable = self.durable.get(session)
- #exclusive = self.exclusive.get(session)
- autodelete = self.autodelete.get(session)
(errors, form_binding_info) = self.validate(session, queue_name)
if errors:
@@ -358,13 +513,19 @@
queue = Queue()
queue.name = queue_name
queue.durable = (durable == "durable")
- queue.exclusive = False
- queue.autoDelete = (autodelete == "autodel")
reg = self.frame.get_object(session)
- args = {
- "exchange_keys": form_binding_info,
- "reg": reg}
+ args = {}
+ args["reg"] = reg
+ args["exchange_keys"] = form_binding_info
+ args["cluster_durable"] = self.cluster_durable.get(session) == "durable"
+ args["lvq"] = self.lvq.get(session) == "enable"
+ args["optimistic"] = self.optimistic.get(session) == "enable"
+ args["policy"] = self.policy.get(session)
+ args["file_count"] = self.file_count.get(session)
+ args["file_size"] = self.file_size.get(session)
+ args["q_count"] = self.q_count.get(session)
+ args["q_size"] = self.q_size.get(session)
action = self.app.model.broker.add_queue
action.invoke(queue, args)