[rhmessaging-commits] rhmessaging commits: r3781 - mgmt/trunk/mint/python/mint.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Jan 11 13:46:28 EST 2010


Author: justi9
Date: 2010-01-11 13:46:28 -0500 (Mon, 11 Jan 2010)
New Revision: 3781

Modified:
   mgmt/trunk/mint/python/mint/expire.py
   mgmt/trunk/mint/python/mint/model.py
   mgmt/trunk/mint/python/mint/tools.py
   mgmt/trunk/mint/python/mint/update.py
   mgmt/trunk/mint/python/mint/vacuum.py
Log:
 * Move certain non-object update operations to dedicated files

 * Change the update interface to account for non-object updates

 * Isolate stats in their own record object

 * Move the on-open agent disconnect update to start from init

 * Improve update logging


Modified: mgmt/trunk/mint/python/mint/expire.py
===================================================================
--- mgmt/trunk/mint/python/mint/expire.py	2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/expire.py	2010-01-11 18:46:28 UTC (rev 3781)
@@ -1,11 +1,10 @@
-import logging
-import mint
-import time
-from threading import Thread
-from mint.schema import *
+from schema import *
 from sql import *
+from update import *
 from util import *
 
+import mint
+
 log = logging.getLogger("mint.expire")
 
 class ExpireThread(MintDaemonThread):
@@ -39,10 +38,12 @@
     while True:
       if self.stopRequested:
         break
-      up = mint.update.ExpireUpdate(self.app.model)
+
+      up = ExpireUpdate()
       self.app.updateThread.enqueue(up)
-      time.sleep(frequency)
 
+      sleep(frequency)
+
   def __convertTimeUnits(self, t):
     if t / (24*3600) >= 1:
       t_out = t / (24*3600)
@@ -57,3 +58,25 @@
       t_out = t
       t_unit = "seconds"
     return (t_out, t_unit)
+
+class ExpireUpdate(Update):
+  def do_process(self, conn, stats):
+    attrs = self.thread.app.expireThread.attrs
+
+    cursor = conn.cursor()
+    total = 0
+
+    for op in self.thread.app.expireThread.ops:
+      log.debug("Running expire op %s", op)
+
+      count = op.execute(cursor, attrs)
+
+      conn.commit()
+
+      log.debug("%i records expired", count)
+
+      total += count
+
+    log.debug("%i total records expired", total)
+
+    stats.expireUpdateCount += 1

Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py	2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/model.py	2010-01-11 18:46:28 UTC (rev 3781)
@@ -46,14 +46,14 @@
     self.qmfSession = Session \
         (self, manageConnections=True, rcvObjects=self.app.updateEnabled)
 
+  def do_start(self):
     # Clean up any transient objects that a previous instance may have
     # left behind in the DB; it's basically an unconstrained agent
     # disconnect update, for any agent
 
-    up = update.AgentDisconnectUpdate(self, None)
+    up = update.AgentDisconnectUpdate(None)
     self.app.updateThread.enqueue(up)
 
-  def do_start(self):
     uris = [x.strip() for x in self.app.config.qmf.split(",")]
 
     for uri in uris:
@@ -137,7 +137,7 @@
     finally:
       self.lock.release()
  
-    up = update.AgentDisconnectUpdate(self, agent)
+    up = update.AgentDisconnectUpdate(agent)
     self.app.updateThread.enqueue(up)
 
   def heartbeat(self, qagent, timestamp):
@@ -177,7 +177,7 @@
     finally:
       self.lock.release()
 
-    up = update.PropertyUpdate(self, agent, object)
+    up = update.PropertyUpdate(agent, object)
     self.app.updateThread.enqueue(up)
 
   def objectStats(self, broker, object):
@@ -198,7 +198,7 @@
     finally:
       self.lock.release()
 
-    up = update.StatisticUpdate(self, agent, object)
+    up = update.StatisticUpdate(agent, object)
     self.app.updateThread.enqueue(up)
 
   def event(self, broker, event):

Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py	2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/tools.py	2010-01-11 18:46:28 UTC (rev 3781)
@@ -156,7 +156,6 @@
 
         app = Mint(self.config)
         app.updateEnabled = False
-        app.pollEnabled = False
         app.expireEnabled = False
 
         self.database = MintDatabase(app)
