[rhmessaging-commits] rhmessaging commits: r3782 - mgmt/trunk/mint/python/mint.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Jan 11 14:13:53 EST 2010
Author: justi9
Date: 2010-01-11 14:13:53 -0500 (Mon, 11 Jan 2010)
New Revision: 3782
Modified:
mgmt/trunk/mint/python/mint/expire.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
Log:
Purely cosmetic change: four-space indents and more pythonic naming
Modified: mgmt/trunk/mint/python/mint/expire.py
===================================================================
--- mgmt/trunk/mint/python/mint/expire.py 2010-01-11 18:46:28 UTC (rev 3781)
+++ mgmt/trunk/mint/python/mint/expire.py 2010-01-11 19:13:53 UTC (rev 3782)
@@ -79,4 +79,4 @@
log.debug("%i total records expired", total)
- stats.expireUpdateCount += 1
+ stats.expired += 1
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2010-01-11 18:46:28 UTC (rev 3781)
+++ mgmt/trunk/mint/python/mint/tools.py 2010-01-11 19:13:53 UTC (rev 3782)
@@ -473,14 +473,14 @@
stats = app.updateThread.stats
- enq = stats.enqueueCount
- deq = stats.dequeueCount
- drp = stats.dropCount
- dfr = stats.deferCount
+ enq = stats.enqueued
+ deq = stats.dequeued
+ drp = stats.dropped
+ dfr = stats.deferred
- prop = stats.propUpdateCount
- stat = stats.statUpdateCount
- exp = stats.expireUpdateCount
+ prop = stats.prop_updated
+ stat = stats.stat_updated
+ exp = stats.expired
print row % (enq - enq_last,
deq - deq_last,
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2010-01-11 18:46:28 UTC (rev 3781)
+++ mgmt/trunk/mint/python/mint/update.py 2010-01-11 19:13:53 UTC (rev 3782)
@@ -11,58 +11,58 @@
log = logging.getLogger("mint.update")
class UpdateThread(MintDaemonThread):
- """
- Only the update thread writes to the database
- """
+ """
+ Only the update thread writes to the database
+ """
- def __init__(self, app):
- super(UpdateThread, self).__init__(app)
+ def __init__(self, app):
+ super(UpdateThread, self).__init__(app)
- self.conn = None
- self.stats = None
+ self.conn = None
+ self.stats = None
- self.updates = UpdateQueue(slotCount=2)
+ self.updates = UpdateQueue(slotCount=2)
- def init(self):
- self.conn = self.app.database.getConnection()
- self.stats = UpdateStats()
+ def init(self):
+ self.conn = self.app.database.getConnection()
+ self.stats = UpdateStats()
- def enqueue(self, update):
- update.thread = self
+ def enqueue(self, update):
+ update.thread = self
- self.updates.put(update)
+ self.updates.put(update)
- self.stats.enqueueCount += 1
+ self.stats.enqueued += 1
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
+ # This is an attempt to yield from the enqueueing thread (this
+ # method's caller) to the update thread
- if self.updates.qsize() > 1000:
- sleep(0.1)
+ if self.updates.qsize() > 1000:
+ sleep(0.1)
- def run(self):
- while True:
- if self.stopRequested:
- break
+ def run(self):
+ while True:
+ if self.stopRequested:
+ break
- try:
- update = self.updates.get(True, 1)
- except Empty:
- continue
+ try:
+ update = self.updates.get(True, 1)
+ except Empty:
+ continue
- self.stats.dequeueCount += 1
+ self.stats.dequeued += 1
- update.process()
+ update.process()
class UpdateStats(object):
- def __init__(self):
- self.enqueueCount = 0
- self.dequeueCount = 0
- self.statUpdateCount = 0
- self.propUpdateCount = 0
- self.expireUpdateCount = 0
- self.dropCount = 0
- self.deferCount = 0
+ def __init__(self):
+ self.enqueued = 0
+ self.dequeued = 0
+ self.stat_updated = 0
+ self.prop_updated = 0
+ self.expired = 0
+ self.dropped = 0
+ self.deferred = 0
class ReferenceException(Exception):
def __init__(self, sought):
@@ -72,326 +72,330 @@
return repr(self.sought)
class Update(object):
- def __init__(self):
- self.thread = None
- self.priority = 0
+ def __init__(self):
+ self.thread = None
+ self.priority = 0
- def process(self):
- log.debug("Processing %s", self)
+ def process(self):
+ log.debug("Processing %s", self)
- assert self.thread
+ assert self.thread
- conn = self.thread.conn
- stats = self.thread.stats
+ conn = self.thread.conn
+ stats = self.thread.stats
- try:
- self.do_process(conn, stats)
+ try:
+ self.do_process(conn, stats)
- conn.commit()
- except:
- log.exception("Update failed")
+ conn.commit()
+ except:
+ log.exception("Update failed")
- conn.rollback()
+ conn.rollback()
- def do_process(self, conn, stats):
- raise Exception("Not implemented")
+ def do_process(self, conn, stats):
+ raise Exception("Not implemented")
- def __repr__(self):
- return "%s(%i)" % (self.__class__.__name__, self.priority)
+ def __repr__(self):
+ return "%s(%i)" % (self.__class__.__name__, self.priority)
class ObjectUpdate(Update):
- def __init__(self, agent, object):
- super(ObjectUpdate, self).__init__()
+ def __init__(self, agent, object):
+ super(ObjectUpdate, self).__init__()
- self.agent = agent
- self.object = object
+ self.agent = agent
+ self.object = object
- self.object_id = str(QmfObjectId.fromObject(object))
+ self.object_id = str(QmfObjectId.fromObject(object))
- def getClass(self):
- # XXX this is unfortunate
- name = self.object.getClassKey().getClassName()
- name = mint.schema.schemaReservedWordsMap.get(name, name)
- name = name[0].upper() + name[1:]
+ def getClass(self):
+ # XXX this is unfortunate
+ name = self.object.getClassKey().getClassName()
+ name = mint.schema.schemaReservedWordsMap.get(name, name)
+ name = name[0].upper() + name[1:]
- try:
- return schemaNameToClassMap[name]
- except KeyError:
- raise ReferenceException(name)
+ try:
+ return schemaNameToClassMap[name]
+ except KeyError:
+ raise ReferenceException(name)
- def processAttributes(self, attrs, cls):
- results = dict()
+ def process_attributes(self, attrs, cls):
+ results = dict()
- for key, value in attrs:
- name = key.__repr__()
- name = mint.schema.schemaReservedWordsMap.get(name, name)
+ for key, value in attrs:
+ name = key.__repr__()
+ name = mint.schema.schemaReservedWordsMap.get(name, name)
- if key.type == 10:
- # XXX don't want oid around much
- self.processReference(name, value, results)
- continue
+ if key.type == 10:
+ # XXX don't want oid around much
+ self.process_reference(name, value, results)
+ continue
- if not hasattr(cls, name):
- # Discard attrs that we don't have in our schema
- log.debug("%s has no field '%s'" % (cls, name))
- continue
+ if not hasattr(cls, name):
+ # Discard attrs that we don't have in our schema
+ log.debug("%s has no field '%s'" % (cls, name))
+ continue
- if key.type == 8:
- self.processTimestamp(name, value, results)
- continue
+ if key.type == 8:
+ self.process_timestamp(name, value, results)
+ continue
- if key.type == 14:
- # Convert UUIDs into their string representation, to be
- # handled by sqlobject
- results[name] = str(value)
- continue
+ if key.type == 14:
+ # Convert UUIDs into their string representation, to be
+ # handled by sqlobject
+ results[name] = str(value)
+ continue
- if key.type == 15:
- #if value:
- results[name] = pickle.dumps(value)
- continue
+ if key.type == 15:
+ #if value:
+ results[name] = pickle.dumps(value)
+ continue
- results[name] = value
+ results[name] = value
- return results
+ return results
- # XXX this needs to be a much more straightforward procedure
- def processReference(self, name, oid, results):
- if name.endswith("Ref"):
- name = name[:-3]
+ # XXX this needs to be a much more straightforward procedure
+ def process_reference(self, name, oid, results):
+ if name.endswith("Ref"):
+ name = name[:-3]
- className = name[0].upper() + name[1:]
- otherClass = getattr(mint, className)
+ className = name[0].upper() + name[1:]
+ otherClass = getattr(mint, className)
- foreignKey = name + "_id"
+ foreignKey = name + "_id"
- id = self.agent.databaseIds.get(str(QmfObjectId(oid.first, oid.second)))
+ object_id = str(QmfObjectId(oid.first, oid.second))
+ id = self.agent.databaseIds.get(object_id)
- if id is None:
- # XXX don't want oid around much
- raise ReferenceException(oid)
+ if id is None:
+ # XXX don't want oid around much
+ raise ReferenceException(oid)
- results[foreignKey] = id
+ results[foreignKey] = id
- def processTimestamp(self, name, tstamp, results):
- if tstamp:
- t = datetime.fromtimestamp(tstamp / 1000000000)
- results[name] = t
+ def process_timestamp(self, name, tstamp, results):
+ if tstamp:
+ t = datetime.fromtimestamp(tstamp / 1000000000)
+ results[name] = t
- def __repr__(self):
- cls = self.object.getClassKey().getClassName()
+ def __repr__(self):
+ cls = self.object.getClassKey().getClassName()
- return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
- self.agent.id,
- cls,
- self.object_id,
- self.priority)
+ return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
+ self.agent.id,
+ cls,
+ self.object_id,
+ self.priority)
class PropertyUpdate(ObjectUpdate):
- def do_process(self, conn, stats):
- try:
- cls = self.getClass()
- except ReferenceException, e:
- log.info("Referenced class %r not found", e.sought)
+ def do_process(self, conn, stats):
+ try:
+ cls = self.getClass()
+ except ReferenceException, e:
+ log.info("Referenced class %r not found", e.sought)
- stats.dropCount += 1
+ stats.dropped += 1
- return
+ return
- try:
- attrs = self.processAttributes(self.object.getProperties(), cls)
- except ReferenceException, e:
- log.info("Referenced object %r not found", e.sought)
+ try:
+ attrs = self.process_attributes(self.object.getProperties(), cls)
+ except ReferenceException, e:
+ log.info("Referenced object %r not found", e.sought)
- self.agent.deferredUpdates[self.object_id].append(self)
+ self.agent.deferredUpdates[self.object_id].append(self)
- stats.deferCount += 1
+ stats.deferred += 1
- return
+ return
- update, create, delete = self.object.getTimestamps()
+ update, create, delete = self.object.getTimestamps()
- self.processTimestamp("qmfUpdateTime", update, attrs)
- self.processTimestamp("qmfCreateTime", create, attrs)
+ self.process_timestamp("qmfUpdateTime", update, attrs)
+ self.process_timestamp("qmfCreateTime", create, attrs)
- if delete != 0:
- self.processTimestamp("qmfDeleteTime", delete, attrs)
+ if delete != 0:
+ self.process_timestamp("qmfDeleteTime", delete, attrs)
- log.debug("%s(%s,%s) marked deleted",
- cls.__name__, self.agent.id, self.object_id)
+ log.debug("%s(%s,%s) marked deleted",
+ cls.__name__, self.agent.id, self.object_id)
- attrs["qmfAgentId"] = self.agent.id
- attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfObjectId"] = str(self.object_id)
- attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
+ attrs["qmfAgentId"] = self.agent.id
+ attrs["qmfClassKey"] = str(self.object.getClassKey())
+ attrs["qmfObjectId"] = str(self.object_id)
+ attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
- cursor = conn.cursor()
+ cursor = conn.cursor()
- # Cases:
- #
- # 1. Object is utterly new to mint
- # 2. Object is in mint's db, but id is not yet cached
- # 3. Object is in mint's db, and id is cached
+ # Cases:
+ #
+ # 1. Object is utterly new to mint
+ # 2. Object is in mint's db, but id is not yet cached
+ # 3. Object is in mint's db, and id is cached
- id = self.agent.databaseIds.get(self.object_id)
+ id = self.agent.databaseIds.get(self.object_id)
- if id is None:
- # Case 1 or 2
+ if id is None:
+ # Case 1 or 2
- op = SqlGetId(cls)
- op.execute(cursor, attrs)
+ op = SqlGetId(cls)
+ op.execute(cursor, attrs)
- rec = cursor.fetchone()
+ rec = cursor.fetchone()
- if rec:
- id = rec[0]
+ if rec:
+ id = rec[0]
- if id is None:
- # Case 1
+ if id is None:
+ # Case 1
- op = SqlInsert(cls, attrs)
- op.execute(cursor, attrs)
+ op = SqlInsert(cls, attrs)
+ op.execute(cursor, attrs)
- id = cursor.fetchone()[0]
+ id = cursor.fetchone()[0]
- log.debug("%s(%i) created", cls.__name__, id)
- else:
- # Case 2
+ log.debug("%s(%i) created", cls.__name__, id)
+ else:
+ # Case 2
- attrs["id"] = id
+ attrs["id"] = id
- op = SqlUpdate(cls, attrs)
- op.execute(cursor, attrs)
+ op = SqlUpdate(cls, attrs)
+ op.execute(cursor, attrs)
- assert cursor.rowcount == 1
+ assert cursor.rowcount == 1
- self.agent.databaseIds.set(self.object_id, id)
- else:
- # Case 3
+ self.agent.databaseIds.set(self.object_id, id)
+ else:
+ # Case 3
- attrs["id"] = id
+ attrs["id"] = id
- op = SqlUpdate(cls, attrs)
- op.execute(cursor, attrs)
+ op = SqlUpdate(cls, attrs)
+ op.execute(cursor, attrs)
- #assert cursor.rowcount == 1
+ #assert cursor.rowcount == 1
- try:
- updates = self.agent.deferredUpdates.pop(self.object_id)
+ try:
+ updates = self.agent.deferredUpdates.pop(self.object_id)
- if updates:
- log.info("Re-enqueueing %i orphans whose creation had been deferred",
- len(updates))
+ if updates:
+ log.info("Reenqueueing %i deferred updates", len(updates))
- for update in updates:
- self.thread.enqueue(update)
- except KeyError:
- pass
+ for update in updates:
+ self.thread.enqueue(update)
+ except KeyError:
+ pass
- self.agent.databaseIds.commit()
+ self.agent.databaseIds.commit()
- stats.propUpdateCount += 1
+ stats.prop_updated += 1
class StatisticUpdate(ObjectUpdate):
- def do_process(self, conn, stats):
- try:
- cls = self.getClass()
- except ReferenceException, e:
- log.info("Referenced class %r not found", e.sought)
- return
+ def do_process(self, conn, stats):
+ try:
+ cls = self.getClass()
+ except ReferenceException, e:
+ log.info("Referenced class %r not found", e.sought)
+ return
- statsCls = getattr(mint, "%sStats" % cls.__name__)
+ statsCls = getattr(mint, "%sStats" % cls.__name__)
- id = self.agent.databaseIds.get(self.object_id)
+ id = self.agent.databaseIds.get(self.object_id)
- if id is None:
- stats.dropCount += 1
- return
+ if id is None:
+ stats.dropped += 1
+ return
- timestamps = self.object.getTimestamps()
+ timestamps = self.object.getTimestamps()
- tnow = datetime.now()
- t = datetime.fromtimestamp(timestamps[0] / 1000000000)
+ tnow = datetime.now()
+ t = datetime.fromtimestamp(timestamps[0] / 1000000000)
- if t < tnow - timedelta(seconds=30):
- log.debug("Update is %i seconds old; skipping it", (tnow - t).seconds)
+ if t < tnow - timedelta(seconds=30):
+ seconds = (tnow - t).seconds
+ log.debug("Update is %i seconds old; skipping it", seconds)
- stats.dropCount += 1
+ stats.dropped += 1
- return
+ return
- try:
- attrs = self.processAttributes(self.object.getStatistics(), statsCls)
- except ReferenceException:
- stats.dropCount += 1
+ try:
+ attrs = self.process_attributes \
+ (self.object.getStatistics(), statsCls)
+ except ReferenceException:
+ stats.dropped += 1
- return
+ return
- attrs["qmfUpdateTime"] = t > tnow and tnow or t # XXX do we still want this
- attrs["%s_id" % cls.sqlmeta.table] = id
+ # XXX do we still want this
+ attrs["qmfUpdateTime"] = t > tnow and tnow or t
+ attrs["%s_id" % cls.sqlmeta.table] = id
- cursor = conn.cursor()
+ cursor = conn.cursor()
- op = SqlInsert(statsCls, attrs)
- op.execute(cursor, attrs)
+ op = SqlInsert(statsCls, attrs)
+ op.execute(cursor, attrs)
- log.debug("%s(%s) created", statsCls.__name__, id)
+ log.debug("%s(%s) created", statsCls.__name__, id)
- stats.statUpdateCount += 1
+ stats.stat_updated += 1
class AgentDisconnectUpdate(Update):
- def __init__(self, agent):
- super(AgentDisconnectUpdate, self).__init__()
+ def __init__(self, agent):
+ super(AgentDisconnectUpdate, self).__init__()
- self.agent = agent
+ self.agent = agent
- def do_process(self, conn, stats):
- cursor = conn.cursor()
+ def do_process(self, conn, stats):
+ cursor = conn.cursor()
- args = dict()
+ args = dict()
- if self.agent:
- args["qmf_agent_id"] = self.agent.id
+ if self.agent:
+ args["qmf_agent_id"] = self.agent.id
- op = SqlAgentDisconnect(self.agent)
- op.execute(cursor, args)
+ op = SqlAgentDisconnect(self.agent)
+ op.execute(cursor, args)
class UpdateQueue(ConcurrentQueue):
- def __init__(self, maxsize=0, slotCount=1):
- self.slotCount = slotCount
- ConcurrentQueue.__init__(self, maxsize)
+ def __init__(self, maxsize=0, slotCount=1):
+ self.slotCount = slotCount
+ ConcurrentQueue.__init__(self, maxsize)
- def _init(self, maxsize):
- self.maxsize = maxsize
- self.slots = []
+ def _init(self, maxsize):
+ self.maxsize = maxsize
+ self.slots = []
- for i in range(self.slotCount):
- self.slots.append(deque())
+ for i in range(self.slotCount):
+ self.slots.append(deque())
- def _qsize(self):
- size = 0
+ def _qsize(self):
+ size = 0
- for i in range(self.slotCount):
- size += len(self.slots[i])
+ for i in range(self.slotCount):
+ size += len(self.slots[i])
- return size
+ return size
- def _empty(self):
- return self._qsize() == 0
+ def _empty(self):
+ return self._qsize() == 0
- def _full(self):
- return self.maxsize > 0 and self._qsize() == self.maxsize
+ def _full(self):
+ return self.maxsize > 0 and self._qsize() == self.maxsize
- def _put(self, update):
- slot = update.priority
+ def _put(self, update):
+ slot = update.priority
- if slot in range(self.slotCount):
- self.slots[slot].append(update)
- else:
- raise ValueError("Invalid priority slot")
+ if slot in range(self.slotCount):
+ self.slots[slot].append(update)
+ else:
+ raise ValueError("Invalid priority slot")
- def _get(self):
- for slot in range(self.slotCount):
- if len(self.slots[slot]) > 0:
- return self.slots[slot].popleft()
- return None
+ def _get(self):
+ for slot in range(self.slotCount):
+ if len(self.slots[slot]) > 0:
+ return self.slots[slot].popleft()
+
+ return None
More information about the rhmessaging-commits
mailing list