[rhmessaging-commits] rhmessaging commits: r3769 - in mgmt/trunk: cumin/python/cumin/messaging and 1 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Jan 8 10:41:53 EST 2010


Author: justi9
Date: 2010-01-08 10:41:52 -0500 (Fri, 08 Jan 2010)
New Revision: 3769

Modified:
   mgmt/trunk/cumin/python/cumin/main.py
   mgmt/trunk/cumin/python/cumin/messaging/broker.py
   mgmt/trunk/cumin/python/cumin/messaging/model.py
   mgmt/trunk/cumin/python/cumin/model.py
   mgmt/trunk/mint/python/mint/cache.py
   mgmt/trunk/mint/python/mint/model.py
   mgmt/trunk/mint/python/mint/sql.py
   mgmt/trunk/mint/python/mint/update.py
Log:
 * Realign mint around agent objects; for now, this is faked somewhat,
   but this will be the new model in qmf2

 * Add a temporary hack to defer objectProps and objectStats calls until
   the agent they belong to has been registered; new 1.3 broker behavior
   will make this unnecessary


Modified: mgmt/trunk/cumin/python/cumin/main.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/main.py	2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/main.py	2010-01-08 15:41:52 UTC (rev 3769)
@@ -133,7 +133,7 @@
     def do_process(self, session):
         super(OverviewFrame, self).do_process(session)
 
-        count = len(self.app.model.mint.model.mintBrokersByUrl)
+        count = len(self.app.model.mint.model.qmfBrokers)
 
         if count == 0:
             self.mode.set(session, self.notice)

Modified: mgmt/trunk/cumin/python/cumin/messaging/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/broker.py	2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/messaging/broker.py	2010-01-08 15:41:52 UTC (rev 3769)
@@ -84,13 +84,13 @@
             return "Status"
 
         def render_content(self, session, data):
-            agentId = data["qmf_agent_id"]
-            dt = self.app.model.mint.model.getLatestHeartbeat(agentId)
+            agent = self.app.model.mint.model.agents.get(data["qmf_agent_id"])
 
-            if dt is None:
-                return fmt_none()
-            else:
-                return fmt_datetime(dt)
+            if agent:
+                if agent.lastHeartbeat is None:
+                    return fmt_none()
+                else:
+                    return fmt_datetime(agent.lastHeartbeat)
 
     class ClusterColumn(SqlTableColumn):
         def render_title(self, session, data):

Modified: mgmt/trunk/cumin/python/cumin/messaging/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/model.py	2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/messaging/model.py	2010-01-08 15:41:52 UTC (rev 3769)
@@ -26,7 +26,7 @@
 
         session_ids = set()
 
-        for broker in self.app.model.mint.model.mintBrokersByQmfBroker:
+        for broker in self.app.model.mint.model.qmfBrokers:
             session_ids.add(broker.getSessionId())
 
         for sess in conn.sessions:

Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py	2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/model.py	2010-01-08 15:41:52 UTC (rev 3769)
@@ -124,9 +124,9 @@
     def get_session_by_object(self, object):
         assert object
 
-        agent = self.mint.model.agentsById[object.qmfAgentId]
+        agent = self.mint.model.agents[object.qmfAgentId]
 
-        return agent.getBroker().getAmqpSession()
+        return agent.agent.getBroker().getAmqpSession()
 
     def get_negotiator_limits(self, negotiator):
         self.lock.acquire()
@@ -1125,9 +1125,6 @@
         prop = CuminProperty(self, "dataDir")
         prop.title = "Data Directory"
 
-        stat = self.StatusStat(self, "connection")
-        stat.category = "status"
-
     def init(self):
         super(CuminBroker, self).init()
 
@@ -1146,25 +1143,6 @@
         except:
             return broker.name
 
-    class StatusStat(CuminStat):
-        def value_text(self, broker):
-            connected = False
-            if broker:
-                try:
-                    mbroker = self.mint.model.mintBrokersById \
-                        [broker.qmfBrokerId]
-                    connected = mbroker.connected
-                except KeyError:
-                    pass
-
-            if connected:
-                return "Connected"
-            else:
-                return "Disconnected"
-
-        def rate_text(self, record):
-            return ""
-
 # XXX "do_" on this doesn't make sense
 def do_bind(session, queue_name, binding_info):
         for exchange in binding_info:
@@ -2437,7 +2415,7 @@
         pass
 
     def delete(self):
-        pass
+        self.model = None
 
     class UpdateThread(Thread):
         def __init__(self, store):