@@ -472,16 +471,16 @@
 
                     sleep(1)
 
-                    ut = app.updateThread
+                    stats = app.updateThread.stats
 
-                    enq = ut.enqueueCount
-                    deq = ut.dequeueCount
-                    drp = ut.dropCount
-                    dfr = ut.deferCount
+                    enq = stats.enqueueCount
+                    deq = stats.dequeueCount
+                    drp = stats.dropCount
+                    dfr = stats.deferCount
 
-                    prop = ut.propUpdateCount
-                    stat = ut.statUpdateCount
-                    exp = ut.expireUpdateCount
+                    prop = stats.propUpdateCount
+                    stat = stats.statUpdateCount
+                    exp = stats.expireUpdateCount
 
                     print row % (enq - enq_last,
                                  deq - deq_last,

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/update.py	2010-01-11 18:46:28 UTC (rev 3781)
@@ -18,34 +18,26 @@
   def __init__(self, app):
     super(UpdateThread, self).__init__(app)
 
-    self.updates = UpdateQueue(slotCount=2)
-
-    self.enqueueCount = 0
-    self.dequeueCount = 0
-    self.statUpdateCount = 0
-    self.propUpdateCount = 0
-    self.expireUpdateCount = 0
-    self.dropCount = 0
-    self.deferCount = 0
-    self.commitThreshold = 100
-
     self.conn = None
+    self.stats = None
 
+    self.updates = UpdateQueue(slotCount=2)
+
   def init(self):
     self.conn = self.app.database.getConnection()
+    self.stats = UpdateStats()
 
   def enqueue(self, update):
-    try:
-      self.updates.put(update)
+    update.thread = self
 
-      self.enqueueCount += 1
-    except Full:
-      log.exception("Queue is full")
+    self.updates.put(update)
 
-    if self.updates.qsize() > 1000:
-      # This is an attempt to yield from the enqueueing thread (this
-      # method's caller) to the update thread
+    self.stats.enqueueCount += 1
 
+    # This is an attempt to yield from the enqueueing thread (this
+    # method's caller) to the update thread
+
+    if self.updates.qsize() > 1000:
       sleep(0.1)
 
   def run(self):
@@ -58,55 +50,63 @@
       except Empty:
         continue
 
-      self.dequeueCount += 1
+      self.stats.dequeueCount += 1
 
-      self.processUpdate(update)
+      update.process()
 
-  def processUpdate(self, update):
-    log.debug("Processing %s", update)
+class UpdateStats(object):
+  def __init__(self):
+    self.enqueueCount = 0
+    self.dequeueCount = 0
+    self.statUpdateCount = 0
+    self.propUpdateCount = 0
+    self.expireUpdateCount = 0
+    self.dropCount = 0
+    self.deferCount = 0
 
-    try:
-      update.process(self)
+class ReferenceException(Exception):
+    def __init__(self, sought):
+        self.sought = sought
 
-      if self.dequeueCount % self.commitThreshold == 0 \
-              or self.updates.qsize() == 0:
-        # commit only every "commitThreshold" updates, or whenever
-        # the update queue is empty
+    def __str__(self):
+        return repr(self.sought)
 
-        update.commit()
-        self.conn.commit()
+class Update(object):
+  def __init__(self):
+    self.thread = None
+    self.priority = 0
+
+  def process(self):
+    log.debug("Processing %s", self)
+
+    assert self.thread
+
+    conn = self.thread.conn
+    stats = self.thread.stats
+
+    try:
+      self.do_process(conn, stats)
+
+      conn.commit()
     except:
       log.exception("Update failed")
 
-      update.rollback()
-      self.conn.rollback()
+      conn.rollback()
 
-  def cursor(self):
-    return self.conn.cursor()
+  def do_process(self, conn, stats):
+    raise Exception("Not implemented")
 
-class ReferenceException(Exception):
-    def __init__(self, sought):
-        self.sought = sought
+  def __repr__(self):
+    return "%s(%i)" % (self.__class__.__name__, self.priority)
 
-    def __str__(self):
-        return repr(self.sought)
+class ObjectUpdate(Update):
+  def __init__(self, agent, object):
+    super(ObjectUpdate, self).__init__()
 
-class ModelUpdate(object):
-  def __init__(self, model, agent, object):
-    if agent:
-      from mint.model import MintAgent
-      assert isinstance(agent, MintAgent)
-
-    self.model = model
     self.agent = agent
     self.object = object
-    self.priority = 0
 
-  def __repr__(self):
-    return "%s(%s, %s, %i)" % (self.__class__.__name__,
-                               str(self.agent),
-                               str(self.object),
-                               self.priority)
+    self.object_id = str(QmfObjectId.fromObject(object))
 
   def getClass(self):
     # XXX this is unfortunate
@@ -119,9 +119,6 @@
     except KeyError:
       raise ReferenceException(name)
 
-  def process(self, thread):
-    pass
-
   def processAttributes(self, attrs, cls):
     results = dict()
 
@@ -181,38 +178,35 @@
       t = datetime.fromtimestamp(tstamp / 1000000000)
       results[name] = t
 
-  def commit(self):
-    # XXX commit db here too
+  def __repr__(self):
+    cls = self.object.getClassKey().getClassName()
 
-    if self.agent:
-      self.agent.databaseIds.commit()
+    return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
+                                self.agent.id,
+                                cls,
+                                self.object_id,
+                                self.priority)
 
