rhmessaging commits: r2827 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: nunofsantos
Date: 2008-11-17 15:41:29 -0500 (Mon, 17 Nov 2008)
New Revision: 2827
Added:
mgmt/trunk/mint/python/mint/priqueue.py
Modified:
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/update.py
Log:
use raw sql for insertion and updating of qmf updates to improve database performance; also includes Ted's priority queue code
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-11-17 18:51:55 UTC (rev 2826)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-11-17 20:41:29 UTC (rev 2827)
@@ -1,6 +1,8 @@
import logging
import qpid.qmfconsole
+import pickle
import struct
+import types
from threading import Lock, RLock
from sqlobject import *
from traceback import print_exc
@@ -224,6 +226,15 @@
log.info("Connection failed: %s ", e.message)
print_exc()
+ def disconnect(self, model):
+ log.info("Disconnecting from broker '%s' at %s" % (self.name, self.url))
+ try:
+ model.getSession().delBroker(self.qmfBroker)
+ log.info("Disconnection succeeded")
+ except Exception, e:
+ log.info("Disconnection failed: %s ", e.message)
+ print_exc()
+
def getBrokerId(self):
if self.qmfBroker is not None:
return str(self.qmfBroker.getBrokerId())
@@ -237,6 +248,10 @@
except IndexError:
return None
+ def destroySelf(self):
+ self.disconnect(MintModel.staticInstance)
+ super(BrokerRegistration, self).destroySelf()
+
class BrokerGroup(SQLObject):
class sqlmeta:
lazyUpdate = True
@@ -283,6 +298,8 @@
self.connCloseListener = None
self.__lock = RLock()
+ self.dbStyle = MixedCaseUnderscoreStyle()
+ self.dbConn = None
# map containing updateObjects that have a missing parent dependency, for deferred insertion
# (missing_class, missing_id.first, missing_id.second) -> [updateObject, ..., updateObject]
@@ -311,7 +328,7 @@
raise e
def init(self):
- sqlhub.processConnection = connectionForURI(self.dataUri)
+ sqlhub.processConnection = self.dbConn = connectionForURI(self.dataUri)
def start(self):
self.updateThread.start()
@@ -322,29 +339,90 @@
def setCloseListener(self, connCloseListener):
self.connCloseListener = connCloseListener
- def getObject(self, cls, id, broker):
- obj = None
+ def getObjectId(self, cls, id):
if isinstance(id, qpid.qmfconsole.ObjectId):
- compositeId = "%s:%s" % (id.first, id.second)
try:
- obj = cls.selectBy(sourceScopeId=id.first, sourceObjectId=id.second)[0]
- except IndexError:
+ conn = self.dbConn.getConnection()
+ cursor = conn.cursor()
+ cursor.execute("select id from %s where source_scope_id = %s and source_object_id = %s" \
+ % (self.dbStyle.pythonClassToDBTable(cls.__name__), id.first, id.second));
+ for rec in cursor:
+ return rec[0]
+ except Exception:
raise ObjectNotFound()
- elif cls.__name__.endswith("Pool"):
+ finally:
+ conn.close()
+ else:
+ raise ObjectNotFound()
+
+ def __pythonValueToDB(self, key, value):
+ if key == "qmfClassKey":
+ value = u"'%s'" % unicode(value, "raw_unicode_escape").replace("'", "''")
+ elif type(value) == types.DictType:
+ value = u"'%s'" % pickle.dumps(value).replace("'", "''")
+ elif value is None:
+ value = "NULL"
+ else:
try:
- obj = cls.selectBy(sourceId=id)[0]
- except IndexError:
- raise ObjectNotFound()
-
- return obj
+ x = int(value)
+ except Exception:
+ value = u"'%s'" % str(value).replace("'", "''")
+ return value
+ def __generateSQLWhereClause(self, colValMap):
+ whereClause = ""
+ for (key, value) in colValMap.iteritems():
+ value = self.__pythonValueToDB(key, value)
+ whereClause += u"%s = %s and " % (self.dbStyle.pythonAttrToDBColumn(key), value)
+ whereClause = whereClause[:-5]
+ return whereClause
+
+ def generateSQLSelect(self, table, id, resultCols="id"):
+ return self.generateSQLSelectWhere(table, {"source_scope_id": id.first, "source_object_id": id.second}, resultCols)
+
+ def generateSQLSelectWhere(self, table, where, resultCols="id"):
+ whereClause = self.__generateSQLWhereClause(where)
+ sql = "select %s from %s where %s" % (resultCols, table, whereClause)
+ return sql
+
+ def generateSQLInsert(self, table, colValMap, subQuery={}):
+ columns = u""
+ values = u""
+ for (key, value) in colValMap.iteritems():
+ value = self.__pythonValueToDB(key, value)
+ columns += u"%s , " % (self.dbStyle.pythonAttrToDBColumn(key))
+ values += u"%s , " % (value)
+ for subCol, subVal in subQuery.iteritems():
+ columns += u"%s , " % (subCol)
+ values += u"(%s) , " % (subVal)
+ columns = columns[:-3]
+ values = values[:-3]
+ sql = u"insert into %s (%s) values (%s)" % (table, columns, values)
+ return sql
+
+ def generateSQLUpdate(self, table, colValMap, id, updateStats=False):
+ return self.generateSQLUpdateWhere(table, colValMap, \
+ {"source_scope_id": id.first, "source_object_id": id.second}, updateStats)
+
+ def generateSQLUpdateWhere(self, table, colValMap, where, updateStats=False):
+ updateValues = u""
+ for (key, value) in colValMap.iteritems():
+ value = self.__pythonValueToDB(key, value)
+ updateValues += u"%s = %s , " % (self.dbStyle.pythonAttrToDBColumn(key), value)
+ if updateStats:
+ updateValues += u"stats_prev_id = stats_curr_id , "
+ updateValues = updateValues[:-3]
+ whereClause = self.__generateSQLWhereClause(where)
+ sql = u"update %s set %s where %s" % (table, updateValues, whereClause)
+ return sql
+
def getSession(self):
return self.mgmtSession
def callMethod(self, managedBroker, objId, classKey, methodName, callback, args):
self.lock()
try:
- broker = self.managedBrokers[managedBroker]
+ broker, dbObjId = self.managedBrokers[managedBroker]
finally:
self.unlock()
@@ -363,7 +441,7 @@
""" Invoked when a connection is established to a broker """
self.lock()
try:
- self.managedBrokers[str(broker.getBrokerId())] = broker
+ self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
finally:
self.unlock()
@@ -396,11 +474,19 @@
def objectProps(self, broker, record):
""" Invoked when an object is updated. """
- self.updateThread.enqueue(update.PropertyUpdate(broker, record))
+ if record.getClassKey()[1] == "job":
+ priority = 1
+ else:
+ priority = 0
+ self.updateThread.enqueue(update.PropertyUpdate(broker, record), priority)
def objectStats(self, broker, record):
""" Invoked when an object is updated. """
- self.updateThread.enqueue(update.StatisticUpdate(broker, record))
+ if record.getClassKey()[1] == "job":
+ priority = 1
+ else:
+ priority = 0
+ self.updateThread.enqueue(update.StatisticUpdate(broker, record), priority)
def event(self, broker, event):
""" Invoked when an event is raised. """
@@ -412,7 +498,7 @@
def brokerInfo(self, broker):
self.lock()
try:
- self.managedBrokers[str(broker.getBrokerId())] = broker
+ self.managedBrokers[str(broker.getBrokerId())] = (broker, 0)
finally:
self.unlock()
Added: mgmt/trunk/mint/python/mint/priqueue.py
===================================================================
--- mgmt/trunk/mint/python/mint/priqueue.py (rev 0)
+++ mgmt/trunk/mint/python/mint/priqueue.py 2008-11-17 20:41:29 UTC (rev 2827)
@@ -0,0 +1,39 @@
+from Queue import Queue
+from collections import deque
+
+class PriQueue(Queue):
+ """ """
+ def __init__(self, maxsize=0, slotCount=1):
+ self.slotCount=slotCount
+ Queue.__init__(self, maxsize)
+
+ def _init(self, maxsize):
+ self.maxsize = maxsize
+ self.slots = []
+ for i in range(self.slotCount):
+ self.slots.append(deque())
+
+ def _qsize(self):
+ size = 0
+ for i in range(self.slotCount):
+ size += len(self.slots[i])
+ return size
+
+ def _empty(self):
+ return self._qsize() == 0
+
+ def _full(self):
+ return self.maxsize > 0 and self._qsize() == self.maxsize
+
+ def _put(self, item):
+ slot = item[0]
+ if slot in range(self.slotCount):
+ self.slots[slot].append(item)
+ else:
+ raise ValueError("Invalid priority slot")
+
+ def _get(self):
+ for slot in range(self.slotCount):
+ if len(self.slots[slot]) > 0:
+ return self.slots[slot].popleft()
+ return None
Property changes on: mgmt/trunk/mint/python/mint/priqueue.py
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-11-17 18:51:55 UTC (rev 2826)
+++ mgmt/trunk/mint/python/mint/update.py 2008-11-17 20:41:29 UTC (rev 2827)
@@ -1,12 +1,12 @@
import logging
import datetime
import types
-import struct
-from Queue import Queue as ConcurrentQueue, Full, Empty
+from Queue import Full, Empty
from threading import Thread
from traceback import print_exc
from qpid.datatypes import UUID
from mint.schema import *
+from mint.priqueue import PriQueue as ConcurrentQueue
log = logging.getLogger("mint.update")
@@ -21,13 +21,13 @@
super(ModelUpdateThread, self).__init__()
self.model = model
- self.updates = ConcurrentQueue()
+ self.updates = ConcurrentQueue(slotCount=2)
self.stopRequested = False
self.setDaemon(False)
- def enqueue(self, update):
+ def enqueue(self, update, priority=0):
try:
- self.updates.put(update)
+ self.updates.put((priority, update))
except Full:
log.exception("Queue is full")
pass
@@ -35,7 +35,9 @@
def run(self):
while True:
try:
- update = self.updates.get(True, 1)
+ priority, update = self.updates.get(True, 1)
+ if self.stopRequested:
+ break
except Empty:
if self.stopRequested:
break
@@ -58,8 +60,8 @@
self.qmfObj = obj
def getStatsClass(self, cls):
- return getattr(mint, cls.__name__ + "Stats")
-
+ return cls + "Stats"
+
def process(self):
pass
@@ -87,8 +89,9 @@
if othercls:
attrname = name[0:-3]
+ attrname += "_id"
try:
- attrs[attrname] = model.getObject(othercls, id, self.broker)
+ attrs[attrname] = model.getObjectId(othercls, id)
except KeyError:
log.info("Referenced object %s '%s' not found by key '%s'" % (clsname, id, attrname))
except mint.ObjectNotFound:
@@ -101,18 +104,18 @@
orphan = True
else:
log.error("Class '%s' not found" % clsname)
- elif not hasattr(cls, orig_name):
+ elif not hasattr(eval("mint.schema."+cls), name):
# Remove attrs that we don't have in our schema
- log.debug("Class '%s' has no field '%s'" % (cls.__name__, name))
+ log.debug("Class '%s' has no field '%s'" % ("mint.schema."+cls, name))
del attrs[name]
#XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
elif name in ("DaemonStartTime", "EnteredCurrentActivity", "EnteredCurrentState", "JobStart",
"LastBenchmark", "LastFetchWorkCompleted", "LastFetchWorkSpawned", "LastPeriodicCheckpoint",
"MyCurrentTime", "QDate", "JobQueueBirthdate", "MonitorSelfTime") \
and (type(attrs[name]) is types.LongType or type(attrs[name]) is types.IntType or attrs[name] == 0):
- attrs[name] = datetime.fromtimestamp(attrs[name]/1000000000)
+ attrs[name] = time_unwarp(datetime.fromtimestamp(attrs[name]/1000000000))
elif name.endswith("Time") and type(attrs[name]) is types.IntType and attrs[name] == 0:
- attrs[name] = datetime.fromtimestamp(attrs[name])
+ attrs[name] = time_unwarp(datetime.fromtimestamp(attrs[name]))
#XXX FIX -- TODO when converting to new API, will lookup attribute type in schema representation
elif isinstance(attrs[name], UUID):
# convert UUIDs into their string representation, to be handled by sqlobject
@@ -129,42 +132,50 @@
def process(self, model):
try:
- cls = self.qmfObj.getSchema().getKey()[1]
- if cls in mint.schema.schemaReservedWordsMap:
- cls = mint.schema.schemaReservedWordsMap.get(cls)
- cls = eval(cls[0].upper()+cls[1:])
attrs = dict(self.qmfObj.getProperties())
timestamps = self.qmfObj.getTimestamps()
id = self.qmfObj.getObjectId()
+ pkg, cls, hash = self.qmfObj.getClassKey()
+ origCls = cls
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = cls[0].upper()+cls[1:]
+ sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+
if self.processAttributes(attrs, cls, model) == None:
# object is orphan, a parent dependency was not found;
# insertion in db is deferred until parent info is received
return
- obj = None
+ attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
+ attrs["creationTime"] = time_unwarp(datetime.fromtimestamp(timestamps[1]/1000000000))
+ if timestamps[2] != 0:
+ attrs["deletionTime"] = time_unwarp(datetime.fromtimestamp(timestamps[2]/1000000000))
+ log.debug("%s(%s) marked deleted", cls, id)
+
+ attrs["sourceScopeId"] = id.first
+ attrs["sourceObjectId"] = id.second
+ attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, origCls, hash)
+ attrs["managedBroker"] = str(self.broker.getBrokerId())
+
+ conn = model.dbConn.getConnection()
try:
- obj = model.getObject(cls, id, self.broker)
- except mint.ObjectNotFound:
- obj = cls()
- log.debug("%s(%i) created", cls.__name__, obj.id)
- attrs["sourceScopeId"] = id.first
- attrs["sourceObjectId"] = id.second
- pkg, cls, hash = self.qmfObj.getClassKey()
- attrs["qmfClassKey"] = "%s, %s, %s" % (pkg, cls, hash)
- attrs["managedBroker"] = str(self.broker.getBrokerId())
+ cursor = conn.cursor()
+ sql = model.generateSQLUpdate(sqlCls, attrs, id)
+ cursor.execute(sql)
+ if cursor.rowcount == 0:
+ sql = model.generateSQLInsert(sqlCls, attrs)
+ cursor.execute(sql)
+ log.debug("%s(%s) created", cls, id)
+ conn.commit()
except Exception, e:
+ conn.rollback()
print e
+ print_exc()
+ finally:
+ conn.close()
- attrs["recTime"] = datetime.fromtimestamp(timestamps[0]/1000000000)
- attrs["creationTime"] = datetime.fromtimestamp(timestamps[1]/1000000000)
- if timestamps[2] != 0:
- attrs["deletionTime"] = datetime.fromtimestamp(timestamps[2]/1000000000)
- log.debug("%s(%i) marked deleted", cls, obj.id)
-
- obj.set(**attrs)
- obj.syncUpdate()
-
if (cls, id.first, id.second) in model.orphanObjectMap:
# this object is the parent of orphan objects in the map, re-enqueue for insertion
orphanObjects = model.orphanObjectMap.pop((cls, id.first, id.second))
@@ -172,64 +183,83 @@
model.updateThread.enqueue(orphanObj)
log.info("Inserted %d orphan objects whose creation had been deferred" % (len(orphanObjects)))
- if cls == "broker":
- if str(self.broker.getBrokerId()) in model.managedBrokers:
- model.managedBrokers[str(self.broker.getBrokerId())].broker = self.broker
- reg = mint.BrokerRegistration.selectBy(url=self.broker.getFullUrl())[0]
- reg.broker = obj
- reg.syncUpdate()
+ if cls == "Broker":
+ if str(self.broker.getBrokerId()) in model.managedBrokers:
+ broker, dbObjId = model.managedBrokers[str(self.broker.getBrokerId())]
+ if dbObjId == 0:
+ try:
+ conn = model.dbConn.getConnection()
+ cursor = conn.cursor()
+ sql = model.generateSQLSelectWhere(sqlCls, {"managed_broker": str(self.broker.getBrokerId())})
+ cursor.execute(sql)
+ rec = cursor.fetchone()
+ dbObjId = rec[0]
+ cursor = conn.cursor()
+ sql = model.generateSQLUpdateWhere("broker_registration", {"broker_id": dbObjId}, \
+ {"url": self.broker.getFullUrl()})
+ cursor.execute(sql)
+ conn.commit()
+ model.managedBrokers[str(self.broker.getBrokerId())] = (broker, dbObjId)
+ except Exception, e:
+ conn.rollback()
+ print e
+ print_exc()
+ finally:
+ conn.close()
except:
print_exc()
- attrs["managedBroker"] = str(self.broker.getBrokerId())
- attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
- (timestamps[0]/1000000000))
- attrs["creationTime"] = datetime.fromtimestamp \
- (timestamps[1]/1000000000)
-
-
class StatisticUpdate(ModelUpdate):
def __init__(self, broker, obj):
ModelUpdate.__init__(self, broker, obj)
def process(self, model):
try:
- cls = self.qmfObj.getSchema().getKey()[1]
- if cls in mint.schema.schemaReservedWordsMap:
- cls = mint.schema.schemaReservedWordsMap.get(cls)
- cls = eval(cls[0].upper()+cls[1:])
attrs = dict(self.qmfObj.getStatistics())
timestamps = self.qmfObj.getTimestamps()
id = self.qmfObj.getObjectId()
+ pkg, cls, hash = self.qmfObj.getClassKey()
- obj = None
- try:
- obj = model.getObject(cls, id, self.broker)
- except mint.ObjectNotFound:
- # handle this
- raise mint.ObjectNotFound
+ origCls = cls
+ if cls in mint.schema.schemaReservedWordsMap:
+ cls = mint.schema.schemaReservedWordsMap.get(cls)
+ cls = cls[0].upper()+cls[1:]
+ sqlCls = model.dbStyle.pythonClassToDBTable(cls)
+
+ statsCls = self.getStatsClass(cls)
+ sqlStatsCls = model.dbStyle.pythonClassToDBTable(statsCls)
-
- statscls = self.getStatsClass(cls)
- if self.processAttributes(attrs, statscls, model) == None:
+ if self.processAttributes(attrs, statsCls, model) == None:
# object is orphan, a parent dependency was not found;
# insertion in db is deferred until parent info is received
return
- attrs["recTime"] = time_unwarp(datetime.fromtimestamp \
- (timestamps[0]/1000000000))
+ attrs["recTime"] = time_unwarp(datetime.fromtimestamp(timestamps[0]/1000000000))
- # Set the stat->obj reference
- attrs[cls.__name__[0].lower() + cls.__name__[1:]] = obj
- statsobj = statscls()
- statsobj.set(**attrs)
- statsobj.syncUpdate()
+ conn = model.dbConn.getConnection()
+ try:
+ cursor = conn.cursor()
+ subSql = model.generateSQLSelect(sqlCls, id)
+ sql = model.generateSQLInsert(sqlStatsCls, attrs, {sqlCls + "_id": subSql})
+ cursor.execute(sql)
- obj.statsPrev = obj.statsCurr
- obj.statsCurr = statsobj
- obj.syncUpdate()
+ sql = "select currval('%s_id_seq')" % (sqlStatsCls)
+ cursor.execute(sql)
+ rec = cursor.fetchone()
+ dbStatsId = rec[0]
+ log.debug("%s(%s) created", statsCls, id)
+ sql = model.generateSQLUpdate(sqlCls, {"stats_curr_id": dbStatsId}, id, updateStats=True)
+ cursor.execute(sql)
+ conn.commit()
+ except Exception, e:
+ conn.rollback()
+ print e
+ print_exc()
+ finally:
+ conn.close()
+
except:
print_exc()
15 years, 7 months
rhmessaging commits: r2826 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-17 13:51:55 -0500 (Mon, 17 Nov 2008)
New Revision: 2826
Modified:
mgmt/trunk/mint/python/mint/schema.py
Log:
Changed test for existence of arguments to be more explicit. An argument of False was not being passed to API.
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2008-11-17 18:51:43 UTC (rev 2825)
+++ mgmt/trunk/mint/python/mint/schema.py 2008-11-17 18:51:55 UTC (rev 2826)
@@ -175,7 +175,7 @@
def GetAd(self, model, callback, JobAd):
actualArgs = list()
- if JobAd:
+ if JobAd is not None:
actualArgs.append(JobAd)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "GetAd",
@@ -183,9 +183,9 @@
def SetAttribute(self, model, callback, Name, Value):
actualArgs = list()
- if Name:
+ if Name is not None:
actualArgs.append(Name)
- if Value:
+ if Value is not None:
actualArgs.append(Value)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "SetAttribute",
@@ -193,7 +193,7 @@
def Hold(self, model, callback, Reason):
actualArgs = list()
- if Reason:
+ if Reason is not None:
actualArgs.append(Reason)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Hold",
@@ -201,7 +201,7 @@
def Release(self, model, callback, Reason):
actualArgs = list()
- if Reason:
+ if Reason is not None:
actualArgs.append(Reason)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Release",
@@ -209,7 +209,7 @@
def Remove(self, model, callback, Reason):
actualArgs = list()
- if Reason:
+ if Reason is not None:
actualArgs.append(Reason)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Remove",
@@ -217,13 +217,13 @@
def Fetch(self, model, callback, File, Start, End, Data):
actualArgs = list()
- if File:
+ if File is not None:
actualArgs.append(File)
- if Start:
+ if Start is not None:
actualArgs.append(Start)
- if End:
+ if End is not None:
actualArgs.append(End)
- if Data:
+ if Data is not None:
actualArgs.append(Data)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Fetch",
@@ -354,7 +354,7 @@
def GetLimits(self, model, callback, Limits):
actualArgs = list()
- if Limits:
+ if Limits is not None:
actualArgs.append(Limits)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "GetLimits",
@@ -362,9 +362,9 @@
def SetLimit(self, model, callback, Name, Max):
actualArgs = list()
- if Name:
+ if Name is not None:
actualArgs.append(Name)
- if Max:
+ if Max is not None:
actualArgs.append(Max)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "SetLimit",
@@ -456,7 +456,7 @@
def Start(self, model, callback, Subsystem):
actualArgs = list()
- if Subsystem:
+ if Subsystem is not None:
actualArgs.append(Subsystem)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Start",
@@ -464,7 +464,7 @@
def Stop(self, model, callback, Subsystem):
actualArgs = list()
- if Subsystem:
+ if Subsystem is not None:
actualArgs.append(Subsystem)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Stop",
@@ -651,7 +651,7 @@
def expand(self, model, callback, by):
"""Increase number of files allocated for this journal"""
actualArgs = list()
- if by:
+ if by is not None:
actualArgs.append(by)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "expand",
@@ -757,9 +757,9 @@
def echo(self, model, callback, sequence, body):
"""Request a response to test the path to the management broker"""
actualArgs = list()
- if sequence:
+ if sequence is not None:
actualArgs.append(sequence)
- if body:
+ if body is not None:
actualArgs.append(body)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "echo",
@@ -768,19 +768,19 @@
def connect(self, model, callback, host, port, durable, authMechanism, username, password, transport):
"""Establish a connection to another broker"""
actualArgs = list()
- if host:
+ if host is not None:
actualArgs.append(host)
- if port:
+ if port is not None:
actualArgs.append(port)
- if durable:
+ if durable is not None:
actualArgs.append(durable)
- if authMechanism:
+ if authMechanism is not None:
actualArgs.append(authMechanism)
- if username:
+ if username is not None:
actualArgs.append(username)
- if password:
+ if password is not None:
actualArgs.append(password)
- if transport:
+ if transport is not None:
actualArgs.append(transport)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "connect",
@@ -789,11 +789,11 @@
def queueMoveMessages(self, model, callback, srcQueue, destQueue, qty):
"""Move messages from one queue to another"""
actualArgs = list()
- if srcQueue:
+ if srcQueue is not None:
actualArgs.append(srcQueue)
- if destQueue:
+ if destQueue is not None:
actualArgs.append(destQueue)
- if qty:
+ if qty is not None:
actualArgs.append(qty)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "queueMoveMessages",
@@ -898,7 +898,7 @@
def purge(self, model, callback, request):
"""Discard all or some messages on a queue"""
actualArgs = list()
- if request:
+ if request is not None:
actualArgs.append(request)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "purge",
@@ -1093,23 +1093,23 @@
def bridge(self, model, callback, durable, src, dest, key, tag, excludes, srcIsQueue, srcIsLocal, dynamic):
"""Bridge messages over the link"""
actualArgs = list()
- if durable:
+ if durable is not None:
actualArgs.append(durable)
- if src:
+ if src is not None:
actualArgs.append(src)
- if dest:
+ if dest is not None:
actualArgs.append(dest)
- if key:
+ if key is not None:
actualArgs.append(key)
- if tag:
+ if tag is not None:
actualArgs.append(tag)
- if excludes:
+ if excludes is not None:
actualArgs.append(excludes)
- if srcIsQueue:
+ if srcIsQueue is not None:
actualArgs.append(srcIsQueue)
- if srcIsLocal:
+ if srcIsLocal is not None:
actualArgs.append(srcIsLocal)
- if dynamic:
+ if dynamic is not None:
actualArgs.append(dynamic)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "bridge",
15 years, 7 months
rhmessaging commits: r2825 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-17 13:51:43 -0500 (Mon, 17 Nov 2008)
New Revision: 2825
Modified:
mgmt/trunk/mint/python/mint/schemaparser.py
Log:
Changed test for existence of arguments to be more explicit. An argument of False was not being passed to API.
Modified: mgmt/trunk/mint/python/mint/schemaparser.py
===================================================================
--- mgmt/trunk/mint/python/mint/schemaparser.py 2008-11-17 18:39:47 UTC (rev 2824)
+++ mgmt/trunk/mint/python/mint/schemaparser.py 2008-11-17 18:51:43 UTC (rev 2825)
@@ -158,7 +158,7 @@
actualArgs = " actualArgs = list()\n"
for arg in elem.query["arg"]:
formalArgs += "%s, " % (arg["@name"])
- actualArgs += " if %s:\n" % (arg["@name"])
+ actualArgs += " if %s is not None:\n" % (arg["@name"])
actualArgs += " actualArgs.append(%s)\n" % (arg["@name"])
if (formalArgs != ", "):
15 years, 7 months
rhmessaging commits: r2824 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-17 13:39:47 -0500 (Mon, 17 Nov 2008)
New Revision: 2824
Modified:
mgmt/trunk/cumin/python/cumin/brokerlink.py
Log:
Fix showing the broker link add dialog.
Modified: mgmt/trunk/cumin/python/cumin/brokerlink.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerlink.py 2008-11-17 17:06:56 UTC (rev 2823)
+++ mgmt/trunk/cumin/python/cumin/brokerlink.py 2008-11-17 18:39:47 UTC (rev 2824)
@@ -45,10 +45,9 @@
def render_add_broker_link_url(self, session, vhost):
branch = session.branch()
- self.frame.set_object(branch, vhost.broker.registration)
self.frame.link_add.show(branch)
return branch.marshal()
-
+
def get_args(self, session):
reg = self.frame.get_object(session)
return (reg.getDefaultVhost(),)
15 years, 7 months
rhmessaging commits: r2823 - in mgmt/trunk: mint/bin and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-11-17 12:06:56 -0500 (Mon, 17 Nov 2008)
New Revision: 2823
Added:
mgmt/trunk/mint/bin/mint-bench
mgmt/trunk/mint/python/mint/tools.py
Modified:
mgmt/trunk/cumin/python/cumin/tools.py
Log:
Add a mint-bench tool; consolidate parsing logic on base commands
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2008-11-17 17:02:15 UTC (rev 2822)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2008-11-17 17:06:56 UTC (rev 2823)
@@ -58,8 +58,22 @@
self.config.init()
def run(self):
- pass
+ try:
+ opts, args = self.parse(sys.argv)
+ except CommandException, e:
+ print "Error: %s" % e.message
+ e.command.print_help()
+ sys.exit(1)
+ if "help" in opts:
+ self.print_help()
+ sys.exit(0)
+
+ self.do_run(opts, args)
+
+ def do_run(self, opts, args):
+ raise Exception("Not implemented")
+
def main(self):
self.check()
self.init()
@@ -406,14 +420,7 @@
opt = CommandOption(self, "ssl")
opt.description = "Serve web pages using SSL"
- def run(self):
- try:
- opts, args = self.parse(sys.argv)
- except CommandException, e:
- print "Error: %s" % e.message
- e.command.print_help()
- sys.exit(1)
-
+ def do_run(self, opts, args):
self.config.load_dict(opts)
self.config.ssl = "ssl" in opts
@@ -469,18 +476,7 @@
opt.argument = "ADDR"
opt.description = "Use existing broker at ADDR"
- def run(self):
- try:
- opts, args = self.parse(sys.argv)
- except CommandException, e:
- print "Error: %s" % e.message
- e.command.print_help()
- sys.exit(1)
-
- if "help" in opts:
- self.print_help()
- sys.exit(0)
-
+ def do_run(self, opts, args):
self.config.load_dict(opts)
if self.config.debug:
@@ -537,18 +533,7 @@
opt = CommandOption(self, "check-xml")
opt.description = "Check that page output is well-formed xml"
- def run(self):
- try:
- opts, args = self.parse(sys.argv)
- except CommandException, e:
- print "Error: %s" % e.message
- e.command.print_help()
- sys.exit(1)
-
- if "help" in opts:
- self.print_help()
- sys.exit(0)
-
+ def do_run(self, opts, args):
self.config.load_dict(opts)
if self.config.debug:
Added: mgmt/trunk/mint/bin/mint-bench
===================================================================
--- mgmt/trunk/mint/bin/mint-bench (rev 0)
+++ mgmt/trunk/mint/bin/mint-bench 2008-11-17 17:06:56 UTC (rev 2823)
@@ -0,0 +1,59 @@
+#!/usr/bin/python
+
+import sys, os, logging
+
+from mint.tools import MintBenchTool
+
+def do_main():
+ MintBenchTool("mint-bench").main()
+
+def main():
+ root = logging.getLogger("mint")
+ root.setLevel(logging.DEBUG)
+
+ h = logging.StreamHandler()
+ h.setLevel(logging.DEBUG)
+ root.addHandler(h)
+
+ if "--profile" in sys.argv:
+ from profile import Profile
+ from pstats import Stats
+
+ prof = Profile()
+
+ print "Calibrating"
+
+ biases = list()
+
+ for i in range(5):
+ bias = prof.calibrate(100000)
+ biases.append(bias)
+ print i, bias
+
+ prof.bias = sum(biases) / float(5)
+
+ print "Using bias %f" % prof.bias
+
+ try:
+ prof.run("do_main()")
+ except KeyboardInterrupt:
+ pass
+
+ file = "/tmp/cumin-test-stats"
+
+ prof.dump_stats(file)
+
+ stats = Stats(file)
+
+ stats.sort_stats("cumulative").print_stats(15)
+ stats.sort_stats("time").print_stats(15)
+
+ stats.strip_dirs()
+ else:
+ do_main()
+
+if __name__ == "__main__":
+ try:
+ main()
+ except KeyboardInterrupt:
+ pass
Property changes on: mgmt/trunk/mint/bin/mint-bench
___________________________________________________________________
Name: svn:executable
+ *
Added: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py (rev 0)
+++ mgmt/trunk/mint/python/mint/tools.py 2008-11-17 17:06:56 UTC (rev 2823)
@@ -0,0 +1,86 @@
+import sys, os
+
+from time import sleep
+from parsley.config import *
+from parsley.command import *
+
+from mint import *
+
+class BaseMintTool(Command):
+ def __init__(self, name):
+ super(BaseMintTool, self).__init__(None, name)
+
+ opt = CommandOption(self, "data")
+ opt.argument = "URI"
+ opt.description = "Connect to database at URI"
+
+ opt = CommandOption(self, "log-file")
+ opt.argument = "PATH"
+ opt.description = "Log to file at PATH"
+
+ opt = CommandOption(self, "log-level")
+ opt.argument = "LEVEL"
+ opt.description = "Log messages at or above LEVEL " + \
+ "('debug', 'info', 'warning', 'error')"
+
+ opt = CommandOption(self, "debug")
+ opt.description = "Enable debugging; print logging to console"
+
+ def check(self):
+ if os.getuid() not in (os.stat(__file__).st_uid, 0):
+ print "Error: You have insufficient privileges"
+ sys.exit(1)
+
+ def init(self):
+ super(BaseMintTool, self).init()
+
+ #self.config.init()
+
+ def run(self):
+ pass
+
+ def run(self):
+ try:
+ opts, args = self.parse(sys.argv)
+ except CommandException, e:
+ print "Error: %s" % e.message
+ e.command.print_help()
+ sys.exit(1)
+
+ if "help" in opts:
+ self.print_help()
+ sys.exit(0)
+
+ self.do_run(opts, args)
+
+ def do_run(self, opts, args):
+ raise Exception("Not implemented")
+
+ def main(self):
+ self.check()
+ self.init()
+ self.run()
+
+class MintBenchTool(BaseMintTool):
+ def __init__(self, name):
+ super(MintBenchTool, self).__init__(name)
+
+ def init(self):
+ super(MintBenchTool, self).init()
+
+ def do_run(self, opts, args):
+ ddef = "postgresql://cumin@localhost/cumin"
+ model = MintModel(opts.get("data", ddef), debug=True)
+
+ model.check()
+ model.init()
+ model.start()
+
+ try:
+ for arg in args[1:]:
+ model.getSession().addBroker(arg)
+
+ while True:
+ sleep(1)
+ finally:
+ model.stop()
15 years, 7 months
rhmessaging commits: r2822 - mgmt/trunk/cumin/resources.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-17 12:02:15 -0500 (Mon, 17 Nov 2008)
New Revision: 2822
Modified:
mgmt/trunk/cumin/resources/ie.css
Log:
Removed IE only sticky_note style since IE doesn't use it.
Modified: mgmt/trunk/cumin/resources/ie.css
===================================================================
--- mgmt/trunk/cumin/resources/ie.css 2008-11-17 16:01:39 UTC (rev 2821)
+++ mgmt/trunk/cumin/resources/ie.css 2008-11-17 17:02:15 UTC (rev 2822)
@@ -19,7 +19,3 @@
ul.radiotabs li a.selected {
background-image: url(resource?name=radio-button-checked.png);
}
-
-div.grid_cell div.sticky_note {
- visibility: hidden; /* doesn't work over <button>s anyway */
-}
\ No newline at end of file
15 years, 7 months
rhmessaging commits: r2821 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-17 11:01:39 -0500 (Mon, 17 Nov 2008)
New Revision: 2821
Modified:
mgmt/trunk/cumin/python/cumin/limits.py
mgmt/trunk/cumin/python/cumin/widgets.py
Log:
Fix negotiator update when there is no negotiator.
Modified: mgmt/trunk/cumin/python/cumin/limits.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/limits.py 2008-11-17 15:25:14 UTC (rev 2820)
+++ mgmt/trunk/cumin/python/cumin/limits.py 2008-11-17 16:01:39 UTC (rev 2821)
@@ -29,8 +29,9 @@
def get_negotiator(self, session):
#TODO: find better way to get the negotiator. from pool perhaps?
+ pool = self.frame.get_args(session)[0]
most_recent = None
- negotiators = Negotiator.select()
+ negotiators = Negotiator.select("pool='%s'" % pool.id)
for negotiator in negotiators:
if negotiator.managedBroker:
if not most_recent:
@@ -90,12 +91,14 @@
class LimitCount(AjaxField):
def get_url(self, session):
negotiator = self.parent.get_negotiator(session)
- if negotiator:
- return "call.xml?class=negotiator;id=%i;method=GetLimitCount" % negotiator.id
+ return negotiator and \
+ "call.xml?class=negotiator;id=%i;method=GetLimitCount" % negotiator.id
def get_title(self, session, title):
- return "%s <span id=\"%s\"><span class='count'>(?)</span>%s</span>" % \
- (title, self.name, self.render_script(session))
+ script = self.render_script(session)
+ count = script and "?" or "0"
+ return "%s <span id=\"%s\"><span class='count'>(%s)</span>%s</span>" % \
+ (title, self.name, count, script)
class NameColumn(ItemTableColumn):
def render_title(self, session, data):
Modified: mgmt/trunk/cumin/python/cumin/widgets.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/widgets.py 2008-11-17 15:25:14 UTC (rev 2820)
+++ mgmt/trunk/cumin/python/cumin/widgets.py 2008-11-17 16:01:39 UTC (rev 2821)
@@ -1118,7 +1118,7 @@
got_fn = self.got_fn(session)
elem_id = self.elem_id(session)
- return script % (get_fn, url, got_fn, elem_id, get_fn)
+ return url and script % (get_fn, url, got_fn, elem_id, get_fn) or ""
def get_url(self, session):
pass
15 years, 7 months
rhmessaging commits: r2820 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2008-11-17 10:25:14 -0500 (Mon, 17 Nov 2008)
New Revision: 2820
Modified:
mgmt/trunk/cumin/python/cumin/tools.py
Log:
This is now handled in the base command class
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2008-11-17 15:21:48 UTC (rev 2819)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2008-11-17 15:25:14 UTC (rev 2820)
@@ -414,10 +414,6 @@
e.command.print_help()
sys.exit(1)
- if "help" in opts:
- self.print_help()
- sys.exit(0)
-
self.config.load_dict(opts)
self.config.ssl = "ssl" in opts
15 years, 7 months
rhmessaging commits: r2819 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-17 10:21:48 -0500 (Mon, 17 Nov 2008)
New Revision: 2819
Modified:
mgmt/trunk/cumin/python/cumin/pool.py
Log:
Fixing minor typo
Modified: mgmt/trunk/cumin/python/cumin/pool.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/pool.py 2008-11-17 14:27:06 UTC (rev 2818)
+++ mgmt/trunk/cumin/python/cumin/pool.py 2008-11-17 15:21:48 UTC (rev 2819)
@@ -59,7 +59,7 @@
return data["jobs"]
class StatusColumn(SqlTableColumn):
- def render_title(sefl, session, data):
+ def render_title(self, session, data):
return "Status"
def render_content(self, session, data):
15 years, 7 months
rhmessaging commits: r2818 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2008-11-17 09:27:06 -0500 (Mon, 17 Nov 2008)
New Revision: 2818
Modified:
mgmt/trunk/mint/python/mint/schema.py
Log:
Updated schema.py that allows calls with 0 input arguments.
Modified: mgmt/trunk/mint/python/mint/schema.py
===================================================================
--- mgmt/trunk/mint/python/mint/schema.py 2008-11-17 14:13:02 UTC (rev 2817)
+++ mgmt/trunk/mint/python/mint/schema.py 2008-11-17 14:27:06 UTC (rev 2818)
@@ -175,46 +175,56 @@
def GetAd(self, model, callback, JobAd):
actualArgs = list()
- actualArgs.append(JobAd)
+ if JobAd:
+ actualArgs.append(JobAd)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "GetAd",
callback, args=actualArgs)
def SetAttribute(self, model, callback, Name, Value):
actualArgs = list()
- actualArgs.append(Name)
- actualArgs.append(Value)
+ if Name:
+ actualArgs.append(Name)
+ if Value:
+ actualArgs.append(Value)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "SetAttribute",
callback, args=actualArgs)
def Hold(self, model, callback, Reason):
actualArgs = list()
- actualArgs.append(Reason)
+ if Reason:
+ actualArgs.append(Reason)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Hold",
callback, args=actualArgs)
def Release(self, model, callback, Reason):
actualArgs = list()
- actualArgs.append(Reason)
+ if Reason:
+ actualArgs.append(Reason)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Release",
callback, args=actualArgs)
def Remove(self, model, callback, Reason):
actualArgs = list()
- actualArgs.append(Reason)
+ if Reason:
+ actualArgs.append(Reason)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Remove",
callback, args=actualArgs)
def Fetch(self, model, callback, File, Start, End, Data):
actualArgs = list()
- actualArgs.append(File)
- actualArgs.append(Start)
- actualArgs.append(End)
- actualArgs.append(Data)
+ if File:
+ actualArgs.append(File)
+ if Start:
+ actualArgs.append(Start)
+ if End:
+ actualArgs.append(End)
+ if Data:
+ actualArgs.append(Data)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Fetch",
callback, args=actualArgs)
@@ -344,15 +354,18 @@
def GetLimits(self, model, callback, Limits):
actualArgs = list()
- actualArgs.append(Limits)
+ if Limits:
+ actualArgs.append(Limits)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "GetLimits",
callback, args=actualArgs)
def SetLimit(self, model, callback, Name, Max):
actualArgs = list()
- actualArgs.append(Name)
- actualArgs.append(Max)
+ if Name:
+ actualArgs.append(Name)
+ if Max:
+ actualArgs.append(Max)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "SetLimit",
callback, args=actualArgs)
@@ -443,14 +456,16 @@
def Start(self, model, callback, Subsystem):
actualArgs = list()
- actualArgs.append(Subsystem)
+ if Subsystem:
+ actualArgs.append(Subsystem)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Start",
callback, args=actualArgs)
def Stop(self, model, callback, Subsystem):
actualArgs = list()
- actualArgs.append(Subsystem)
+ if Subsystem:
+ actualArgs.append(Subsystem)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "Stop",
callback, args=actualArgs)
@@ -636,7 +651,8 @@
def expand(self, model, callback, by):
"""Increase number of files allocated for this journal"""
actualArgs = list()
- actualArgs.append(by)
+ if by:
+ actualArgs.append(by)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "expand",
callback, args=actualArgs)
@@ -741,8 +757,10 @@
def echo(self, model, callback, sequence, body):
"""Request a response to test the path to the management broker"""
actualArgs = list()
- actualArgs.append(sequence)
- actualArgs.append(body)
+ if sequence:
+ actualArgs.append(sequence)
+ if body:
+ actualArgs.append(body)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "echo",
callback, args=actualArgs)
@@ -750,13 +768,20 @@
def connect(self, model, callback, host, port, durable, authMechanism, username, password, transport):
"""Establish a connection to another broker"""
actualArgs = list()
- actualArgs.append(host)
- actualArgs.append(port)
- actualArgs.append(durable)
- actualArgs.append(authMechanism)
- actualArgs.append(username)
- actualArgs.append(password)
- actualArgs.append(transport)
+ if host:
+ actualArgs.append(host)
+ if port:
+ actualArgs.append(port)
+ if durable:
+ actualArgs.append(durable)
+ if authMechanism:
+ actualArgs.append(authMechanism)
+ if username:
+ actualArgs.append(username)
+ if password:
+ actualArgs.append(password)
+ if transport:
+ actualArgs.append(transport)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "connect",
callback, args=actualArgs)
@@ -764,9 +789,12 @@
def queueMoveMessages(self, model, callback, srcQueue, destQueue, qty):
"""Move messages from one queue to another"""
actualArgs = list()
- actualArgs.append(srcQueue)
- actualArgs.append(destQueue)
- actualArgs.append(qty)
+ if srcQueue:
+ actualArgs.append(srcQueue)
+ if destQueue:
+ actualArgs.append(destQueue)
+ if qty:
+ actualArgs.append(qty)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "queueMoveMessages",
callback, args=actualArgs)
@@ -870,7 +898,8 @@
def purge(self, model, callback, request):
"""Discard all or some messages on a queue"""
actualArgs = list()
- actualArgs.append(request)
+ if request:
+ actualArgs.append(request)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "purge",
callback, args=actualArgs)
@@ -1064,15 +1093,24 @@
def bridge(self, model, callback, durable, src, dest, key, tag, excludes, srcIsQueue, srcIsLocal, dynamic):
"""Bridge messages over the link"""
actualArgs = list()
- actualArgs.append(durable)
- actualArgs.append(src)
- actualArgs.append(dest)
- actualArgs.append(key)
- actualArgs.append(tag)
- actualArgs.append(excludes)
- actualArgs.append(srcIsQueue)
- actualArgs.append(srcIsLocal)
- actualArgs.append(dynamic)
+ if durable:
+ actualArgs.append(durable)
+ if src:
+ actualArgs.append(src)
+ if dest:
+ actualArgs.append(dest)
+ if key:
+ actualArgs.append(key)
+ if tag:
+ actualArgs.append(tag)
+ if excludes:
+ actualArgs.append(excludes)
+ if srcIsQueue:
+ actualArgs.append(srcIsQueue)
+ if srcIsLocal:
+ actualArgs.append(srcIsLocal)
+ if dynamic:
+ actualArgs.append(dynamic)
originalId = ObjectId(None, self.sourceScopeId, self.sourceObjectId)
model.callMethod(self.managedBroker, originalId, self.qmfClassKey, "bridge",
callback, args=actualArgs)
15 years, 7 months