@@ -2475,6 +2453,8 @@
     def delete(self):
         del self.model.limits_by_negotiator[self.negotiator]
 
+        super(SubmissionJobStore, self).delete()
+
 class SubmissionJobStore(ObjectStore):
     def __init__(self, model, submission):
         super(SubmissionJobStore, self).__init__(model)
@@ -2492,3 +2472,5 @@
 
     def delete(self):
         del self.model.jobs_by_submission[self.submission]
+
+        super(SubmissionJobStore, self).delete()

Modified: mgmt/trunk/mint/python/mint/cache.py
===================================================================
--- mgmt/trunk/mint/python/mint/cache.py	2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/cache.py	2010-01-08 15:41:52 UTC (rev 3769)
@@ -1,5 +1,3 @@
-from threading import RLock
-
 class MintCache(object):
   def __init__(self):
     self.__cache = dict()

Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py	2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/model.py	2010-01-08 15:41:52 UTC (rev 3769)
@@ -1,3 +1,4 @@
+from parsley.collectionsex import defaultdict
 from parsley.threadingex import Lifecycle
 from qpid.datatypes import UUID
 from qpid.util import URL
@@ -94,22 +95,6 @@
   brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
   unique = DatabaseIndex(broker, brokerGroup, unique=True)
 
-class MintBroker(object):
-  def __init__(self, url, qmfBroker):
-    self.url = url
-    self.qmfBroker = qmfBroker
-
-    self.qmfId = None
-    self.databaseId = None
-
-    self.objectDatabaseIds = MintCache() # database ids by qmf object id
-    self.orphans = dict() # updates by qmf object id
-
-    self.connected = False
-
-  def __repr__(self):
-    return "%s(%s)" % (self.__class__.__name__, self.url)
-
 class MintModel(Console, Lifecycle):
   staticInstance = None
 
@@ -121,33 +106,20 @@
 
     self.app = app
 
-    # Lookup tables used for recovering MintBroker objects, which have
-    # mint-specific accounting and wrap a qmf broker object
+    # qmfAgentId => MintAgent
+    self.agents = dict()
 
-    self.mintBrokersByQmfBroker = dict()
-    self.mintBrokersById = dict()
-    self.mintBrokersByUrl = dict()
+    # int seq => callable
+    self.outstandingMethodCalls = dict()
 
-    self.agentsById = dict()
-
-    # Agent heartbeats
-    # agentId => latest heartbeat timestamp
-
-    self.heartbeatsByAgentId = dict()
-
-    self.__lock = RLock()
-
-    self.dbConn = None
     self.qmfSession = None
+    self.qmfBrokers = list()
 
-    self.outstandingMethodCalls = dict()
+    self.lock = RLock()
 
-  def lock(self):
-    self.__lock.acquire()
+    self.deferredObjectPropCalls = defaultdict(list)
+    self.deferredObjectStatCalls = defaultdict(list)
 
-  def unlock(self):
-    self.__lock.release()
-
   def check(self):
     pass
 
@@ -157,11 +129,11 @@
     self.qmfSession = Session \
         (self, manageConnections=True, rcvObjects=self.app.updateEnabled)
 
-    # clean up any transient objects that a previous instance may have
-    # left behind in the DB it's basically an unconstrained agent
+    # Clean up any transient objects that a previous instance may have
+    # left behind in the DB; it's basically an unconstrained agent
     # disconnect update, for any agent
 
-    up = update.AgentDisconnectUpdate(self, 0)
+    up = update.AgentDisconnectUpdate(self, None)
     self.app.updateThread.enqueue(up)
 
   def do_start(self):
@@ -171,159 +143,96 @@
       self.addBroker(uri)
 
   def do_stop(self):
-    for mbroker in self.mintBrokersByQmfBroker.values():
-      self.delBroker(mbroker)
+    for qbroker in self.qmfBrokers:
+      self.qmfSession.delBroker(qbroker)
 
   def addBroker(self, url):
     log.info("Adding qmf broker at %s", url)
 
-    self.lock()
+    self.lock.acquire()
     try:
       qbroker = self.qmfSession.addBroker(url)
-      mbroker = MintBroker(url, qbroker)
-
-      # Can't add the by-id mapping here, as the id is not set yet;
-      # instead we add it when brokerConnected is called
-
-      self.mintBrokersByQmfBroker[qbroker] = mbroker
-      self.mintBrokersByUrl[url] = mbroker
-
-      return mbroker
+      self.qmfBrokers.append(qbroker)
     finally:
