Author: justi9
Date: 2010-01-11 13:46:28 -0500 (Mon, 11 Jan 2010)
New Revision: 3781
Modified:
mgmt/trunk/mint/python/mint/expire.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/mint/python/mint/vacuum.py
Log:
* Move certain non-object update operations to dedicated files
* Change the update interface to account for non-object updates
* Isolate stats in their own record object
* Move the on-open agent disconnect update to start from init
* Improve update logging
Modified: mgmt/trunk/mint/python/mint/expire.py
===================================================================
--- mgmt/trunk/mint/python/mint/expire.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/expire.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -1,11 +1,10 @@
-import logging
-import mint
-import time
-from threading import Thread
-from mint.schema import *
+from schema import *
from sql import *
+from update import *
from util import *
+import mint
+
log = logging.getLogger("mint.expire")
class ExpireThread(MintDaemonThread):
@@ -39,10 +38,12 @@
while True:
if self.stopRequested:
break
- up = mint.update.ExpireUpdate(self.app.model)
+
+ up = ExpireUpdate()
self.app.updateThread.enqueue(up)
- time.sleep(frequency)
+ sleep(frequency)
+
def __convertTimeUnits(self, t):
if t / (24*3600) >= 1:
t_out = t / (24*3600)
@@ -57,3 +58,25 @@
t_out = t
t_unit = "seconds"
return (t_out, t_unit)
+
+class ExpireUpdate(Update):
+ def do_process(self, conn, stats):
+ attrs = self.thread.app.expireThread.attrs
+
+ cursor = conn.cursor()
+ total = 0
+
+ for op in self.thread.app.expireThread.ops:
+ log.debug("Running expire op %s", op)
+
+ count = op.execute(cursor, attrs)
+
+ conn.commit()
+
+ log.debug("%i records expired", count)
+
+ total += count
+
+ log.debug("%i total records expired", total)
+
+ stats.expireUpdateCount += 1
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -46,14 +46,14 @@
self.qmfSession = Session \
(self, manageConnections=True, rcvObjects=self.app.updateEnabled)
+ def do_start(self):
# Clean up any transient objects that a previous instance may have
# left behind in the DB; it's basically an unconstrained agent
# disconnect update, for any agent
- up = update.AgentDisconnectUpdate(self, None)
+ up = update.AgentDisconnectUpdate(None)
self.app.updateThread.enqueue(up)
- def do_start(self):
uris = [x.strip() for x in self.app.config.qmf.split(",")]
for uri in uris:
@@ -137,7 +137,7 @@
finally:
self.lock.release()
- up = update.AgentDisconnectUpdate(self, agent)
+ up = update.AgentDisconnectUpdate(agent)
self.app.updateThread.enqueue(up)
def heartbeat(self, qagent, timestamp):
@@ -177,7 +177,7 @@
finally:
self.lock.release()
- up = update.PropertyUpdate(self, agent, object)
+ up = update.PropertyUpdate(agent, object)
self.app.updateThread.enqueue(up)
def objectStats(self, broker, object):
@@ -198,7 +198,7 @@
finally:
self.lock.release()
- up = update.StatisticUpdate(self, agent, object)
+ up = update.StatisticUpdate(agent, object)
self.app.updateThread.enqueue(up)
def event(self, broker, event):
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/tools.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -156,7 +156,6 @@
app = Mint(self.config)
app.updateEnabled = False
- app.pollEnabled = False
app.expireEnabled = False
self.database = MintDatabase(app)
@@ -472,16 +471,16 @@
sleep(1)
- ut = app.updateThread
+ stats = app.updateThread.stats
- enq = ut.enqueueCount
- deq = ut.dequeueCount
- drp = ut.dropCount
- dfr = ut.deferCount
+ enq = stats.enqueueCount
+ deq = stats.dequeueCount
+ drp = stats.dropCount
+ dfr = stats.deferCount
- prop = ut.propUpdateCount
- stat = ut.statUpdateCount
- exp = ut.expireUpdateCount
+ prop = stats.propUpdateCount
+ stat = stats.statUpdateCount
+ exp = stats.expireUpdateCount
print row % (enq - enq_last,
deq - deq_last,
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/update.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -18,34 +18,26 @@
def __init__(self, app):
super(UpdateThread, self).__init__(app)
- self.updates = UpdateQueue(slotCount=2)
-
- self.enqueueCount = 0
- self.dequeueCount = 0
- self.statUpdateCount = 0
- self.propUpdateCount = 0
- self.expireUpdateCount = 0
- self.dropCount = 0
- self.deferCount = 0
- self.commitThreshold = 100
-
self.conn = None
+ self.stats = None
+ self.updates = UpdateQueue(slotCount=2)
+
def init(self):
self.conn = self.app.database.getConnection()
+ self.stats = UpdateStats()
def enqueue(self, update):
- try:
- self.updates.put(update)
+ update.thread = self
- self.enqueueCount += 1
- except Full:
- log.exception("Queue is full")
+ self.updates.put(update)
- if self.updates.qsize() > 1000:
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
+ self.stats.enqueueCount += 1
+ # This is an attempt to yield from the enqueueing thread (this
+ # method's caller) to the update thread
+
+ if self.updates.qsize() > 1000:
sleep(0.1)
def run(self):
@@ -58,55 +50,63 @@
except Empty:
continue
- self.dequeueCount += 1
+ self.stats.dequeueCount += 1
- self.processUpdate(update)
+ update.process()
- def processUpdate(self, update):
- log.debug("Processing %s", update)
+class UpdateStats(object):
+ def __init__(self):
+ self.enqueueCount = 0
+ self.dequeueCount = 0
+ self.statUpdateCount = 0
+ self.propUpdateCount = 0
+ self.expireUpdateCount = 0
+ self.dropCount = 0
+ self.deferCount = 0
- try:
- update.process(self)
+class ReferenceException(Exception):
+ def __init__(self, sought):
+ self.sought = sought
- if self.dequeueCount % self.commitThreshold == 0 \
- or self.updates.qsize() == 0:
- # commit only every "commitThreshold" updates, or whenever
- # the update queue is empty
+ def __str__(self):
+ return repr(self.sought)
- update.commit()
- self.conn.commit()
+class Update(object):
+ def __init__(self):
+ self.thread = None
+ self.priority = 0
+
+ def process(self):
+ log.debug("Processing %s", self)
+
+ assert self.thread
+
+ conn = self.thread.conn
+ stats = self.thread.stats
+
+ try:
+ self.do_process(conn, stats)
+
+ conn.commit()
except:
log.exception("Update failed")
- update.rollback()
- self.conn.rollback()
+ conn.rollback()
- def cursor(self):
- return self.conn.cursor()
+ def do_process(self, conn, stats):
+ raise Exception("Not implemented")
-class ReferenceException(Exception):
- def __init__(self, sought):
- self.sought = sought
+ def __repr__(self):
+ return "%s(%i)" % (self.__class__.__name__, self.priority)
- def __str__(self):
- return repr(self.sought)
+class ObjectUpdate(Update):
+ def __init__(self, agent, object):
+ super(ObjectUpdate, self).__init__()
-class ModelUpdate(object):
- def __init__(self, model, agent, object):
- if agent:
- from mint.model import MintAgent
- assert isinstance(agent, MintAgent)
-
- self.model = model
self.agent = agent
self.object = object
- self.priority = 0
- def __repr__(self):
- return "%s(%s, %s, %i)" % (self.__class__.__name__,
- str(self.agent),
- str(self.object),
- self.priority)
+ self.object_id = str(QmfObjectId.fromObject(object))
def getClass(self):
# XXX this is unfortunate
@@ -119,9 +119,6 @@
except KeyError:
raise ReferenceException(name)
- def process(self, thread):
- pass
-
def processAttributes(self, attrs, cls):
results = dict()
@@ -181,38 +178,35 @@
t = datetime.fromtimestamp(tstamp / 1000000000)
results[name] = t
- def commit(self):
- # XXX commit db here too
+ def __repr__(self):
+ cls = self.object.getClassKey().getClassName()
- if self.agent:
- self.agent.databaseIds.commit()
+ return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
+ self.agent.id,
+ cls,
+ self.object_id,
+ self.priority)
- def rollback(self):
- # XXX rollback db here too
-
- if self.agent:
- self.agent.databaseIds.rollback()
-
-class PropertyUpdate(ModelUpdate):
- def process(self, thread):
+class PropertyUpdate(ObjectUpdate):
+ def do_process(self, conn, stats):
try:
cls = self.getClass()
except ReferenceException, e:
log.info("Referenced class %r not found", e.sought)
- thread.dropCount += 1
+ stats.dropCount += 1
+
return
- qmfObjectId = str(QmfObjectId.fromObject(self.object))
-
try:
attrs = self.processAttributes(self.object.getProperties(), cls)
except ReferenceException, e:
log.info("Referenced object %r not found", e.sought)
- self.agent.deferredUpdates[qmfObjectId].append(self)
+ self.agent.deferredUpdates[self.object_id].append(self)
- thread.deferCount += 1
+ stats.deferCount += 1
+
return
update, create, delete = self.object.getTimestamps()
@@ -224,14 +218,14 @@
self.processTimestamp("qmfDeleteTime", delete, attrs)
log.debug("%s(%s,%s) marked deleted",
- cls.__name__, self.agent.id, qmfObjectId)
+ cls.__name__, self.agent.id, self.object_id)
attrs["qmfAgentId"] = self.agent.id
attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfObjectId"] = str(qmfObjectId)
+ attrs["qmfObjectId"] = str(self.object_id)
attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
- cursor = thread.cursor()
+ cursor = conn.cursor()
# Cases:
#
@@ -239,7 +233,7 @@
# 2. Object is in mint's db, but id is not yet cached
# 3. Object is in mint's db, and id is cached
- id = self.agent.databaseIds.get(qmfObjectId)
+ id = self.agent.databaseIds.get(self.object_id)
if id is None:
# Case 1 or 2
@@ -271,7 +265,7 @@
assert cursor.rowcount == 1
- self.agent.databaseIds.set(qmfObjectId, id)
+ self.agent.databaseIds.set(self.object_id, id)
else:
# Case 3
@@ -283,21 +277,23 @@
#assert cursor.rowcount == 1
try:
- updates = self.agent.deferredUpdates.pop(qmfObjectId)
+ updates = self.agent.deferredUpdates.pop(self.object_id)
if updates:
log.info("Re-enqueueing %i orphans whose creation had been deferred",
len(updates))
for update in updates:
- thread.enqueue(update)
+ self.thread.enqueue(update)
except KeyError:
pass
- thread.propUpdateCount += 1
+ self.agent.databaseIds.commit()
-class StatisticUpdate(ModelUpdate):
- def process(self, thread):
+ stats.propUpdateCount += 1
+
+class StatisticUpdate(ObjectUpdate):
+ def do_process(self, conn, stats):
try:
cls = self.getClass()
except ReferenceException, e:
@@ -306,12 +302,10 @@
statsCls = getattr(mint, "%sStats" % cls.__name__)
- qmfObjectId = str(QmfObjectId.fromObject(self.object))
+ id = self.agent.databaseIds.get(self.object_id)
- id = self.agent.databaseIds.get(qmfObjectId)
-
if id is None:
- thread.dropCount += 1
+ stats.dropCount += 1
return
timestamps = self.object.getTimestamps()
@@ -322,60 +316,38 @@
if t < tnow - timedelta(seconds=30):
log.debug("Update is %i seconds old; skipping it", (tnow - t).seconds)
- thread.dropCount += 1
+ stats.dropCount += 1
+
return
try:
attrs = self.processAttributes(self.object.getStatistics(), statsCls)
except ReferenceException:
- thread.dropCount += 1
+ stats.dropCount += 1
+
return
attrs["qmfUpdateTime"] = t > tnow and tnow or t # XXX do we still want
this
attrs["%s_id" % cls.sqlmeta.table] = id
- cursor = thread.cursor()
+ cursor = conn.cursor()
op = SqlInsert(statsCls, attrs)
op.execute(cursor, attrs)
log.debug("%s(%s) created", statsCls.__name__, id)
- thread.statUpdateCount += 1
+ stats.statUpdateCount += 1
-class ExpireUpdate(ModelUpdate):
- def __init__(self, model):
- super(ExpireUpdate, self).__init__(model, None, None)
+class AgentDisconnectUpdate(Update):
+ def __init__(self, agent):
+ super(AgentDisconnectUpdate, self).__init__()
- def process(self, thread):
- cursor = thread.cursor()
- attrs = thread.app.expireThread.attrs
- total = 0
+ self.agent = agent
- thread.conn.commit()
+ def do_process(self, conn, stats):
+ cursor = conn.cursor()
- for op in thread.app.expireThread.ops:
- log.debug("Running expire op %s", op)
-
- count = op.execute(cursor, attrs)
-
- thread.conn.commit()
-
- log.debug("%i records expired", count)
-
- total += count
-
- log.debug("%i total records expired", total)
-
- thread.expireUpdateCount += 1
-
-class AgentDisconnectUpdate(ModelUpdate):
- def __init__(self, model, agent):
- super(AgentDisconnectUpdate, self).__init__(model, agent, None)
-
- def process(self, thread):
- cursor = thread.cursor()
-
args = dict()
if self.agent:
Modified: mgmt/trunk/mint/python/mint/vacuum.py
===================================================================
--- mgmt/trunk/mint/python/mint/vacuum.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/vacuum.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -4,32 +4,26 @@
log = logging.getLogger("mint.vacuum")
class VacuumThread(MintDaemonThread):
- def __init__(self, app):
- super(VacuumThread, self).__init__(app)
-
def run(self):
while True:
if self.stopRequested:
break
- up = VacuumUpdate(self.app.model)
+ up = VacuumUpdate()
self.app.updateThread.enqueue(up)
sleep(60 * 10)
-class VacuumUpdate(ModelUpdate):
- def __init__(self, model):
- super(VacuumUpdate, self).__init__(model, None, None)
-
- def process(self, thread):
+class VacuumUpdate(Update):
+ def do_process(self, conn, stats):
log.info("Vacuuming")
- thread.conn.commit()
+ conn.commit()
- level = thread.conn.isolation_level
- thread.conn.set_isolation_level(0)
+ level = conn.isolation_level
+ conn.set_isolation_level(0)
- cursor = thread.cursor()
+ cursor = conn.cursor()
cursor.execute("vacuum")
- thread.conn.set_isolation_level(level)
+ conn.set_isolation_level(level)