Author: justi9
Date: 2008-12-17 13:03:47 -0500 (Wed, 17 Dec 2008)
New Revision: 3016
Modified:
mgmt/trunk/mint/bin/mint-bench
mgmt/trunk/mint/python/mint/__init__.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
Log:
Consolidate commit and rollback behaviors under methods.
Pass the update thread into update objects.
Isolate the update invocation from the run loop, so that switching to
a single-threaded mode for profiling is possible.
Shorten the mint-bench profile calibration step.
Add more counters to the update thread, for tracking deferred and
discarded updates.
Modified: mgmt/trunk/mint/bin/mint-bench
===================================================================
--- mgmt/trunk/mint/bin/mint-bench 2008-12-17 18:00:07 UTC (rev 3015)
+++ mgmt/trunk/mint/bin/mint-bench 2008-12-17 18:03:47 UTC (rev 3016)
@@ -20,8 +20,8 @@
biases = list()
- for i in range(5):
- bias = prof.calibrate(100000)
+ for i in range(4):
+ bias = prof.calibrate(20000)
biases.append(bias)
print i, bias
Modified: mgmt/trunk/mint/python/mint/__init__.py
===================================================================
--- mgmt/trunk/mint/python/mint/__init__.py 2008-12-17 18:00:07 UTC (rev 3015)
+++ mgmt/trunk/mint/python/mint/__init__.py 2008-12-17 18:03:47 UTC (rev 3016)
@@ -384,6 +384,8 @@
self.qmfSession = qmf.console.Session \
(self, manageConnections=True, rcvObjects=self.updateObjects)
+ self.updateThread.init()
+
def start(self):
self.updateThread.start()
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2008-12-17 18:00:07 UTC (rev 3015)
+++ mgmt/trunk/mint/python/mint/tools.py 2008-12-17 18:03:47 UTC (rev 3016)
@@ -67,9 +67,10 @@
if "debug" in opts:
level = "debug"
+ file = sys.stderr
+ else:
+ file = opts.get("log-file", sys.stderr)
- file = opts.get("log-file", sys.stderr)
-
enable_logging("mint", level, file)
data = opts.get("data",
"postgresql://cumin@localhost/cumin")
@@ -157,22 +158,33 @@
enq_last = 0
deq_last = 0
- print "enqs", "\t", "deqs", "\t", "depth"
+ head = "%10s %10s %10s %10s %10s" % \
+ ("enqs", "deqs", "depth", "discard", "defer")
+ row = "%10i %10i %10i %10i %10i"
+ print head
+
while True:
sleep(1)
- enq = model.updateThread.enqueueCount
- deq = model.updateThread.dequeueCount
+ ut = model.updateThread
- enq_rate = enq - enq_last
- deq_rate = deq - deq_last
+ enq = ut.enqueueCount
+ deq = ut.dequeueCount
- print enq_rate, "\t", deq_rate, "\t", enq - deq
+ print row % (enq - enq_last,
+ deq - deq_last,
+ enq - deq,
+ ut.discardCount,
+ ut.deferCount)
enq_last = enq
deq_last = deq
finally:
+ #from threading import enumerate
+ #for item in enumerate():
+ # print item
+
for broker in added:
model.delBroker(broker)
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2008-12-17 18:00:07 UTC (rev 3015)
+++ mgmt/trunk/mint/python/mint/update.py 2008-12-17 18:03:47 UTC (rev 3016)
@@ -25,8 +25,15 @@
self.enqueueCount = 0
self.dequeueCount = 0
+ self.discardCount = 0
+ self.deferCount = 0
self.commitThreshold = 100
+ self.conn = None
+
+ def init(self):
+ self.conn = self.model.dbConn.getConnection()
+
def enqueue(self, update):
try:
self.updates.put(update)
@@ -36,12 +43,12 @@
log.exception("Queue is full")
if self.updates.qsize() > 1000:
- # This is an attempt to yield
+ # This is an attempt to yield from the enqueueing thread (this
+ # method's caller) to the update thread
+
sleep(0.1)
def run(self):
- conn = self.model.dbConn.getConnection()
-
while True:
try:
update = self.updates.get(True, 1)
@@ -59,35 +66,37 @@
log.debug("Queue is empty")
continue
- try:
- update.process(conn)
+ self.process_update(update)
+
+ def process_update(self, update):
+ try:
+ update.process(self)
- if self.dequeueCount % self.commitThreshold == 0 \
- or self.updates.qsize() == 0:
- # commit only every "commitThreshold" updates, or whenever
- # the update queue is empty
+ if self.dequeueCount % self.commitThreshold == 0 \
+ or self.updates.qsize() == 0:
+ # commit only every "commitThreshold" updates, or whenever
+ # the update queue is empty
- if profile:
- start = clock()
+ self.commit()
+ except:
+ log.exception("Update failed")
- conn.commit()
+ self.rollback()
- for broker in self.model.mintBrokersByQmfBroker.values():
- broker.objectDatabaseIds.commit()
+ def cursor(self):
+ return self.conn.cursor()
- profile.commitTime += clock() - start
- else:
- conn.commit()
+ def commit(self):
+ self.conn.commit()
- for broker in self.model.mintBrokersByQmfBroker.values():
- broker.objectDatabaseIds.commit()
- except:
- conn.rollback()
+ for broker in self.model.mintBrokersByQmfBroker.values():
+ broker.objectDatabaseIds.commit()
- for broker in self.model.mintBrokersByQmfBroker.values():
- broker.objectDatabaseIds.rollback()
+ def rollback(self):
+ self.conn.rollback()
- log.exception("Update failed")
+ for broker in self.model.mintBrokersByQmfBroker.values():
+ broker.objectDatabaseIds.rollback()
class ReferenceException(Exception):
def __init__(self, sought):
@@ -120,7 +129,7 @@
except KeyError:
raise ReferenceException(name)
- def process(self, conn):
+ def process(self, thread):
pass
def processAttributes(self, attrs, cls):
@@ -171,11 +180,13 @@
results[name] = t
class PropertyUpdate(ModelUpdate):
- def process(self, conn):
+ def process(self, thread):
try:
cls = self.getClass()
except ReferenceException, e:
log.info("Referenced class %r not found", e.sought)
+
+ thread.discardCount += 1
return
oid = self.object.getObjectId()
@@ -191,6 +202,7 @@
except KeyError:
self.broker.orphans[oid] = list((self,))
+ thread.deferCount += 1
return
timestamps = self.object.getTimestamps()
@@ -208,7 +220,7 @@
attrs["qmfClassKey"] = str(self.object.getClassKey())
attrs["qmfBrokerId"] = str(self.broker.qmfBroker.getBrokerId())
- cursor = conn.cursor()
+ cursor = thread.cursor()
# Cases:
#
@@ -271,7 +283,7 @@
len(orphans))
for orphan in orphans:
- self.model.updateThread.enqueue(orphan)
+ thread.enqueue(orphan)
except KeyError:
pass
@@ -295,7 +307,7 @@
self.broker.databaseId = id
class StatisticUpdate(ModelUpdate):
- def process(self, conn):
+ def process(self, thread):
try:
cls = self.getClass()
except ReferenceException, e:
@@ -308,7 +320,7 @@
id = self.broker.objectDatabaseIds.get(oid)
if id is None:
- # Just drop it; we'll get more stats later
+ thread.discardCount += 1
return
timestamps = self.object.getTimestamps()
@@ -318,18 +330,20 @@
if t < tnow - timedelta(seconds=30):
log.debug("Update is %i seconds old; skipping it", (tnow -t).seconds)
+
+ thread.discardCount += 1
return
try:
attrs = self.processAttributes(self.object.getStatistics(), statsCls)
except ReferenceException:
- # Drop it
+ thread.discardCount += 1
return
attrs["qmfUpdateTime"] = t > tnow and tnow or t
attrs["%s_id" % cls.sqlmeta.table] = id
- cursor = conn.cursor()
+ cursor = thread.cursor()
op = SqlInsert(statsCls, attrs)
op.execute(cursor, attrs)
@@ -342,7 +356,7 @@
self.seq = seq
- def process(self, conn):
+ def process(self, thread):
self.model.lock()
try:
@@ -355,19 +369,19 @@
def __init__(self, model):
super(DBExpireUpdate, self).__init__(model, None, None)
- def process(self, conn):
- cursor = conn.cursor()
+ def process(self, thread):
+ cursor = thread.cursor()
attrs = self.model.dbExpireThread.attrs
total = 0
- conn.commit()
+ thread.commit()
for op in self.model.dbExpireThread.ops:
log.debug("Running expire op %s", op)
count = op.execute(cursor, attrs)
- conn.commit()
+ thread.commit()
log.debug("%i records expired", count)