-  def rollback(self):
-    # XXX rollback db here too
-
-    if self.agent:
-      self.agent.databaseIds.rollback()
-
-class PropertyUpdate(ModelUpdate):
-  def process(self, thread):
+class PropertyUpdate(ObjectUpdate):
+  def do_process(self, conn, stats):
     try:
       cls = self.getClass()
     except ReferenceException, e:
       log.info("Referenced class %r not found", e.sought)
 
-      thread.dropCount += 1
+      stats.dropCount += 1
+
       return
 
-    qmfObjectId = str(QmfObjectId.fromObject(self.object))
-
     try:
       attrs = self.processAttributes(self.object.getProperties(), cls)
     except ReferenceException, e:
       log.info("Referenced object %r not found", e.sought)
 
-      self.agent.deferredUpdates[qmfObjectId].append(self)
+      self.agent.deferredUpdates[self.object_id].append(self)
 
-      thread.deferCount += 1
+      stats.deferCount += 1
+
       return
 
     update, create, delete = self.object.getTimestamps()
@@ -224,14 +218,14 @@
       self.processTimestamp("qmfDeleteTime", delete, attrs)
 
       log.debug("%s(%s,%s) marked deleted",
-                cls.__name__, self.agent.id, qmfObjectId)
+                cls.__name__, self.agent.id, self.object_id)
 
     attrs["qmfAgentId"] = self.agent.id
     attrs["qmfClassKey"] = str(self.object.getClassKey())
-    attrs["qmfObjectId"] = str(qmfObjectId)
+    attrs["qmfObjectId"] = str(self.object_id)
     attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
 
-    cursor = thread.cursor()
+    cursor = conn.cursor()
 
     # Cases:
     #
@@ -239,7 +233,7 @@
     # 2. Object is in mint's db, but id is not yet cached
     # 3. Object is in mint's db, and id is cached
 
-    id = self.agent.databaseIds.get(qmfObjectId)
+    id = self.agent.databaseIds.get(self.object_id)
 
     if id is None:
       # Case 1 or 2
@@ -271,7 +265,7 @@
 
         assert cursor.rowcount == 1
 
-      self.agent.databaseIds.set(qmfObjectId, id)
+      self.agent.databaseIds.set(self.object_id, id)
     else:
       # Case 3
 
@@ -283,21 +277,23 @@
       #assert cursor.rowcount == 1
 
     try:
-      updates = self.agent.deferredUpdates.pop(qmfObjectId)
+      updates = self.agent.deferredUpdates.pop(self.object_id)
 
       if updates:
         log.info("Re-enqueueing %i orphans whose creation had been deferred",
                  len(updates))
 
         for update in updates:
-          thread.enqueue(update)
+          self.thread.enqueue(update)
     except KeyError:
       pass
 
-    thread.propUpdateCount += 1
+    self.agent.databaseIds.commit()
 