-      self.unlock()
+      self.lock.release()
 
-  def delBroker(self, mbroker):
-    assert isinstance(mbroker, MintBroker)
-
-    log.info("Removing qmf broker at %s", mbroker.url)
-
-    self.lock()
-    try:
-      self.qmfSession.delBroker(mbroker.qmfBroker)
-
-      del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
-      del self.mintBrokersByUrl[mbroker.url]
-
-      if mbroker.qmfId:
-        del self.mintBrokersById[mbroker.qmfId]
-    finally:
-      self.unlock()
-
   def brokerConnected(self, qbroker):
-    """ Invoked when a connection is established to a broker """
-    self.lock()
-    try:
-      mbroker = self.mintBrokersByQmfBroker[qbroker]
+    self.log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
 
-      assert mbroker.connected is False
-
-      mbroker.connected = True
-    finally:
-      self.unlock()
-
   def brokerInfo(self, qbroker):
-    self.lock()
-    try:
-      # We now have enough information to generate a proper agent ID;
-      # it would be much better to get the broker ID earlier, or to
-      # simply get a reasonable agent ID with less indirection
+    # Now we have a brokerId to use to generate fully qualified agent
+    # IDs
 
-      for agent in qbroker.getAgents():
-        agentId = str(QmfAgentId.fromAgent(agent))
+    for qagent in qbroker.getAgents():
+      self.newMintAgent(qagent)
 
-        log.debug("Adding agent %s", agentId)
-
-        self.agentsById[agentId] = agent
-
-      id = str(qbroker.getBrokerId())
-      mbroker = self.mintBrokersByQmfBroker[qbroker]
-
-      mbroker.qmfId = id
-      self.mintBrokersById[id] = mbroker
-    finally:
-      self.unlock()
-
   def brokerDisconnected(self, qbroker):
