[rhmessaging-commits] rhmessaging commits: r3769 - in mgmt/trunk: cumin/python/cumin/messaging and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Jan 8 10:41:53 EST 2010
Author: justi9
Date: 2010-01-08 10:41:52 -0500 (Fri, 08 Jan 2010)
New Revision: 3769
Modified:
mgmt/trunk/cumin/python/cumin/main.py
mgmt/trunk/cumin/python/cumin/messaging/broker.py
mgmt/trunk/cumin/python/cumin/messaging/model.py
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/mint/python/mint/cache.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/sql.py
mgmt/trunk/mint/python/mint/update.py
Log:
* Realign mint around agent objects. For now the agent layer is partly
faked, but it will be the native model in qmf2.
* Add a temporary hack to defer objectProps and objectStats calls until
their agent is known; new 1.3 broker behavior will make this unnecessary.
Modified: mgmt/trunk/cumin/python/cumin/main.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/main.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/main.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -133,7 +133,7 @@
def do_process(self, session):
super(OverviewFrame, self).do_process(session)
- count = len(self.app.model.mint.model.mintBrokersByUrl)
+ count = len(self.app.model.mint.model.qmfBrokers)
if count == 0:
self.mode.set(session, self.notice)
Modified: mgmt/trunk/cumin/python/cumin/messaging/broker.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/broker.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/messaging/broker.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -84,13 +84,13 @@
return "Status"
def render_content(self, session, data):
- agentId = data["qmf_agent_id"]
- dt = self.app.model.mint.model.getLatestHeartbeat(agentId)
+ agent = self.app.model.mint.model.agents.get(data["qmf_agent_id"])
- if dt is None:
- return fmt_none()
- else:
- return fmt_datetime(dt)
+ if agent:
+ if agent.lastHeartbeat is None:
+ return fmt_none()
+ else:
+ return fmt_datetime(agent.lastHeartbeat)
class ClusterColumn(SqlTableColumn):
def render_title(self, session, data):
Modified: mgmt/trunk/cumin/python/cumin/messaging/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/model.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/messaging/model.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -26,7 +26,7 @@
session_ids = set()
- for broker in self.app.model.mint.model.mintBrokersByQmfBroker:
+ for broker in self.app.model.mint.model.qmfBrokers:
session_ids.add(broker.getSessionId())
for sess in conn.sessions:
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -124,9 +124,9 @@
def get_session_by_object(self, object):
assert object
- agent = self.mint.model.agentsById[object.qmfAgentId]
+ agent = self.mint.model.agents[object.qmfAgentId]
- return agent.getBroker().getAmqpSession()
+ return agent.agent.getBroker().getAmqpSession()
def get_negotiator_limits(self, negotiator):
self.lock.acquire()
@@ -1125,9 +1125,6 @@
prop = CuminProperty(self, "dataDir")
prop.title = "Data Directory"
- stat = self.StatusStat(self, "connection")
- stat.category = "status"
-
def init(self):
super(CuminBroker, self).init()
@@ -1146,25 +1143,6 @@
except:
return broker.name
- class StatusStat(CuminStat):
- def value_text(self, broker):
- connected = False
- if broker:
- try:
- mbroker = self.mint.model.mintBrokersById \
- [broker.qmfBrokerId]
- connected = mbroker.connected
- except KeyError:
- pass
-
- if connected:
- return "Connected"
- else:
- return "Disconnected"
-
- def rate_text(self, record):
- return ""
-
# XXX "do_" on this doesn't make sense
def do_bind(session, queue_name, binding_info):
for exchange in binding_info:
@@ -2437,7 +2415,7 @@
pass
def delete(self):
- pass
+ self.model = None
class UpdateThread(Thread):
def __init__(self, store):
@@ -2475,6 +2453,8 @@
def delete(self):
del self.model.limits_by_negotiator[self.negotiator]
+ super(SubmissionJobStore, self).delete()
+
class SubmissionJobStore(ObjectStore):
def __init__(self, model, submission):
super(SubmissionJobStore, self).__init__(model)
@@ -2492,3 +2472,5 @@
def delete(self):
del self.model.jobs_by_submission[self.submission]
+
+ super(SubmissionJobStore, self).delete()
Modified: mgmt/trunk/mint/python/mint/cache.py
===================================================================
--- mgmt/trunk/mint/python/mint/cache.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/cache.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -1,5 +1,3 @@
-from threading import RLock
-
class MintCache(object):
def __init__(self):
self.__cache = dict()
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -1,3 +1,4 @@
+from parsley.collectionsex import defaultdict
from parsley.threadingex import Lifecycle
from qpid.datatypes import UUID
from qpid.util import URL
@@ -94,22 +95,6 @@
brokerGroup = ForeignKey("BrokerGroup", notNull=True, cascade=True)
unique = DatabaseIndex(broker, brokerGroup, unique=True)
-class MintBroker(object):
- def __init__(self, url, qmfBroker):
- self.url = url
- self.qmfBroker = qmfBroker
-
- self.qmfId = None
- self.databaseId = None
-
- self.objectDatabaseIds = MintCache() # database ids by qmf object id
- self.orphans = dict() # updates by qmf object id
-
- self.connected = False
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, self.url)
-
class MintModel(Console, Lifecycle):
staticInstance = None
@@ -121,33 +106,20 @@
self.app = app
- # Lookup tables used for recovering MintBroker objects, which have
- # mint-specific accounting and wrap a qmf broker object
+ # qmfAgentId => MintAgent
+ self.agents = dict()
- self.mintBrokersByQmfBroker = dict()
- self.mintBrokersById = dict()
- self.mintBrokersByUrl = dict()
+ # int seq => callable
+ self.outstandingMethodCalls = dict()
- self.agentsById = dict()
-
- # Agent heartbeats
- # agentId => latest heartbeat timestamp
-
- self.heartbeatsByAgentId = dict()
-
- self.__lock = RLock()
-
- self.dbConn = None
self.qmfSession = None
+ self.qmfBrokers = list()
- self.outstandingMethodCalls = dict()
+ self.lock = RLock()
- def lock(self):
- self.__lock.acquire()
+ self.deferredObjectPropCalls = defaultdict(list)
+ self.deferredObjectStatCalls = defaultdict(list)
- def unlock(self):
- self.__lock.release()
-
def check(self):
pass
@@ -157,11 +129,11 @@
self.qmfSession = Session \
(self, manageConnections=True, rcvObjects=self.app.updateEnabled)
- # clean up any transient objects that a previous instance may have
- # left behind in the DB it's basically an unconstrained agent
+ # Clean up any transient objects that a previous instance may have
+ # left behind in the DB; it's basically an unconstrained agent
# disconnect update, for any agent
- up = update.AgentDisconnectUpdate(self, 0)
+ up = update.AgentDisconnectUpdate(self, None)
self.app.updateThread.enqueue(up)
def do_start(self):
@@ -171,159 +143,96 @@
self.addBroker(uri)
def do_stop(self):
- for mbroker in self.mintBrokersByQmfBroker.values():
- self.delBroker(mbroker)
+ for qbroker in self.qmfBrokers:
+ self.qmfSession.delBroker(qbroker)
def addBroker(self, url):
log.info("Adding qmf broker at %s", url)
- self.lock()
+ self.lock.acquire()
try:
qbroker = self.qmfSession.addBroker(url)
- mbroker = MintBroker(url, qbroker)
-
- # Can't add the by-id mapping here, as the id is not set yet;
- # instead we add it when brokerConnected is called
-
- self.mintBrokersByQmfBroker[qbroker] = mbroker
- self.mintBrokersByUrl[url] = mbroker
-
- return mbroker
+ self.qmfBrokers.append(qbroker)
finally:
- self.unlock()
+ self.lock.release()
- def delBroker(self, mbroker):
- assert isinstance(mbroker, MintBroker)
-
- log.info("Removing qmf broker at %s", mbroker.url)
-
- self.lock()
- try:
- self.qmfSession.delBroker(mbroker.qmfBroker)
-
- del self.mintBrokersByQmfBroker[mbroker.qmfBroker]
- del self.mintBrokersByUrl[mbroker.url]
-
- if mbroker.qmfId:
- del self.mintBrokersById[mbroker.qmfId]
- finally:
- self.unlock()
-
def brokerConnected(self, qbroker):
- """ Invoked when a connection is established to a broker """
- self.lock()
- try:
- mbroker = self.mintBrokersByQmfBroker[qbroker]
+ self.log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
- assert mbroker.connected is False
-
- mbroker.connected = True
- finally:
- self.unlock()
-
def brokerInfo(self, qbroker):
- self.lock()
- try:
- # We now have enough information to generate a proper agent ID;
- # it would be much better to get the broker ID earlier, or to
- # simply get a reasonable agent ID with less indirection
+ # Now we have a brokerId to use to generate fully qualified agent
+ # IDs
- for agent in qbroker.getAgents():
- agentId = str(QmfAgentId.fromAgent(agent))
+ for qagent in qbroker.getAgents():
+ self.newMintAgent(qagent)
- log.debug("Adding agent %s", agentId)
-
- self.agentsById[agentId] = agent
-
- id = str(qbroker.getBrokerId())
- mbroker = self.mintBrokersByQmfBroker[qbroker]
-
- mbroker.qmfId = id
- self.mintBrokersById[id] = mbroker
- finally:
- self.unlock()
-
def brokerDisconnected(self, qbroker):
- """ Invoked when the connection to a broker is lost """
- self.lock()
- try:
- mbroker = self.mintBrokersByQmfBroker[qbroker]
+ self.log.info \
+ ("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
- assert mbroker.connected is True
+ def newMintAgent(self, qagent):
+ agent = MintAgent(self, qagent)
- mbroker.connected = False
- finally:
- self.unlock()
-
- def isConnected(self):
- self.lock()
+ self.lock.acquire()
try:
- for broker in self.mintBrokersByUrl.values():
- if broker.connected:
- return True
+ assert agent.id not in self.agents
+ self.agents[agent.id] = agent
finally:
- self.unlock()
+ self.lock.release()
- def getMintBrokerByQmfBroker(self, qbroker):
- self.lock()
- try:
- return self.mintBrokersByQmfBroker[qbroker]
- finally:
- self.unlock()
+ return agent
- def newAgent(self, agent):
- """ Invoked when a QMF agent is discovered. """
+ def getMintAgent(self, qagent):
+ id = str(QmfAgentId.fromAgent(qagent))
+ return self.agents[id]
- log.info("Agent connected: %s", agent)
+ def newAgent(self, qagent):
+ log.info("Creating %s", qagent)
# Some agents come down without a brokerId, meaning we can't
# generate a fully qualified agent ID for them. Those we handle
# in brokerInfo.
- if agent.getBroker().brokerId:
- self.lock()
- try:
- agentId = str(QmfAgentId.fromAgent(agent))
-
- log.debug("Adding agent %s", agentId)
+ if qagent.getBroker().brokerId:
+ agent = self.newMintAgent(qagent)
- self.agentsById[agentId] = agent
- finally:
- self.unlock()
+ # XXX This business is to handle an agent-vs-agent data ordering
+ # problem
- def delAgent(self, agent):
- """ Invoked when a QMF agent disconects. """
+ objectPropCalls = self.deferredObjectPropCalls[agent.id]
- log.info("Agent disconnected: %s", agent)
+ for broker, object in objectPropCalls:
+ self.objectProps(broker, object)
- agentId = str(QmfAgentId.fromAgent(agent))
+ objectStatCalls = self.deferredObjectStatCalls[agent.id]
- self.lock()
+ for broker, object in objectStatCalls:
+ self.objectStats(broker, object)
+
+ def delAgent(self, qagent):
+ log.info("Deleting %s", qagent)
+
+ self.lock.acquire()
try:
- del self.agentsById[agentId]
+ agent = self.getMintAgent(qagent)
+ agent.model = None
+ del self.agents[agent.id]
finally:
- self.unlock()
+ self.lock.release()
- up = update.AgentDisconnectUpdate(self, agentId)
+ up = update.AgentDisconnectUpdate(self, agent)
self.app.updateThread.enqueue(up)
- def heartbeat(self, agent, timestamp):
- agentId = str(QmfAgentId.fromAgent(agent))
+ def heartbeat(self, qagent, timestamp):
timestamp = timestamp / 1000000000
- self.lock()
+ self.lock.acquire()
try:
- self.heartbeatsByAgentId[agentId] = datetime.fromtimestamp(timestamp)
+ agent = self.getMintAgent(qagent)
+ agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
finally:
- self.unlock()
+ self.lock.release()
- def getLatestHeartbeat(self, agentId):
- self.lock()
- try:
- return self.heartbeatsByAgentId.get(agentId)
- finally:
- self.unlock()
-
def newPackage(self, name):
""" Invoked when a QMF package is discovered. """
pass
@@ -333,26 +242,46 @@
used to obtain details about the class."""
pass
- def objectProps(self, broker, record):
+ def objectProps(self, broker, object):
""" Invoked when an object is updated. """
+
if not self.app.updateThread.isAlive():
return
- mbroker = self.getMintBrokerByQmfBroker(broker)
+ self.lock.acquire()
+ try:
+ id = str(QmfAgentId.fromObject(object))
- up = update.PropertyUpdate(self, mbroker, record)
+ try:
+ agent = self.agents[id]
+ except KeyError:
+ self.deferredObjectPropCalls[id].append((broker, object))
+ return
+ finally:
+ self.lock.release()
+ up = update.PropertyUpdate(self, agent, object)
self.app.updateThread.enqueue(up)
- def objectStats(self, broker, record):
+ def objectStats(self, broker, object):
""" Invoked when an object is updated. """
if not self.app.updateThread.isAlive():
return
- mbroker = self.getMintBrokerByQmfBroker(broker)
- up = update.StatisticUpdate(self, mbroker, record)
+ self.lock.acquire()
+ try:
+ id = str(QmfAgentId.fromObject(object))
+ try:
+ agent = self.agents[id]
+ except KeyError:
+ self.deferredObjectStatCalls[id].append((broker, object))
+ return
+ finally:
+ self.lock.release()
+
+ up = update.StatisticUpdate(self, agent, object)
self.app.updateThread.enqueue(up)
def event(self, broker, event):
@@ -363,10 +292,10 @@
classKey = ClassKey(mintObject.qmfClassKey)
objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
- self.lock()
+ self.lock.acquire()
try:
- agent = self.agentsById[mintObject.qmfAgentId]
- broker = agent.getBroker()
+ agent = self.agents[mintObject.qmfAgentId]
+ broker = agent.agent.getBroker()
seq = self.qmfSession._sendMethodRequest \
(broker, classKey, objectId, methodName, args)
@@ -376,15 +305,33 @@
return seq
finally:
- self.unlock()
+ self.lock.release()
def methodResponse(self, broker, seq, response):
log.debug("Method response for request %i received from %s", seq, broker)
log.debug("Response: %s", response)
- self.lock()
+ self.lock.acquire()
try:
methodCallback = self.outstandingMethodCalls.pop(seq)
methodCallback(response.text, response.outArgs)
finally:
- self.unlock()
+ self.lock.release()
+
+class MintAgent(object):
+ def __init__(self, model, agent):
+ self.model = model
+ self.agent = agent
+
+ self.id = str(QmfAgentId.fromAgent(agent))
+
+ self.lastHeartbeat = None
+
+ # qmfObjectId => int database id
+ self.databaseIds = MintCache()
+
+ # qmfObjectId => list of ModelUpdate objects
+ self.deferredUpdates = defaultdict(list)
+
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__, self.id)
Modified: mgmt/trunk/mint/python/mint/sql.py
===================================================================
--- mgmt/trunk/mint/python/mint/sql.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/sql.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -227,9 +227,9 @@
class SqlAgentDisconnect(SqlOperation):
- def __init__(self, useAgentId = True):
+ def __init__(self, agent):
super(SqlAgentDisconnect, self).__init__("disconnect_agent")
- self.useAgentId = useAgentId
+ self.agent = agent
def generate(self):
sql = ""
@@ -239,7 +239,7 @@
set qmf_delete_time = now()
where qmf_persistent = 'f'
and qmf_delete_time is null""" % (dbStyle.pythonClassToDBTable(cls))
- if self.useAgentId:
+ if self.agent:
sql += """
and qmf_agent_id = %(qmf_agent_id)s;
"""
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2010-01-08 15:41:13 UTC (rev 3768)
+++ mgmt/trunk/mint/python/mint/update.py 2010-01-08 15:41:52 UTC (rev 3769)
@@ -50,25 +50,21 @@
def run(self):
while True:
+ if self.stopRequested:
+ break
+
try:
update = self.updates.get(True, 1)
-
- log.debug("Processing %s", update)
-
- self.dequeueCount += 1
-
- if self.stopRequested:
- break
except Empty:
- if self.stopRequested:
- break
- else:
- #log.debug("Queue is empty")
- continue
+ continue
+ self.dequeueCount += 1
+
self.processUpdate(update)
def processUpdate(self, update):
+ log.debug("Processing %s", update)
+
try:
update.process(self)
@@ -77,30 +73,17 @@
# commit only every "commitThreshold" updates, or whenever
# the update queue is empty
- self.commit()
+ update.commit()
+ self.conn.commit()
except:
log.exception("Update failed")
- self.rollback()
+ update.rollback()
+ self.conn.rollback()
def cursor(self):
return self.conn.cursor()
- def commit(self):
- self.conn.commit()
-
- for broker in self.app.model.mintBrokersByQmfBroker.values():
- broker.objectDatabaseIds.commit()
-
- def rollback(self):
- try:
- self.conn.rollback()
- except:
- log.exception("Rollback failed")
-
- for broker in self.app.model.mintBrokersByQmfBroker.values():
- broker.objectDatabaseIds.rollback()
-
class ReferenceException(Exception):
def __init__(self, sought):
self.sought = sought
@@ -109,15 +92,19 @@
return repr(self.sought)
class ModelUpdate(object):
- def __init__(self, model, broker, object):
+ def __init__(self, model, agent, object):
+ if agent:
+ from mint.model import MintAgent
+ assert isinstance(agent, MintAgent)
+
self.model = model
- self.broker = broker
+ self.agent = agent
self.object = object
self.priority = 0
def __repr__(self):
return "%s(%s, %s, %i)" % (self.__class__.__name__,
- str(self.broker),
+ str(self.agent),
str(self.object),
self.priority)
@@ -143,6 +130,7 @@
name = mint.schema.schemaReservedWordsMap.get(name, name)
if key.type == 10:
+ # XXX don't want oid around much
self.processReference(name, value, results)
continue
@@ -180,9 +168,10 @@
foreignKey = name + "_id"
- id = self.broker.objectDatabaseIds.get(oid)
+ id = self.agent.databaseIds.get(str(QmfObjectId(oid.first, oid.second)))
if id is None:
+ # XXX don't want oid around much
raise ReferenceException(oid)
results[foreignKey] = id
@@ -192,6 +181,18 @@
t = datetime.fromtimestamp(tstamp / 1000000000)
results[name] = t
+ def commit(self):
+ # XXX commit db here too
+
+ if self.agent:
+ self.agent.databaseIds.commit()
+
+ def rollback(self):
+ # XXX rollback db here too
+
+ if self.agent:
+ self.agent.databaseIds.rollback()
+
class PropertyUpdate(ModelUpdate):
def process(self, thread):
try:
@@ -202,18 +203,14 @@
thread.dropCount += 1
return
- oid = self.object.getObjectId()
+ qmfObjectId = str(QmfObjectId.fromObject(self.object))
try:
attrs = self.processAttributes(self.object.getProperties(), cls)
except ReferenceException, e:
log.info("Referenced object %r not found", e.sought)
- try:
- orphans = self.broker.orphans[oid]
- orphans.append(self)
- except KeyError:
- self.broker.orphans[oid] = list((self,))
+ self.agent.deferredUpdates[qmfObjectId].append(self)
thread.deferCount += 1
return
@@ -226,12 +223,13 @@
if delete != 0:
self.processTimestamp("qmfDeleteTime", delete, attrs)
- log.debug("%s(%s) marked deleted", cls.__name__, oid)
+ log.debug("%s(%s,%s) marked deleted",
+ cls.__name__, self.agent.id, qmfObjectId)
- attrs["qmfAgentId"] = str(QmfAgentId.fromObject(self.object))
+ attrs["qmfAgentId"] = self.agent.id
attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfObjectId"] = str(QmfObjectId.fromObject(self.object))
- attrs["qmfPersistent"] = oid.isDurable()
+ attrs["qmfObjectId"] = str(qmfObjectId)
+ attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
cursor = thread.cursor()
@@ -241,7 +239,7 @@
# 2. Object is in mint's db, but id is not yet cached
# 3. Object is in mint's db, and id is cached
- id = self.broker.objectDatabaseIds.get(oid)
+ id = self.agent.databaseIds.get(qmfObjectId)
if id is None:
# Case 1 or 2
@@ -273,7 +271,7 @@
assert cursor.rowcount == 1
- self.broker.objectDatabaseIds.set(oid, id)
+ self.agent.databaseIds.set(qmfObjectId, id)
else:
# Case 3
@@ -285,14 +283,14 @@
#assert cursor.rowcount == 1
try:
- orphans = self.broker.orphans.pop(oid)
+ updates = self.agent.deferredUpdates.pop(qmfObjectId)
- if orphans:
+ if updates:
log.info("Re-enqueueing %i orphans whose creation had been deferred",
- len(orphans))
+ len(updates))
- for orphan in orphans:
- thread.enqueue(orphan)
+ for update in updates:
+ thread.enqueue(update)
except KeyError:
pass
@@ -308,9 +306,10 @@
statsCls = getattr(mint, "%sStats" % cls.__name__)
- oid = self.object.getObjectId()
- id = self.broker.objectDatabaseIds.get(oid)
+ qmfObjectId = str(QmfObjectId.fromObject(self.object))
+ id = self.agent.databaseIds.get(qmfObjectId)
+
if id is None:
thread.dropCount += 1
return
@@ -332,7 +331,7 @@
thread.dropCount += 1
return
- attrs["qmfUpdateTime"] = t > tnow and tnow or t
+ attrs["qmfUpdateTime"] = t > tnow and tnow or t # XXX do we still want this
attrs["%s_id" % cls.sqlmeta.table] = id
cursor = thread.cursor()
@@ -353,14 +352,14 @@
attrs = thread.app.expireThread.attrs
total = 0
- thread.commit()
+ thread.conn.commit()
for op in thread.app.expireThread.ops:
log.debug("Running expire op %s", op)
count = op.execute(cursor, attrs)
- thread.commit()
+ thread.conn.commit()
log.debug("%i records expired", count)
@@ -370,21 +369,21 @@
thread.expireUpdateCount += 1
-
class AgentDisconnectUpdate(ModelUpdate):
- def __init__(self, model, agentId):
- super(AgentDisconnectUpdate, self).__init__(model, None, None)
- self.agentId = agentId
+ def __init__(self, model, agent):
+ super(AgentDisconnectUpdate, self).__init__(model, agent, None)
def process(self, thread):
cursor = thread.cursor()
- useAgentId = self.agentId != 0
- op = SqlAgentDisconnect(useAgentId)
- if useAgentId:
- op.execute(cursor, {"qmf_agent_id": self.agentId})
- else:
- op.execute(cursor)
+ args = dict()
+
+ if self.agent:
+ args["qmf_agent_id"] = self.agent.id
+
+ op = SqlAgentDisconnect(agent)
+ op.execute(cursor, args)
+
class UpdateQueue(ConcurrentQueue):
def __init__(self, maxsize=0, slotCount=1):
self.slotCount = slotCount
More information about the rhmessaging-commits
mailing list