-class StatisticUpdate(ModelUpdate):
-  def process(self, thread):
+    stats.propUpdateCount += 1
+
+class StatisticUpdate(ObjectUpdate):
+  def do_process(self, conn, stats):
     try:
       cls = self.getClass()
     except ReferenceException, e:
@@ -306,12 +302,10 @@
 
     statsCls = getattr(mint, "%sStats" % cls.__name__)
 
-    qmfObjectId = str(QmfObjectId.fromObject(self.object))
+    id = self.agent.databaseIds.get(self.object_id)
 
-    id = self.agent.databaseIds.get(qmfObjectId)
-
     if id is None:
-      thread.dropCount += 1
+      stats.dropCount += 1
       return
 
     timestamps = self.object.getTimestamps()
@@ -322,60 +316,38 @@
     if t < tnow - timedelta(seconds=30):
       log.debug("Update is %i seconds old; skipping it", (tnow - t).seconds)
 
-      thread.dropCount += 1
+      stats.dropCount += 1
+
       return
 
     try:
       attrs = self.processAttributes(self.object.getStatistics(), statsCls)
     except ReferenceException:
-      thread.dropCount += 1
+      stats.dropCount += 1
+
       return
 
     attrs["qmfUpdateTime"] = t > tnow and tnow or t # XXX do we still want this
     attrs["%s_id" % cls.sqlmeta.table] = id
 
-    cursor = thread.cursor()
+    cursor = conn.cursor()
 
     op = SqlInsert(statsCls, attrs)
     op.execute(cursor, attrs)
 
     log.debug("%s(%s) created", statsCls.__name__, id)
 
-    thread.statUpdateCount += 1
+    stats.statUpdateCount += 1
 
-class ExpireUpdate(ModelUpdate):
-  def __init__(self, model):
-    super(ExpireUpdate, self).__init__(model, None, None)
+class AgentDisconnectUpdate(Update):
+  def __init__(self, agent):
+    super(AgentDisconnectUpdate, self).__init__()
 
-  def process(self, thread):
-    cursor = thread.cursor()
-    attrs = thread.app.expireThread.attrs
-    total = 0
+    self.agent = agent
 
-    thread.conn.commit()
+  def do_process(self, conn, stats):
+    cursor = conn.cursor()
 
-    for op in thread.app.expireThread.ops:
-      log.debug("Running expire op %s", op)
-
-      count = op.execute(cursor, attrs)
-
-      thread.conn.commit()
-
-      log.debug("%i records expired", count)
-
-      total += count
-
-    log.debug("%i total records expired", total)
-
-    thread.expireUpdateCount += 1
-
-class AgentDisconnectUpdate(ModelUpdate):
-  def __init__(self, model, agent):
-    super(AgentDisconnectUpdate, self).__init__(model, agent, None)
-
-  def process(self, thread):
-    cursor = thread.cursor()
-
     args = dict()
 
     if self.agent:

Modified: mgmt/trunk/mint/python/mint/vacuum.py
===================================================================
--- mgmt/trunk/mint/python/mint/vacuum.py	2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/vacuum.py	2010-01-11 18:46:28 UTC (rev 3781)
@@ -4,32 +4,26 @@
 log = logging.getLogger("mint.vacuum")
 
 class VacuumThread(MintDaemonThread):
-  def __init__(self, app):
-    super(VacuumThread, self).__init__(app)
-
   def run(self):
     while True:
       if self.stopRequested:
         break
 
-      up = VacuumUpdate(self.app.model)
+      up = VacuumUpdate()
       self.app.updateThread.enqueue(up)
 
       sleep(60 * 10)
 
-class VacuumUpdate(ModelUpdate):
-  def __init__(self, model):
-    super(VacuumUpdate, self).__init__(model, None, None)
-
-  def process(self, thread):
+class VacuumUpdate(Update):
+  def do_process(self, conn, stats):
     log.info("Vacuuming")
 
-    thread.conn.commit()
+    conn.commit()
 
-    level = thread.conn.isolation_level
-    thread.conn.set_isolation_level(0)
+    level = conn.isolation_level
+    conn.set_isolation_level(0)
 
-    cursor = thread.cursor()
+    cursor = conn.cursor()
     cursor.execute("vacuum")
 
-    thread.conn.set_isolation_level(level)
+    conn.set_isolation_level(level)



More information about the rhmessaging-commits mailing list