-    """ Invoked when the connection to a broker is lost """
-    self.lock()
-    try:
-      mbroker = self.mintBrokersByQmfBroker[qbroker]
+    self.log.info \
+        ("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
 
-      assert mbroker.connected is True
+  def newMintAgent(self, qagent):
+    agent = MintAgent(self, qagent)
 
-      mbroker.connected = False
-    finally:
-      self.unlock()
-
-  def isConnected(self):
-    self.lock()
+    self.lock.acquire()
     try:
-      for broker in self.mintBrokersByUrl.values():
-        if broker.connected:
-          return True
+      assert agent.id not in self.agents
+      self.agents[agent.id] = agent
     finally:
-      self.unlock()
+      self.lock.release()
 
-  def getMintBrokerByQmfBroker(self, qbroker):
-    self.lock()
-    try:
-      return self.mintBrokersByQmfBroker[qbroker]
-    finally:
-      self.unlock()
+    return agent
 
-  def newAgent(self, agent):
-    """ Invoked when a QMF agent is discovered. """
+  def getMintAgent(self, qagent):
+    id = str(QmfAgentId.fromAgent(qagent))
+    return self.agents[id]
 
-    log.info("Agent connected: %s", agent)
+  def newAgent(self, qagent):
+    log.info("Creating %s", qagent)
 
     # Some agents come down without a brokerId, meaning we can't
     # generate a fully qualified agent ID for them.  Those we handle
     # in brokerInfo.
 
-    if agent.getBroker().brokerId:
-      self.lock()
-      try:
-        agentId = str(QmfAgentId.fromAgent(agent))
-        
-        log.debug("Adding agent %s", agentId)
+    if qagent.getBroker().brokerId:
+      agent = self.newMintAgent(qagent)
 
-        self.agentsById[agentId] = agent
-      finally:
-        self.unlock()
+      # XXX This business is to handle an agent-vs-agent data ordering
+      # problem
 
-  def delAgent(self, agent):
-    """ Invoked when a QMF agent disconects. """
+      objectPropCalls = self.deferredObjectPropCalls[agent.id]
 
-    log.info("Agent disconnected: %s", agent)
+      for broker, object in objectPropCalls:
+        self.objectProps(broker, object)
 
-    agentId = str(QmfAgentId.fromAgent(agent))
+      objectStatCalls = self.deferredObjectStatCalls[agent.id]
 
-    self.lock()
+      for broker, object in objectStatCalls:
+        self.objectStats(broker, object)
+
+  def delAgent(self, qagent):
+    log.info("Deleting %s", qagent)
+
+    self.lock.acquire()
     try:
-      del self.agentsById[agentId]
+      agent = self.getMintAgent(qagent)
+      agent.model = None
+      del self.agents[agent.id]
     finally:
-      self.unlock()
+      self.lock.release()
  
-    up = update.AgentDisconnectUpdate(self, agentId)
+    up = update.AgentDisconnectUpdate(self, agent)
     self.app.updateThread.enqueue(up)
 
-  def heartbeat(self, agent, timestamp):
-    agentId = str(QmfAgentId.fromAgent(agent))
+  def heartbeat(self, qagent, timestamp):
     timestamp = timestamp / 1000000000
 
-    self.lock()
+    self.lock.acquire()
     try:
-      self.heartbeatsByAgentId[agentId] = datetime.fromtimestamp(timestamp)
+      agent = self.getMintAgent(qagent)
+      agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
     finally:
-      self.unlock()
+      self.lock.release()
 
-  def getLatestHeartbeat(self, agentId):
-    self.lock()
-    try:
-      return self.heartbeatsByAgentId.get(agentId)
-    finally:
-      self.unlock()
-
   def newPackage(self, name):
     """ Invoked when a QMF package is discovered. """
     pass
@@ -333,26 +242,46 @@
     used to obtain details about the class."""
     pass
 
-  def objectProps(self, broker, record):
+  def objectProps(self, broker, object):
     """ Invoked when an object is updated. """
+
     if not self.app.updateThread.isAlive():
       return
 
-    mbroker = self.getMintBrokerByQmfBroker(broker)
+    self.lock.acquire()
+    try:
+      id = str(QmfAgentId.fromObject(object))
 
-    up = update.PropertyUpdate(self, mbroker, record)
+      try:
+        agent = self.agents[id]
+      except KeyError:
+        self.deferredObjectPropCalls[id].append((broker, object))
+        return
+    finally:
+      self.lock.release()
 
+    up = update.PropertyUpdate(self, agent, object)
     self.app.updateThread.enqueue(up)
 
-  def objectStats(self, broker, record):
+  def objectStats(self, broker, object):
     """ Invoked when an object is updated. """
 
     if not self.app.updateThread.isAlive():
       return
 
-    mbroker = self.getMintBrokerByQmfBroker(broker)
-    up = update.StatisticUpdate(self, mbroker, record)
+    self.lock.acquire()
+    try:
+      id = str(QmfAgentId.fromObject(object))
 
+      try:
+        agent = self.agents[id]
+      except KeyError:
+        self.deferredObjectStatCalls[id].append((broker, object))
+        return
+    finally:
+      self.lock.release()
+
+    up = update.StatisticUpdate(self, agent, object)
     self.app.updateThread.enqueue(up)
 
   def event(self, broker, event):
@@ -363,10 +292,10 @@
     classKey = ClassKey(mintObject.qmfClassKey)
     objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
 
-    self.lock()
+    self.lock.acquire()
     try:
-      agent = self.agentsById[mintObject.qmfAgentId]
-      broker = agent.getBroker()
+      agent = self.agents[mintObject.qmfAgentId]
+      broker = agent.agent.getBroker()
 
       seq = self.qmfSession._sendMethodRequest \
           (broker, classKey, objectId, methodName, args)
@@ -376,15 +305,33 @@
 
       return seq
     finally:
-      self.unlock()
+      self.lock.release()
 
   def methodResponse(self, broker, seq, response):
     log.debug("Method response for request %i received from %s", seq, broker)
     log.debug("Response: %s", response)
 
-    self.lock()
+    self.lock.acquire()
     try:
       methodCallback = self.outstandingMethodCalls.pop(seq)
       methodCallback(response.text, response.outArgs)
     finally:
-      self.unlock()
+      self.lock.release()
+
+class MintAgent(object):
+  def __init__(self, model, agent):
+    self.model = model
+    self.agent = agent
+
+    self.id = str(QmfAgentId.fromAgent(agent))
+
+    self.lastHeartbeat = None
+
+    # qmfObjectId => int database id
+    self.databaseIds = MintCache()
+
+    # qmfObjectId => list of ModelUpdate objects
+    self.deferredUpdates = defaultdict(list)
+
+  def __repr__(self):
+    return "%s(%s)" % (self.__class__.__name__, self.id)

Modified: mgmt/trunk/mint/python/mint/sql.py
===================================================================
--- mgmt/trunk/mint/python/mint/sql.py	2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/sql.py	2010-01-08 15:41:52 UTC (rev 3769)
@@ -227,9 +227,9 @@
 
 
 class SqlAgentDisconnect(SqlOperation):
-  def __init__(self, useAgentId = True):
+  def __init__(self, agent):
     super(SqlAgentDisconnect, self).__init__("disconnect_agent")
-    self.useAgentId = useAgentId
+    self.agent = agent
 
   def generate(self):
     sql = ""
@@ -239,7 +239,7 @@
          set qmf_delete_time = now()
        where qmf_persistent = 'f'
          and qmf_delete_time is null""" % (dbStyle.pythonClassToDBTable(cls))
-      if self.useAgentId:
+      if self.agent:
         sql += """
          and qmf_agent_id = %(qmf_agent_id)s;
         """

Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py	2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/update.py	2010-01-08 15:41:52 UTC (rev 3769)
@@ -50,25 +50,21 @@
 
   def run(self):
     while True:
+      if self.stopRequested:
+        break
+
       try:
         update = self.updates.get(True, 1)
-
-        log.debug("Processing %s", update)
-
-        self.dequeueCount += 1
-
-        if self.stopRequested:
-          break
       except Empty:
-        if self.stopRequested:
-          break
-        else:
-          #log.debug("Queue is empty")
-          continue
+        continue
 
+      self.dequeueCount += 1
+
       self.processUpdate(update)
 
   def processUpdate(self, update):
+    log.debug("Processing %s", update)
+
     try:
       update.process(self)
 
@@ -77,30 +73,17 @@
         # commit only every "commitThreshold" updates, or whenever
         # the update queue is empty
 
-        self.commit()
+        update.commit()
+        self.conn.commit()
     except:
       log.exception("Update failed")
 
-      self.rollback()
+      update.rollback()
+      self.conn.rollback()
 
   def cursor(self):
     return self.conn.cursor()
 
-  def commit(self):
-    self.conn.commit()
-
-    for broker in self.app.model.mintBrokersByQmfBroker.values():
-      broker.objectDatabaseIds.commit()
-
-  def rollback(self):
-    try:
-      self.conn.rollback()
-    except:
-      log.exception("Rollback failed")
-
-    for broker in self.app.model.mintBrokersByQmfBroker.values():
-      broker.objectDatabaseIds.rollback()
-
 class ReferenceException(Exception):
     def __init__(self, sought):
         self.sought = sought
@@ -109,15 +92,19 @@
         return repr(self.sought)
 
 class ModelUpdate(object):
-  def __init__(self, model, broker, object):
+  def __init__(self, model, agent, object):
+    if agent:
+      from mint.model import MintAgent
+      assert isinstance(agent, MintAgent)
+
     self.model = model
-    self.broker = broker
+    self.agent = agent
     self.object = object
     self.priority = 0
 
   def __repr__(self):
     return "%s(%s, %s, %i)" % (self.__class__.__name__,
-                               str(self.broker),
+                               str(self.agent),
                                str(self.object),
                                self.priority)
 
@@ -143,6 +130,7 @@
       name = mint.schema.schemaReservedWordsMap.get(name, name)
 
       if key.type == 10:
+        # XXX don't want oid around much
         self.processReference(name, value, results)
         continue
 
@@ -180,9 +168,10 @@
 
     foreignKey = name + "_id"
 
-    id = self.broker.objectDatabaseIds.get(oid)
+    id = self.agent.databaseIds.get(str(QmfObjectId(oid.first, oid.second)))
 
     if id is None:
+      # XXX don't want oid around much
       raise ReferenceException(oid)
 
     results[foreignKey] = id
@@ -192,6 +181,18 @@
       t = datetime.fromtimestamp(tstamp / 1000000000)
       results[name] = t
 
+  def commit(self):
+    # XXX commit db here too
+
+    if self.agent:
+      self.agent.databaseIds.commit()
+
+  def rollback(self):
+    # XXX rollback db here too
+
+    if self.agent:
+      self.agent.databaseIds.rollback()
+
 class PropertyUpdate(ModelUpdate):
   def process(self, thread):
     try:
@@ -202,18 +203,14 @@
       thread.dropCount += 1
       return
 
-    oid = self.object.getObjectId()
+    qmfObjectId = str(QmfObjectId.fromObject(self.object))
 
     try:
       attrs = self.processAttributes(self.object.getProperties(), cls)
     except ReferenceException, e:
       log.info("Referenced object %r not found", e.sought)
 
-      try:
-        orphans = self.broker.orphans[oid]
-        orphans.append(self)
-      except KeyError:
-        self.broker.orphans[oid] = list((self,))
+      self.agent.deferredUpdates[qmfObjectId].append(self)
 
       thread.deferCount += 1
       return
@@ -226,12 +223,13 @@
     if delete != 0:
       self.processTimestamp("qmfDeleteTime", delete, attrs)
 
-      log.debug("%s(%s) marked deleted", cls.__name__, oid)
+      log.debug("%s(%s,%s) marked deleted",
+                cls.__name__, self.agent.id, qmfObjectId)
 
-    attrs["qmfAgentId"] = str(QmfAgentId.fromObject(self.object))
+    attrs["qmfAgentId"] = self.agent.id
     attrs["qmfClassKey"] = str(self.object.getClassKey())
-    attrs["qmfObjectId"] = str(QmfObjectId.fromObject(self.object))
-    attrs["qmfPersistent"] = oid.isDurable()
+    attrs["qmfObjectId"] = str(qmfObjectId)
+    attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
 
     cursor = thread.cursor()
 
@@ -241,7 +239,7 @@
     # 2. Object is in mint's db, but id is not yet cached
     # 3. Object is in mint's db, and id is cached
 
-    id = self.broker.objectDatabaseIds.get(oid)
+    id = self.agent.databaseIds.get(qmfObjectId)
 
     if id is None:
       # Case 1 or 2
@@ -273,7 +271,7 @@
 
         assert cursor.rowcount == 1
 
-      self.broker.objectDatabaseIds.set(oid, id)
+      self.agent.databaseIds.set(qmfObjectId, id)
     else:
       # Case 3
 
@@ -285,14 +283,14 @@
       #assert cursor.rowcount == 1
 
     try:
-      orphans = self.broker.orphans.pop(oid)
+      updates = self.agent.deferredUpdates.pop(qmfObjectId)
 
-      if orphans:
+      if updates:
         log.info("Re-enqueueing %i orphans whose creation had been deferred",
-                 len(orphans))
+                 len(updates))
 
-        for orphan in orphans:
-          thread.enqueue(orphan)
+        for update in updates:
+          thread.enqueue(update)
     except KeyError:
       pass
 
@@ -308,9 +306,10 @@
 
     statsCls = getattr(mint, "%sStats" % cls.__name__)
 
-    oid = self.object.getObjectId()
-    id = self.broker.objectDatabaseIds.get(oid)
+    qmfObjectId = str(QmfObjectId.fromObject(self.object))
 
+    id = self.agent.databaseIds.get(qmfObjectId)
+
     if id is None:
       thread.dropCount += 1
       return
@@ -332,7 +331,7 @@
       thread.dropCount += 1
       return
 
-    attrs["qmfUpdateTime"] = t > tnow and tnow or t
+    attrs["qmfUpdateTime"] = t > tnow and tnow or t # XXX do we still want this
     attrs["%s_id" % cls.sqlmeta.table] = id
 
     cursor = thread.cursor()
@@ -353,14 +352,14 @@
     attrs = thread.app.expireThread.attrs
     total = 0
 
-    thread.commit()
+    thread.conn.commit()
 
     for op in thread.app.expireThread.ops:
       log.debug("Running expire op %s", op)
 
       count = op.execute(cursor, attrs)
 
-      thread.commit()
+      thread.conn.commit()
 
       log.debug("%i records expired", count)
 
@@ -370,21 +369,21 @@
 
     thread.expireUpdateCount += 1
 
-
 class AgentDisconnectUpdate(ModelUpdate):
-  def __init__(self, model, agentId):
-    super(AgentDisconnectUpdate, self).__init__(model, None, None)
-    self.agentId = agentId
+  def __init__(self, model, agent):
+    super(AgentDisconnectUpdate, self).__init__(model, agent, None)
 
   def process(self, thread):
     cursor = thread.cursor()
-    useAgentId = self.agentId != 0
-    op = SqlAgentDisconnect(useAgentId)
-    if useAgentId:
-      op.execute(cursor, {"qmf_agent_id": self.agentId})
-    else:
-      op.execute(cursor)
 
+    args = dict()
+
+    if self.agent:
+      args["qmf_agent_id"] = self.agent.id
+
+    op = SqlAgentDisconnect(agent)
+    op.execute(cursor, args)
+
 class UpdateQueue(ConcurrentQueue):
   def __init__(self, maxsize=0, slotCount=1):
     self.slotCount = slotCount



More information about the rhmessaging-commits mailing list