rhmessaging commits: r3784 - in mgmt/trunk: cumin/python/cumin/messaging and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-11 15:39:58 -0500 (Mon, 11 Jan 2010)
New Revision: 3784
Modified:
mgmt/trunk/cumin/python/cumin/main.py
mgmt/trunk/cumin/python/cumin/messaging/model.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/tools.py
Log:
More pythonic naming
Modified: mgmt/trunk/cumin/python/cumin/main.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/main.py 2010-01-11 20:07:58 UTC (rev 3783)
+++ mgmt/trunk/cumin/python/cumin/main.py 2010-01-11 20:39:58 UTC (rev 3784)
@@ -133,7 +133,7 @@
def do_process(self, session):
super(OverviewFrame, self).do_process(session)
- count = len(self.app.model.mint.model.qmfBrokers)
+ count = len(self.app.model.mint.model.qmf_brokers)
if count == 0:
self.mode.set(session, self.notice)
Modified: mgmt/trunk/cumin/python/cumin/messaging/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/messaging/model.py 2010-01-11 20:07:58 UTC (rev 3783)
+++ mgmt/trunk/cumin/python/cumin/messaging/model.py 2010-01-11 20:39:58 UTC (rev 3784)
@@ -26,7 +26,7 @@
session_ids = set()
- for broker in self.app.model.mint.model.qmfBrokers:
+ for broker in self.app.model.mint.model.qmf_brokers:
session_ids.add(broker.getSessionId())
for sess in conn.sessions:
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-11 20:07:58 UTC (rev 3783)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-11 20:39:58 UTC (rev 3784)
@@ -27,10 +27,10 @@
self.agents = dict()
# int seq => callable
- self.outstandingMethodCalls = dict()
+ self.outstanding_method_calls = dict()
- self.qmfSession = None
- self.qmfBrokers = list()
+ self.qmf_session = None
+ self.qmf_brokers = list()
self.lock = RLock()
@@ -38,11 +38,11 @@
pass
def do_init(self):
- assert self.qmfSession is None
+ assert self.qmf_session is None
- self.qmfSession = Session(MintConsole(self),
- manageConnections=True,
- rcvObjects=self.app.updateEnabled)
+ self.qmf_session = Session(MintConsole(self),
+ manageConnections=True,
+ rcvObjects=self.app.updateEnabled)
def do_start(self):
# Clean up any transient objects that a previous instance may have
@@ -55,19 +55,19 @@
uris = [x.strip() for x in self.app.config.qmf.split(",")]
for uri in uris:
- self.addBroker(uri)
+ self.add_broker(uri)
def do_stop(self):
- for qbroker in self.qmfBrokers:
- self.qmfSession.delBroker(qbroker)
+ for qbroker in self.qmf_brokers:
+ self.qmf_session.delBroker(qbroker)
- def addBroker(self, url):
+ def add_broker(self, url):
log.info("Adding qmf broker at %s", url)
self.lock.acquire()
try:
- qbroker = self.qmfSession.addBroker(url)
- self.qmfBrokers.append(qbroker)
+ qbroker = self.qmf_session.addBroker(url)
+ self.qmf_brokers.append(qbroker)
finally:
self.lock.release()
@@ -105,11 +105,11 @@
try:
broker = self.agent.getBroker()
- seq = self.model.qmfSession._sendMethodRequest \
+ seq = self.model.qmf_session._sendMethodRequest \
(broker, classKey, objectId, methodName, args)
if seq is not None:
- self.model.outstandingMethodCalls[seq] = callback
+ self.model.outstanding_method_calls[seq] = callback
return seq
finally:
@@ -118,7 +118,7 @@
def delete(self):
self.model.lock.acquire()
try:
- del self.model.agents[agent.id]
+ del self.model.agents[self.id]
finally:
self.model.lock.release()
@@ -131,8 +131,8 @@
def __init__(self, model):
self.model = model
- self.deferredObjectPropCalls = defaultdict(list)
- self.deferredObjectStatCalls = defaultdict(list)
+ self.deferred_object_prop_calls = defaultdict(list)
+ self.deferred_object_stat_calls = defaultdict(list)
def brokerConnected(self, qbroker):
log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
@@ -160,12 +160,12 @@
# XXX This business is to handle an agent-vs-agent-data ordering
# problem
- objectPropCalls = self.deferredObjectPropCalls[agent.id]
+ objectPropCalls = self.deferred_object_prop_calls[agent.id]
for broker, object in objectPropCalls:
self.objectProps(broker, object)
- objectStatCalls = self.deferredObjectStatCalls[agent.id]
+ objectStatCalls = self.deferred_object_stat_calls[agent.id]
for broker, object in objectStatCalls:
self.objectStats(broker, object)
@@ -177,7 +177,7 @@
agent.delete()
up = AgentDisconnectUpdate(agent)
- self.app.updateThread.enqueue(up)
+ self.model.app.updateThread.enqueue(up)
def heartbeat(self, qagent, timestamp):
timestamp = timestamp / 1000000000
@@ -202,7 +202,7 @@
try:
agent = self.model.agents[id]
except KeyError:
- self.deferredObjectPropCalls[id].append((broker, object))
+ self.deferred_object_prop_calls[id].append((broker, object))
return
finally:
self.model.lock.release()
@@ -223,7 +223,7 @@
try:
agent = self.model.agents[id]
except KeyError:
- self.deferredObjectStatCalls[id].append((broker, object))
+ self.deferred_object_stat_calls[id].append((broker, object))
return
finally:
self.model.lock.release()
@@ -241,7 +241,7 @@
self.model.lock.acquire()
try:
- methodCallback = self.model.outstandingMethodCalls.pop(seq)
- methodCallback(response.text, response.outArgs)
+ callback = self.model.outstanding_method_calls.pop(seq)
+ callback(response.text, response.outArgs)
finally:
self.model.lock.release()
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2010-01-11 20:07:58 UTC (rev 3783)
+++ mgmt/trunk/mint/python/mint/tools.py 2010-01-11 20:39:58 UTC (rev 3784)
@@ -395,7 +395,7 @@
try:
for arg in args[1:]:
- app.model.addBroker(arg)
+ app.model.add_broker(arg)
while True:
sleep(2)
@@ -415,7 +415,7 @@
try:
for arg in args[1:]:
- app.model.addBroker(arg)
+ app.model.add_broker(arg)
while True:
sleep(2)
@@ -447,7 +447,7 @@
try:
for arg in args[1:]:
try:
- app.model.addBroker(arg)
+ app.model.add_broker(arg)
except socket.error, e:
print "Warning: Failed connecting to broker at '%s'" % arg
14 years, 11 months
rhmessaging commits: r3783 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-11 15:07:58 -0500 (Mon, 11 Jan 2010)
New Revision: 3783
Modified:
mgmt/trunk/mint/python/mint/model.py
Log:
Isolate console callbacks from the model object
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-11 19:13:53 UTC (rev 3782)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-11 20:07:58 UTC (rev 3783)
@@ -2,10 +2,10 @@
from parsley.threadingex import Lifecycle
from sqlobject import *
-from mint import update
from mint.cache import MintCache
from mint.schema import *
from mint.schemalocal import *
+from mint.update import *
from mint.util import *
import mint.schema
@@ -14,7 +14,7 @@
log = logging.getLogger("mint.model")
-class MintModel(Console, Lifecycle):
+class MintModel(Lifecycle):
def __init__(self, app):
self.log = log
@@ -34,24 +34,22 @@
self.lock = RLock()
- self.deferredObjectPropCalls = defaultdict(list)
- self.deferredObjectStatCalls = defaultdict(list)
-
def check(self):
pass
def do_init(self):
assert self.qmfSession is None
- self.qmfSession = Session \
- (self, manageConnections=True, rcvObjects=self.app.updateEnabled)
+ self.qmfSession = Session(MintConsole(self),
+ manageConnections=True,
+ rcvObjects=self.app.updateEnabled)
def do_start(self):
# Clean up any transient objects that a previous instance may have
# left behind in the DB; it's basically an unconstrained agent
# disconnect update, for any agent
- up = update.AgentDisconnectUpdate(None)
+ up = AgentDisconnectUpdate(None)
self.app.updateThread.enqueue(up)
uris = [x.strip() for x in self.app.config.qmf.split(",")]
@@ -73,36 +71,82 @@
finally:
self.lock.release()
- def brokerConnected(self, qbroker):
- self.log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+ def getMintAgent(self, qagent):
+ id = str(QmfAgentId.fromAgent(qagent))
+ return self.agents[id]
- def brokerInfo(self, qbroker):
- # Now we have a brokerId to use to generate fully qualified agent
- # IDs
+class MintAgent(object):
+ def __init__(self, model, agent):
+ self.model = model
+ self.agent = agent
- for qagent in qbroker.getAgents():
- self.newMintAgent(qagent)
+ self.id = str(QmfAgentId.fromAgent(agent))
- def brokerDisconnected(self, qbroker):
- self.log.info \
- ("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+ self.lastHeartbeat = None
- def newMintAgent(self, qagent):
- agent = MintAgent(self, qagent)
+ # qmfObjectId => int database id
+ self.databaseIds = MintCache()
- self.lock.acquire()
+ # qmfObjectId => list of ModelUpdate objects
+ self.deferredUpdates = defaultdict(list)
+
+ self.model.lock.acquire()
try:
- assert agent.id not in self.agents
- self.agents[agent.id] = agent
+ assert self.id not in self.model.agents
+ self.model.agents[self.id] = self
finally:
- self.lock.release()
+ self.model.lock.release()
- return agent
+ def callMethod(self, mintObject, methodName, callback, args):
+ classKey = ClassKey(mintObject.qmfClassKey)
+ objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
- def getMintAgent(self, qagent):
- id = str(QmfAgentId.fromAgent(qagent))
- return self.agents[id]
+ self.model.lock.acquire()
+ try:
+ broker = self.agent.getBroker()
+ seq = self.model.qmfSession._sendMethodRequest \
+ (broker, classKey, objectId, methodName, args)
+
+ if seq is not None:
+ self.model.outstandingMethodCalls[seq] = callback
+
+ return seq
+ finally:
+ self.model.lock.release()
+
+ def delete(self):
+ self.model.lock.acquire()
+ try:
+ del self.model.agents[agent.id]
+ finally:
+ self.model.lock.release()
+
+ self.model = None
+
+ def __repr__(self):
+ return "%s(%s)" % (self.__class__.__name__, self.id)
+
+class MintConsole(Console):
+ def __init__(self, model):
+ self.model = model
+
+ self.deferredObjectPropCalls = defaultdict(list)
+ self.deferredObjectStatCalls = defaultdict(list)
+
+ def brokerConnected(self, qbroker):
+ log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+
+ def brokerInfo(self, qbroker):
+ # Now we have a brokerId to use to generate fully qualified agent
+ # IDs
+
+ for qagent in qbroker.getAgents():
+ MintAgent(self.model, qagent)
+
+ def brokerDisconnected(self, qbroker):
+ log.info("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+
def newAgent(self, qagent):
log.info("Creating %s", qagent)
@@ -111,9 +155,9 @@
# in brokerInfo.
if qagent.getBroker().brokerId:
- agent = self.newMintAgent(qagent)
+ agent = MintAgent(self.model, qagent)
- # XXX This business is to handle an agent-vs-agent data ordering
+ # XXX This business is to handle an agent-vs-agent-data ordering
# problem
objectPropCalls = self.deferredObjectPropCalls[agent.id]
@@ -129,125 +173,75 @@
def delAgent(self, qagent):
log.info("Deleting %s", qagent)
- self.lock.acquire()
- try:
- agent = self.getMintAgent(qagent)
- agent.model = None
- del self.agents[agent.id]
- finally:
- self.lock.release()
+ agent = self.model.getMintAgent(qagent)
+ agent.delete()
- up = update.AgentDisconnectUpdate(agent)
+ up = AgentDisconnectUpdate(agent)
self.app.updateThread.enqueue(up)
def heartbeat(self, qagent, timestamp):
timestamp = timestamp / 1000000000
- self.lock.acquire()
- try:
- agent = self.getMintAgent(qagent)
- agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
- finally:
- self.lock.release()
+ agent = self.model.getMintAgent(qagent)
+ agent.lastHeartbeat = datetime.fromtimestamp(timestamp)
def newPackage(self, name):
- """ Invoked when a QMF package is discovered. """
- pass
+ log.info("New package %s", name)
def newClass(self, kind, classKey):
- """ Invoked when a new class is discovered. Session.getSchema can be
- used to obtain details about the class."""
- pass
+ log.info("New class %s", classKey)
def objectProps(self, broker, object):
- """ Invoked when an object is updated. """
-
- if not self.app.updateThread.isAlive():
+ if not self.model.app.updateThread.isAlive():
return
- self.lock.acquire()
+ self.model.lock.acquire()
try:
id = str(QmfAgentId.fromObject(object))
try:
- agent = self.agents[id]
+ agent = self.model.agents[id]
except KeyError:
self.deferredObjectPropCalls[id].append((broker, object))
return
finally:
- self.lock.release()
+ self.model.lock.release()
- up = update.PropertyUpdate(agent, object)
- self.app.updateThread.enqueue(up)
+ up = PropertyUpdate(agent, object)
+ self.model.app.updateThread.enqueue(up)
def objectStats(self, broker, object):
""" Invoked when an object is updated. """
- if not self.app.updateThread.isAlive():
+ if not self.model.app.updateThread.isAlive():
return
- self.lock.acquire()
+ self.model.lock.acquire()
try:
id = str(QmfAgentId.fromObject(object))
try:
- agent = self.agents[id]
+ agent = self.model.agents[id]
except KeyError:
self.deferredObjectStatCalls[id].append((broker, object))
return
finally:
- self.lock.release()
+ self.model.lock.release()
- up = update.StatisticUpdate(agent, object)
- self.app.updateThread.enqueue(up)
+ up = StatisticUpdate(agent, object)
+ self.model.app.updateThread.enqueue(up)
def event(self, broker, event):
""" Invoked when an event is raised. """
pass
def methodResponse(self, broker, seq, response):
- log.debug("Method response for request %i received from %s", seq, broker)
+ log.info("Method response for request %i received from %s", seq, broker)
log.debug("Response: %s", response)
- self.lock.acquire()
+ self.model.lock.acquire()
try:
- methodCallback = self.outstandingMethodCalls.pop(seq)
+ methodCallback = self.model.outstandingMethodCalls.pop(seq)
methodCallback(response.text, response.outArgs)
finally:
- self.lock.release()
-
-class MintAgent(object):
- def __init__(self, model, agent):
- self.model = model
- self.agent = agent
-
- self.id = str(QmfAgentId.fromAgent(agent))
-
- self.lastHeartbeat = None
-
- # qmfObjectId => int database id
- self.databaseIds = MintCache()
-
- # qmfObjectId => list of ModelUpdate objects
- self.deferredUpdates = defaultdict(list)
-
- def callMethod(self, mintObject, methodName, callback, args):
- classKey = ClassKey(mintObject.qmfClassKey)
- objectId = QmfObjectId.fromString(mintObject.qmfObjectId).toObjectId()
-
- self.model.lock.acquire()
- try:
- broker = self.agent.getBroker()
-
- seq = self.model.qmfSession._sendMethodRequest \
- (broker, classKey, objectId, methodName, args)
-
- if seq is not None:
- self.model.outstandingMethodCalls[seq] = callback
-
- return seq
- finally:
self.model.lock.release()
-
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, self.id)
14 years, 11 months
rhmessaging commits: r3782 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-11 14:13:53 -0500 (Mon, 11 Jan 2010)
New Revision: 3782
Modified:
mgmt/trunk/mint/python/mint/expire.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
Log:
Purely cosmetic change: four-space indents and more pythonic naming
Modified: mgmt/trunk/mint/python/mint/expire.py
===================================================================
--- mgmt/trunk/mint/python/mint/expire.py 2010-01-11 18:46:28 UTC (rev 3781)
+++ mgmt/trunk/mint/python/mint/expire.py 2010-01-11 19:13:53 UTC (rev 3782)
@@ -79,4 +79,4 @@
log.debug("%i total records expired", total)
- stats.expireUpdateCount += 1
+ stats.expired += 1
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2010-01-11 18:46:28 UTC (rev 3781)
+++ mgmt/trunk/mint/python/mint/tools.py 2010-01-11 19:13:53 UTC (rev 3782)
@@ -473,14 +473,14 @@
stats = app.updateThread.stats
- enq = stats.enqueueCount
- deq = stats.dequeueCount
- drp = stats.dropCount
- dfr = stats.deferCount
+ enq = stats.enqueued
+ deq = stats.dequeued
+ drp = stats.dropped
+ dfr = stats.deferred
- prop = stats.propUpdateCount
- stat = stats.statUpdateCount
- exp = stats.expireUpdateCount
+ prop = stats.prop_updated
+ stat = stats.stat_updated
+ exp = stats.expired
print row % (enq - enq_last,
deq - deq_last,
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2010-01-11 18:46:28 UTC (rev 3781)
+++ mgmt/trunk/mint/python/mint/update.py 2010-01-11 19:13:53 UTC (rev 3782)
@@ -11,58 +11,58 @@
log = logging.getLogger("mint.update")
class UpdateThread(MintDaemonThread):
- """
- Only the update thread writes to the database
- """
+ """
+ Only the update thread writes to the database
+ """
- def __init__(self, app):
- super(UpdateThread, self).__init__(app)
+ def __init__(self, app):
+ super(UpdateThread, self).__init__(app)
- self.conn = None
- self.stats = None
+ self.conn = None
+ self.stats = None
- self.updates = UpdateQueue(slotCount=2)
+ self.updates = UpdateQueue(slotCount=2)
- def init(self):
- self.conn = self.app.database.getConnection()
- self.stats = UpdateStats()
+ def init(self):
+ self.conn = self.app.database.getConnection()
+ self.stats = UpdateStats()
- def enqueue(self, update):
- update.thread = self
+ def enqueue(self, update):
+ update.thread = self
- self.updates.put(update)
+ self.updates.put(update)
- self.stats.enqueueCount += 1
+ self.stats.enqueued += 1
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
+ # This is an attempt to yield from the enqueueing thread (this
+ # method's caller) to the update thread
- if self.updates.qsize() > 1000:
- sleep(0.1)
+ if self.updates.qsize() > 1000:
+ sleep(0.1)
- def run(self):
- while True:
- if self.stopRequested:
- break
+ def run(self):
+ while True:
+ if self.stopRequested:
+ break
- try:
- update = self.updates.get(True, 1)
- except Empty:
- continue
+ try:
+ update = self.updates.get(True, 1)
+ except Empty:
+ continue
- self.stats.dequeueCount += 1
+ self.stats.dequeued += 1
- update.process()
+ update.process()
class UpdateStats(object):
- def __init__(self):
- self.enqueueCount = 0
- self.dequeueCount = 0
- self.statUpdateCount = 0
- self.propUpdateCount = 0
- self.expireUpdateCount = 0
- self.dropCount = 0
- self.deferCount = 0
+ def __init__(self):
+ self.enqueued = 0
+ self.dequeued = 0
+ self.stat_updated = 0
+ self.prop_updated = 0
+ self.expired = 0
+ self.dropped = 0
+ self.deferred = 0
class ReferenceException(Exception):
def __init__(self, sought):
@@ -72,326 +72,330 @@
return repr(self.sought)
class Update(object):
- def __init__(self):
- self.thread = None
- self.priority = 0
+ def __init__(self):
+ self.thread = None
+ self.priority = 0
- def process(self):
- log.debug("Processing %s", self)
+ def process(self):
+ log.debug("Processing %s", self)
- assert self.thread
+ assert self.thread
- conn = self.thread.conn
- stats = self.thread.stats
+ conn = self.thread.conn
+ stats = self.thread.stats
- try:
- self.do_process(conn, stats)
+ try:
+ self.do_process(conn, stats)
- conn.commit()
- except:
- log.exception("Update failed")
+ conn.commit()
+ except:
+ log.exception("Update failed")
- conn.rollback()
+ conn.rollback()
- def do_process(self, conn, stats):
- raise Exception("Not implemented")
+ def do_process(self, conn, stats):
+ raise Exception("Not implemented")
- def __repr__(self):
- return "%s(%i)" % (self.__class__.__name__, self.priority)
+ def __repr__(self):
+ return "%s(%i)" % (self.__class__.__name__, self.priority)
class ObjectUpdate(Update):
- def __init__(self, agent, object):
- super(ObjectUpdate, self).__init__()
+ def __init__(self, agent, object):
+ super(ObjectUpdate, self).__init__()
- self.agent = agent
- self.object = object
+ self.agent = agent
+ self.object = object
- self.object_id = str(QmfObjectId.fromObject(object))
+ self.object_id = str(QmfObjectId.fromObject(object))
- def getClass(self):
- # XXX this is unfortunate
- name = self.object.getClassKey().getClassName()
- name = mint.schema.schemaReservedWordsMap.get(name, name)
- name = name[0].upper() + name[1:]
+ def getClass(self):
+ # XXX this is unfortunate
+ name = self.object.getClassKey().getClassName()
+ name = mint.schema.schemaReservedWordsMap.get(name, name)
+ name = name[0].upper() + name[1:]
- try:
- return schemaNameToClassMap[name]
- except KeyError:
- raise ReferenceException(name)
+ try:
+ return schemaNameToClassMap[name]
+ except KeyError:
+ raise ReferenceException(name)
- def processAttributes(self, attrs, cls):
- results = dict()
+ def process_attributes(self, attrs, cls):
+ results = dict()
- for key, value in attrs:
- name = key.__repr__()
- name = mint.schema.schemaReservedWordsMap.get(name, name)
+ for key, value in attrs:
+ name = key.__repr__()
+ name = mint.schema.schemaReservedWordsMap.get(name, name)
- if key.type == 10:
- # XXX don't want oid around much
- self.processReference(name, value, results)
- continue
+ if key.type == 10:
+ # XXX don't want oid around much
+ self.process_reference(name, value, results)
+ continue
- if not hasattr(cls, name):
- # Discard attrs that we don't have in our schema
- log.debug("%s has no field '%s'" % (cls, name))
- continue
+ if not hasattr(cls, name):
+ # Discard attrs that we don't have in our schema
+ log.debug("%s has no field '%s'" % (cls, name))
+ continue
- if key.type == 8:
- self.processTimestamp(name, value, results)
- continue
+ if key.type == 8:
+ self.process_timestamp(name, value, results)
+ continue
- if key.type == 14:
- # Convert UUIDs into their string representation, to be
- # handled by sqlobject
- results[name] = str(value)
- continue
+ if key.type == 14:
+ # Convert UUIDs into their string representation, to be
+ # handled by sqlobject
+ results[name] = str(value)
+ continue
- if key.type == 15:
- #if value:
- results[name] = pickle.dumps(value)
- continue
+ if key.type == 15:
+ #if value:
+ results[name] = pickle.dumps(value)
+ continue
- results[name] = value
+ results[name] = value
- return results
+ return results
- # XXX this needs to be a much more straightforward procedure
- def processReference(self, name, oid, results):
- if name.endswith("Ref"):
- name = name[:-3]
+ # XXX this needs to be a much more straightforward procedure
+ def process_reference(self, name, oid, results):
+ if name.endswith("Ref"):
+ name = name[:-3]
- className = name[0].upper() + name[1:]
- otherClass = getattr(mint, className)
+ className = name[0].upper() + name[1:]
+ otherClass = getattr(mint, className)
- foreignKey = name + "_id"
+ foreignKey = name + "_id"
- id = self.agent.databaseIds.get(str(QmfObjectId(oid.first, oid.second)))
+ object_id = str(QmfObjectId(oid.first, oid.second))
+ id = self.agent.databaseIds.get(object_id)
- if id is None:
- # XXX don't want oid around much
- raise ReferenceException(oid)
+ if id is None:
+ # XXX don't want oid around much
+ raise ReferenceException(oid)
- results[foreignKey] = id
+ results[foreignKey] = id
- def processTimestamp(self, name, tstamp, results):
- if tstamp:
- t = datetime.fromtimestamp(tstamp / 1000000000)
- results[name] = t
+ def process_timestamp(self, name, tstamp, results):
+ if tstamp:
+ t = datetime.fromtimestamp(tstamp / 1000000000)
+ results[name] = t
- def __repr__(self):
- cls = self.object.getClassKey().getClassName()
+ def __repr__(self):
+ cls = self.object.getClassKey().getClassName()
- return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
- self.agent.id,
- cls,
- self.object_id,
- self.priority)
+ return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
+ self.agent.id,
+ cls,
+ self.object_id,
+ self.priority)
class PropertyUpdate(ObjectUpdate):
- def do_process(self, conn, stats):
- try:
- cls = self.getClass()
- except ReferenceException, e:
- log.info("Referenced class %r not found", e.sought)
+ def do_process(self, conn, stats):
+ try:
+ cls = self.getClass()
+ except ReferenceException, e:
+ log.info("Referenced class %r not found", e.sought)
- stats.dropCount += 1
+ stats.dropped += 1
- return
+ return
- try:
- attrs = self.processAttributes(self.object.getProperties(), cls)
- except ReferenceException, e:
- log.info("Referenced object %r not found", e.sought)
+ try:
+ attrs = self.process_attributes(self.object.getProperties(), cls)
+ except ReferenceException, e:
+ log.info("Referenced object %r not found", e.sought)
- self.agent.deferredUpdates[self.object_id].append(self)
+ self.agent.deferredUpdates[self.object_id].append(self)
- stats.deferCount += 1
+ stats.deferred += 1
- return
+ return
- update, create, delete = self.object.getTimestamps()
+ update, create, delete = self.object.getTimestamps()
- self.processTimestamp("qmfUpdateTime", update, attrs)
- self.processTimestamp("qmfCreateTime", create, attrs)
+ self.process_timestamp("qmfUpdateTime", update, attrs)
+ self.process_timestamp("qmfCreateTime", create, attrs)
- if delete != 0:
- self.processTimestamp("qmfDeleteTime", delete, attrs)
+ if delete != 0:
+ self.process_timestamp("qmfDeleteTime", delete, attrs)
- log.debug("%s(%s,%s) marked deleted",
- cls.__name__, self.agent.id, self.object_id)
+ log.debug("%s(%s,%s) marked deleted",
+ cls.__name__, self.agent.id, self.object_id)
- attrs["qmfAgentId"] = self.agent.id
- attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfObjectId"] = str(self.object_id)
- attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
+ attrs["qmfAgentId"] = self.agent.id
+ attrs["qmfClassKey"] = str(self.object.getClassKey())
+ attrs["qmfObjectId"] = str(self.object_id)
+ attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
- cursor = conn.cursor()
+ cursor = conn.cursor()
- # Cases:
- #
- # 1. Object is utterly new to mint
- # 2. Object is in mint's db, but id is not yet cached
- # 3. Object is in mint's db, and id is cached
+ # Cases:
+ #
+ # 1. Object is utterly new to mint
+ # 2. Object is in mint's db, but id is not yet cached
+ # 3. Object is in mint's db, and id is cached
- id = self.agent.databaseIds.get(self.object_id)
+ id = self.agent.databaseIds.get(self.object_id)
- if id is None:
- # Case 1 or 2
+ if id is None:
+ # Case 1 or 2
- op = SqlGetId(cls)
- op.execute(cursor, attrs)
+ op = SqlGetId(cls)
+ op.execute(cursor, attrs)
- rec = cursor.fetchone()
+ rec = cursor.fetchone()
- if rec:
- id = rec[0]
+ if rec:
+ id = rec[0]
- if id is None:
- # Case 1
+ if id is None:
+ # Case 1
- op = SqlInsert(cls, attrs)
- op.execute(cursor, attrs)
+ op = SqlInsert(cls, attrs)
+ op.execute(cursor, attrs)
- id = cursor.fetchone()[0]
+ id = cursor.fetchone()[0]
- log.debug("%s(%i) created", cls.__name__, id)
- else:
- # Case 2
+ log.debug("%s(%i) created", cls.__name__, id)
+ else:
+ # Case 2
- attrs["id"] = id
+ attrs["id"] = id
- op = SqlUpdate(cls, attrs)
- op.execute(cursor, attrs)
+ op = SqlUpdate(cls, attrs)
+ op.execute(cursor, attrs)
- assert cursor.rowcount == 1
+ assert cursor.rowcount == 1
- self.agent.databaseIds.set(self.object_id, id)
- else:
- # Case 3
+ self.agent.databaseIds.set(self.object_id, id)
+ else:
+ # Case 3
- attrs["id"] = id
+ attrs["id"] = id
- op = SqlUpdate(cls, attrs)
- op.execute(cursor, attrs)
+ op = SqlUpdate(cls, attrs)
+ op.execute(cursor, attrs)
- #assert cursor.rowcount == 1
+ #assert cursor.rowcount == 1
- try:
- updates = self.agent.deferredUpdates.pop(self.object_id)
+ try:
+ updates = self.agent.deferredUpdates.pop(self.object_id)
- if updates:
- log.info("Re-enqueueing %i orphans whose creation had been deferred",
- len(updates))
+ if updates:
+ log.info("Reenqueueing %i deferred updates", len(updates))
- for update in updates:
- self.thread.enqueue(update)
- except KeyError:
- pass
+ for update in updates:
+ self.thread.enqueue(update)
+ except KeyError:
+ pass
- self.agent.databaseIds.commit()
+ self.agent.databaseIds.commit()
- stats.propUpdateCount += 1
+ stats.prop_updated += 1
class StatisticUpdate(ObjectUpdate):
- def do_process(self, conn, stats):
- try:
- cls = self.getClass()
- except ReferenceException, e:
- log.info("Referenced class %r not found", e.sought)
- return
+ def do_process(self, conn, stats):
+ try:
+ cls = self.getClass()
+ except ReferenceException, e:
+ log.info("Referenced class %r not found", e.sought)
+ return
- statsCls = getattr(mint, "%sStats" % cls.__name__)
+ statsCls = getattr(mint, "%sStats" % cls.__name__)
- id = self.agent.databaseIds.get(self.object_id)
+ id = self.agent.databaseIds.get(self.object_id)
- if id is None:
- stats.dropCount += 1
- return
+ if id is None:
+ stats.dropped += 1
+ return
- timestamps = self.object.getTimestamps()
+ timestamps = self.object.getTimestamps()
- tnow = datetime.now()
- t = datetime.fromtimestamp(timestamps[0] / 1000000000)
+ tnow = datetime.now()
+ t = datetime.fromtimestamp(timestamps[0] / 1000000000)
- if t < tnow - timedelta(seconds=30):
- log.debug("Update is %i seconds old; skipping it", (tnow - t).seconds)
+ if t < tnow - timedelta(seconds=30):
+ seconds = (tnow - t).seconds
+ log.debug("Update is %i seconds old; skipping it", seconds)
- stats.dropCount += 1
+ stats.dropped += 1
- return
+ return
- try:
- attrs = self.processAttributes(self.object.getStatistics(), statsCls)
- except ReferenceException:
- stats.dropCount += 1
+ try:
+ attrs = self.process_attributes \
+ (self.object.getStatistics(), statsCls)
+ except ReferenceException:
+ stats.dropped += 1
- return
+ return
- attrs["qmfUpdateTime"] = t > tnow and tnow or t # XXX do we still want this
- attrs["%s_id" % cls.sqlmeta.table] = id
+ # XXX do we still want this
+ attrs["qmfUpdateTime"] = t > tnow and tnow or t
+ attrs["%s_id" % cls.sqlmeta.table] = id
- cursor = conn.cursor()
+ cursor = conn.cursor()
- op = SqlInsert(statsCls, attrs)
- op.execute(cursor, attrs)
+ op = SqlInsert(statsCls, attrs)
+ op.execute(cursor, attrs)
- log.debug("%s(%s) created", statsCls.__name__, id)
+ log.debug("%s(%s) created", statsCls.__name__, id)
- stats.statUpdateCount += 1
+ stats.stat_updated += 1
class AgentDisconnectUpdate(Update):
- def __init__(self, agent):
- super(AgentDisconnectUpdate, self).__init__()
+ def __init__(self, agent):
+ super(AgentDisconnectUpdate, self).__init__()
- self.agent = agent
+ self.agent = agent
- def do_process(self, conn, stats):
- cursor = conn.cursor()
+ def do_process(self, conn, stats):
+ cursor = conn.cursor()
- args = dict()
+ args = dict()
- if self.agent:
- args["qmf_agent_id"] = self.agent.id
+ if self.agent:
+ args["qmf_agent_id"] = self.agent.id
- op = SqlAgentDisconnect(self.agent)
- op.execute(cursor, args)
+ op = SqlAgentDisconnect(self.agent)
+ op.execute(cursor, args)
class UpdateQueue(ConcurrentQueue):
- def __init__(self, maxsize=0, slotCount=1):
- self.slotCount = slotCount
- ConcurrentQueue.__init__(self, maxsize)
+ def __init__(self, maxsize=0, slotCount=1):
+ self.slotCount = slotCount
+ ConcurrentQueue.__init__(self, maxsize)
- def _init(self, maxsize):
- self.maxsize = maxsize
- self.slots = []
+ def _init(self, maxsize):
+ self.maxsize = maxsize
+ self.slots = []
- for i in range(self.slotCount):
- self.slots.append(deque())
+ for i in range(self.slotCount):
+ self.slots.append(deque())
- def _qsize(self):
- size = 0
+ def _qsize(self):
+ size = 0
- for i in range(self.slotCount):
- size += len(self.slots[i])
+ for i in range(self.slotCount):
+ size += len(self.slots[i])
- return size
+ return size
- def _empty(self):
- return self._qsize() == 0
+ def _empty(self):
+ return self._qsize() == 0
- def _full(self):
- return self.maxsize > 0 and self._qsize() == self.maxsize
+ def _full(self):
+ return self.maxsize > 0 and self._qsize() == self.maxsize
- def _put(self, update):
- slot = update.priority
+ def _put(self, update):
+ slot = update.priority
- if slot in range(self.slotCount):
- self.slots[slot].append(update)
- else:
- raise ValueError("Invalid priority slot")
+ if slot in range(self.slotCount):
+ self.slots[slot].append(update)
+ else:
+ raise ValueError("Invalid priority slot")
- def _get(self):
- for slot in range(self.slotCount):
- if len(self.slots[slot]) > 0:
- return self.slots[slot].popleft()
- return None
+ def _get(self):
+ for slot in range(self.slotCount):
+ if len(self.slots[slot]) > 0:
+ return self.slots[slot].popleft()
+
+ return None
14 years, 11 months
rhmessaging commits: r3781 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-11 13:46:28 -0500 (Mon, 11 Jan 2010)
New Revision: 3781
Modified:
mgmt/trunk/mint/python/mint/expire.py
mgmt/trunk/mint/python/mint/model.py
mgmt/trunk/mint/python/mint/tools.py
mgmt/trunk/mint/python/mint/update.py
mgmt/trunk/mint/python/mint/vacuum.py
Log:
* Move certain non-object update operations to dedicated files
* Change the update interface to account for non-object updates
* Isolate stats in their own record object
* Move the on-open agent disconnect update to start from init
* Improve update logging
Modified: mgmt/trunk/mint/python/mint/expire.py
===================================================================
--- mgmt/trunk/mint/python/mint/expire.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/expire.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -1,11 +1,10 @@
-import logging
-import mint
-import time
-from threading import Thread
-from mint.schema import *
+from schema import *
from sql import *
+from update import *
from util import *
+import mint
+
log = logging.getLogger("mint.expire")
class ExpireThread(MintDaemonThread):
@@ -39,10 +38,12 @@
while True:
if self.stopRequested:
break
- up = mint.update.ExpireUpdate(self.app.model)
+
+ up = ExpireUpdate()
self.app.updateThread.enqueue(up)
- time.sleep(frequency)
+ sleep(frequency)
+
def __convertTimeUnits(self, t):
if t / (24*3600) >= 1:
t_out = t / (24*3600)
@@ -57,3 +58,25 @@
t_out = t
t_unit = "seconds"
return (t_out, t_unit)
+
+class ExpireUpdate(Update):
+ def do_process(self, conn, stats):
+ attrs = self.thread.app.expireThread.attrs
+
+ cursor = conn.cursor()
+ total = 0
+
+ for op in self.thread.app.expireThread.ops:
+ log.debug("Running expire op %s", op)
+
+ count = op.execute(cursor, attrs)
+
+ conn.commit()
+
+ log.debug("%i records expired", count)
+
+ total += count
+
+ log.debug("%i total records expired", total)
+
+ stats.expireUpdateCount += 1
Modified: mgmt/trunk/mint/python/mint/model.py
===================================================================
--- mgmt/trunk/mint/python/mint/model.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/model.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -46,14 +46,14 @@
self.qmfSession = Session \
(self, manageConnections=True, rcvObjects=self.app.updateEnabled)
+ def do_start(self):
# Clean up any transient objects that a previous instance may have
# left behind in the DB; it's basically an unconstrained agent
# disconnect update, for any agent
- up = update.AgentDisconnectUpdate(self, None)
+ up = update.AgentDisconnectUpdate(None)
self.app.updateThread.enqueue(up)
- def do_start(self):
uris = [x.strip() for x in self.app.config.qmf.split(",")]
for uri in uris:
@@ -137,7 +137,7 @@
finally:
self.lock.release()
- up = update.AgentDisconnectUpdate(self, agent)
+ up = update.AgentDisconnectUpdate(agent)
self.app.updateThread.enqueue(up)
def heartbeat(self, qagent, timestamp):
@@ -177,7 +177,7 @@
finally:
self.lock.release()
- up = update.PropertyUpdate(self, agent, object)
+ up = update.PropertyUpdate(agent, object)
self.app.updateThread.enqueue(up)
def objectStats(self, broker, object):
@@ -198,7 +198,7 @@
finally:
self.lock.release()
- up = update.StatisticUpdate(self, agent, object)
+ up = update.StatisticUpdate(agent, object)
self.app.updateThread.enqueue(up)
def event(self, broker, event):
Modified: mgmt/trunk/mint/python/mint/tools.py
===================================================================
--- mgmt/trunk/mint/python/mint/tools.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/tools.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -156,7 +156,6 @@
app = Mint(self.config)
app.updateEnabled = False
- app.pollEnabled = False
app.expireEnabled = False
self.database = MintDatabase(app)
@@ -472,16 +471,16 @@
sleep(1)
- ut = app.updateThread
+ stats = app.updateThread.stats
- enq = ut.enqueueCount
- deq = ut.dequeueCount
- drp = ut.dropCount
- dfr = ut.deferCount
+ enq = stats.enqueueCount
+ deq = stats.dequeueCount
+ drp = stats.dropCount
+ dfr = stats.deferCount
- prop = ut.propUpdateCount
- stat = ut.statUpdateCount
- exp = ut.expireUpdateCount
+ prop = stats.propUpdateCount
+ stat = stats.statUpdateCount
+ exp = stats.expireUpdateCount
print row % (enq - enq_last,
deq - deq_last,
Modified: mgmt/trunk/mint/python/mint/update.py
===================================================================
--- mgmt/trunk/mint/python/mint/update.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/update.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -18,34 +18,26 @@
def __init__(self, app):
super(UpdateThread, self).__init__(app)
- self.updates = UpdateQueue(slotCount=2)
-
- self.enqueueCount = 0
- self.dequeueCount = 0
- self.statUpdateCount = 0
- self.propUpdateCount = 0
- self.expireUpdateCount = 0
- self.dropCount = 0
- self.deferCount = 0
- self.commitThreshold = 100
-
self.conn = None
+ self.stats = None
+ self.updates = UpdateQueue(slotCount=2)
+
def init(self):
self.conn = self.app.database.getConnection()
+ self.stats = UpdateStats()
def enqueue(self, update):
- try:
- self.updates.put(update)
+ update.thread = self
- self.enqueueCount += 1
- except Full:
- log.exception("Queue is full")
+ self.updates.put(update)
- if self.updates.qsize() > 1000:
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
+ self.stats.enqueueCount += 1
+ # This is an attempt to yield from the enqueueing thread (this
+ # method's caller) to the update thread
+
+ if self.updates.qsize() > 1000:
sleep(0.1)
def run(self):
@@ -58,55 +50,63 @@
except Empty:
continue
- self.dequeueCount += 1
+ self.stats.dequeueCount += 1
- self.processUpdate(update)
+ update.process()
- def processUpdate(self, update):
- log.debug("Processing %s", update)
+class UpdateStats(object):
+ def __init__(self):
+ self.enqueueCount = 0
+ self.dequeueCount = 0
+ self.statUpdateCount = 0
+ self.propUpdateCount = 0
+ self.expireUpdateCount = 0
+ self.dropCount = 0
+ self.deferCount = 0
- try:
- update.process(self)
+class ReferenceException(Exception):
+ def __init__(self, sought):
+ self.sought = sought
- if self.dequeueCount % self.commitThreshold == 0 \
- or self.updates.qsize() == 0:
- # commit only every "commitThreshold" updates, or whenever
- # the update queue is empty
+ def __str__(self):
+ return repr(self.sought)
- update.commit()
- self.conn.commit()
+class Update(object):
+ def __init__(self):
+ self.thread = None
+ self.priority = 0
+
+ def process(self):
+ log.debug("Processing %s", self)
+
+ assert self.thread
+
+ conn = self.thread.conn
+ stats = self.thread.stats
+
+ try:
+ self.do_process(conn, stats)
+
+ conn.commit()
except:
log.exception("Update failed")
- update.rollback()
- self.conn.rollback()
+ conn.rollback()
- def cursor(self):
- return self.conn.cursor()
+ def do_process(self, conn, stats):
+ raise Exception("Not implemented")
-class ReferenceException(Exception):
- def __init__(self, sought):
- self.sought = sought
+ def __repr__(self):
+ return "%s(%i)" % (self.__class__.__name__, self.priority)
- def __str__(self):
- return repr(self.sought)
+class ObjectUpdate(Update):
+ def __init__(self, agent, object):
+ super(ObjectUpdate, self).__init__()
-class ModelUpdate(object):
- def __init__(self, model, agent, object):
- if agent:
- from mint.model import MintAgent
- assert isinstance(agent, MintAgent)
-
- self.model = model
self.agent = agent
self.object = object
- self.priority = 0
- def __repr__(self):
- return "%s(%s, %s, %i)" % (self.__class__.__name__,
- str(self.agent),
- str(self.object),
- self.priority)
+ self.object_id = str(QmfObjectId.fromObject(object))
def getClass(self):
# XXX this is unfortunate
@@ -119,9 +119,6 @@
except KeyError:
raise ReferenceException(name)
- def process(self, thread):
- pass
-
def processAttributes(self, attrs, cls):
results = dict()
@@ -181,38 +178,35 @@
t = datetime.fromtimestamp(tstamp / 1000000000)
results[name] = t
- def commit(self):
- # XXX commit db here too
+ def __repr__(self):
+ cls = self.object.getClassKey().getClassName()
- if self.agent:
- self.agent.databaseIds.commit()
+ return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
+ self.agent.id,
+ cls,
+ self.object_id,
+ self.priority)
- def rollback(self):
- # XXX rollback db here too
-
- if self.agent:
- self.agent.databaseIds.rollback()
-
-class PropertyUpdate(ModelUpdate):
- def process(self, thread):
+class PropertyUpdate(ObjectUpdate):
+ def do_process(self, conn, stats):
try:
cls = self.getClass()
except ReferenceException, e:
log.info("Referenced class %r not found", e.sought)
- thread.dropCount += 1
+ stats.dropCount += 1
+
return
- qmfObjectId = str(QmfObjectId.fromObject(self.object))
-
try:
attrs = self.processAttributes(self.object.getProperties(), cls)
except ReferenceException, e:
log.info("Referenced object %r not found", e.sought)
- self.agent.deferredUpdates[qmfObjectId].append(self)
+ self.agent.deferredUpdates[self.object_id].append(self)
- thread.deferCount += 1
+ stats.deferCount += 1
+
return
update, create, delete = self.object.getTimestamps()
@@ -224,14 +218,14 @@
self.processTimestamp("qmfDeleteTime", delete, attrs)
log.debug("%s(%s,%s) marked deleted",
- cls.__name__, self.agent.id, qmfObjectId)
+ cls.__name__, self.agent.id, self.object_id)
attrs["qmfAgentId"] = self.agent.id
attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfObjectId"] = str(qmfObjectId)
+ attrs["qmfObjectId"] = str(self.object_id)
attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
- cursor = thread.cursor()
+ cursor = conn.cursor()
# Cases:
#
@@ -239,7 +233,7 @@
# 2. Object is in mint's db, but id is not yet cached
# 3. Object is in mint's db, and id is cached
- id = self.agent.databaseIds.get(qmfObjectId)
+ id = self.agent.databaseIds.get(self.object_id)
if id is None:
# Case 1 or 2
@@ -271,7 +265,7 @@
assert cursor.rowcount == 1
- self.agent.databaseIds.set(qmfObjectId, id)
+ self.agent.databaseIds.set(self.object_id, id)
else:
# Case 3
@@ -283,21 +277,23 @@
#assert cursor.rowcount == 1
try:
- updates = self.agent.deferredUpdates.pop(qmfObjectId)
+ updates = self.agent.deferredUpdates.pop(self.object_id)
if updates:
log.info("Re-enqueueing %i orphans whose creation had been deferred",
len(updates))
for update in updates:
- thread.enqueue(update)
+ self.thread.enqueue(update)
except KeyError:
pass
- thread.propUpdateCount += 1
+ self.agent.databaseIds.commit()
-class StatisticUpdate(ModelUpdate):
- def process(self, thread):
+ stats.propUpdateCount += 1
+
+class StatisticUpdate(ObjectUpdate):
+ def do_process(self, conn, stats):
try:
cls = self.getClass()
except ReferenceException, e:
@@ -306,12 +302,10 @@
statsCls = getattr(mint, "%sStats" % cls.__name__)
- qmfObjectId = str(QmfObjectId.fromObject(self.object))
+ id = self.agent.databaseIds.get(self.object_id)
- id = self.agent.databaseIds.get(qmfObjectId)
-
if id is None:
- thread.dropCount += 1
+ stats.dropCount += 1
return
timestamps = self.object.getTimestamps()
@@ -322,60 +316,38 @@
if t < tnow - timedelta(seconds=30):
log.debug("Update is %i seconds old; skipping it", (tnow - t).seconds)
- thread.dropCount += 1
+ stats.dropCount += 1
+
return
try:
attrs = self.processAttributes(self.object.getStatistics(), statsCls)
except ReferenceException:
- thread.dropCount += 1
+ stats.dropCount += 1
+
return
attrs["qmfUpdateTime"] = t > tnow and tnow or t # XXX do we still want this
attrs["%s_id" % cls.sqlmeta.table] = id
- cursor = thread.cursor()
+ cursor = conn.cursor()
op = SqlInsert(statsCls, attrs)
op.execute(cursor, attrs)
log.debug("%s(%s) created", statsCls.__name__, id)
- thread.statUpdateCount += 1
+ stats.statUpdateCount += 1
-class ExpireUpdate(ModelUpdate):
- def __init__(self, model):
- super(ExpireUpdate, self).__init__(model, None, None)
+class AgentDisconnectUpdate(Update):
+ def __init__(self, agent):
+ super(AgentDisconnectUpdate, self).__init__()
- def process(self, thread):
- cursor = thread.cursor()
- attrs = thread.app.expireThread.attrs
- total = 0
+ self.agent = agent
- thread.conn.commit()
+ def do_process(self, conn, stats):
+ cursor = conn.cursor()
- for op in thread.app.expireThread.ops:
- log.debug("Running expire op %s", op)
-
- count = op.execute(cursor, attrs)
-
- thread.conn.commit()
-
- log.debug("%i records expired", count)
-
- total += count
-
- log.debug("%i total records expired", total)
-
- thread.expireUpdateCount += 1
-
-class AgentDisconnectUpdate(ModelUpdate):
- def __init__(self, model, agent):
- super(AgentDisconnectUpdate, self).__init__(model, agent, None)
-
- def process(self, thread):
- cursor = thread.cursor()
-
args = dict()
if self.agent:
Modified: mgmt/trunk/mint/python/mint/vacuum.py
===================================================================
--- mgmt/trunk/mint/python/mint/vacuum.py 2010-01-11 15:48:46 UTC (rev 3780)
+++ mgmt/trunk/mint/python/mint/vacuum.py 2010-01-11 18:46:28 UTC (rev 3781)
@@ -4,32 +4,26 @@
log = logging.getLogger("mint.vacuum")
class VacuumThread(MintDaemonThread):
- def __init__(self, app):
- super(VacuumThread, self).__init__(app)
-
def run(self):
while True:
if self.stopRequested:
break
- up = VacuumUpdate(self.app.model)
+ up = VacuumUpdate()
self.app.updateThread.enqueue(up)
sleep(60 * 10)
-class VacuumUpdate(ModelUpdate):
- def __init__(self, model):
- super(VacuumUpdate, self).__init__(model, None, None)
-
- def process(self, thread):
+class VacuumUpdate(Update):
+ def do_process(self, conn, stats):
log.info("Vacuuming")
- thread.conn.commit()
+ conn.commit()
- level = thread.conn.isolation_level
- thread.conn.set_isolation_level(0)
+ level = conn.isolation_level
+ conn.set_isolation_level(0)
- cursor = thread.cursor()
+ cursor = conn.cursor()
cursor.execute("vacuum")
- thread.conn.set_isolation_level(level)
+ conn.set_isolation_level(level)
14 years, 11 months
rhmessaging commits: r3780 - in mgmt/trunk/cumin/python/cumin: grid and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-01-11 10:48:46 -0500 (Mon, 11 Jan 2010)
New Revision: 3780
Modified:
mgmt/trunk/cumin/python/cumin/grid/pool.strings
mgmt/trunk/cumin/python/cumin/widgets.strings
Log:
Display the darkened page background when a form is popped up.
Modified: mgmt/trunk/cumin/python/cumin/grid/pool.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/pool.strings 2010-01-11 15:46:51 UTC (rev 3779)
+++ mgmt/trunk/cumin/python/cumin/grid/pool.strings 2010-01-11 15:48:46 UTC (rev 3780)
@@ -135,21 +135,32 @@
updatePoolSlotVis(id, "reload");
return false;
}
- var flashversion = swfobject.getFlashPlayerVersion();
- if (flashversion.major >= 9) {
- var fsm = document.getElementById('flashSlotMap');
- if (fsm)
- fsm.style.display = "block";
- var params = {menu: "false", scale: "noScale", allowFullscreen: "true", allowScriptAccess: "always", bgcolor: "#FFFFFF"};
- swfobject.embedSWF("resource?name=slots.swf", "{id}_chart", "{slot_chart_width}", "{slot_chart_height}", "9.0.0", "", {vis:"slots"}, params);
- swfobject.embedSWF("resource?name=slots.swf", "{id}ctrl_chart", "{slot_chart_width}", "{slot_ctrl_height}", "9.0.0", "", {vis:"ctrl"}, params);
+ function init_flash() {
+ var flashversion = swfobject.getFlashPlayerVersion();
+ if (flashversion.major >= 9) {
+ var fsm = document.getElementById('flashSlotMap');
+ if (fsm)
+ fsm.style.display = "block";
+
+ var params = {menu: "false", scale: "noScale", allowFullscreen: "true", allowScriptAccess: "always", bgcolor: "#FFFFFF"};
+ var branch = wooly.session.branch();
+ if (branch.formBackground) {
+ var i = 1;
+ params.wmode = "opaque";
+ }
- wooly.addPageUpdateListener(function () { updatePoolSlotVis('{id}', "reload"); });
- wooly.addPageUpdateListener(function () { updatePoolSlotVis('{id}ctrl', "reload"); });
- window.addEvent('domready',function () {
- cumin.setFullpageHandler('{id}', '{fullpage_href}');
- });
+ swfobject.embedSWF("resource?name=slots.swf", "{id}_chart", "{slot_chart_width}", "{slot_chart_height}", "9.0.0", "", {vis:"slots"}, params);
+ swfobject.embedSWF("resource?name=slots.swf", "{id}ctrl_chart", "{slot_chart_width}", "{slot_ctrl_height}", "9.0.0", "", {vis:"ctrl"}, params);
+
+ wooly.addPageUpdateListener(function () { updatePoolSlotVis('{id}', "reload"); });
+ wooly.addPageUpdateListener(function () { updatePoolSlotVis('{id}ctrl', "reload"); });
+ window.addEvent('domready',function () {
+ cumin.setFullpageHandler('{id}', '{fullpage_href}');
+ });
+ }
}
+ window.addEvent('domready', init_flash);
+
//]]>
</script>
Modified: mgmt/trunk/cumin/python/cumin/widgets.strings
===================================================================
--- mgmt/trunk/cumin/python/cumin/widgets.strings 2010-01-11 15:46:51 UTC (rev 3779)
+++ mgmt/trunk/cumin/python/cumin/widgets.strings 2010-01-11 15:48:46 UTC (rev 3780)
@@ -723,7 +723,7 @@
[BackgroundInclude.html]
<div id="BackgroundGlass"><!-- prevents clicks since disabled doesn't work on objects --></div>
-<!-- <object class="BackgroundInclude" data="{data}" type="{type}" /> -->
+<object id="backgroundInclude" class="BackgroundInclude" data="{data};formBackground=1" type="{type}" />
[FormHelp.javascript]
function help_window(href) {
14 years, 11 months
rhmessaging commits: r3779 - mgmt/trunk/wooly/resources.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2010-01-11 10:46:51 -0500 (Mon, 11 Jan 2010)
New Revision: 3779
Modified:
mgmt/trunk/wooly/resources/wooly.js
Log:
Fix problem when no url is passed to session.branch(). Use window.location.href (string) instead of window.location (object).
Modified: mgmt/trunk/wooly/resources/wooly.js
===================================================================
--- mgmt/trunk/wooly/resources/wooly.js 2010-01-11 15:00:27 UTC (rev 3778)
+++ mgmt/trunk/wooly/resources/wooly.js 2010-01-11 15:46:51 UTC (rev 3779)
@@ -718,7 +718,7 @@
function Session() {
this.branch = function (url) {
- if (arguments.length == 0) url = window.location;
+ if (arguments.length == 0) url = window.location.href + "";
return new Branch(url);
}
14 years, 11 months
rhmessaging commits: r3778 - in store/branches/java/0.5-release: src/tools/java and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: rgemmell
Date: 2010-01-11 10:00:27 -0500 (Mon, 11 Jan 2010)
New Revision: 3778
Added:
store/branches/java/0.5-release/bin/bindingsWorkaround.sh
store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml
store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java
Log:
Add initial version of tool to workaround broker bindings issue. Use the broker config to load the VirtualHosts and then insert required durable Exchange entries into the store
Added: store/branches/java/0.5-release/bin/bindingsWorkaround.sh
===================================================================
--- store/branches/java/0.5-release/bin/bindingsWorkaround.sh (rev 0)
+++ store/branches/java/0.5-release/bin/bindingsWorkaround.sh 2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+QPID_VERSION=0.5
+
+if [ -z "$QPID_HOME" ]; then
+ echo "QPID_HOME not set. Exiting"
+ exit 1
+fi
+
+if [ -z "$QPID_WORK" ]; then
+ echo "QPID_WORK not set. Exiting"
+ exit 1
+fi
+
+QPID_SYS_PROPS="-DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK"
+
+LIBS=$QPID_HOME/lib/qpid-bdbtools-$QPID_VERSION.jar:$QPID_HOME/lib/je-3.3.62.jar:$QPID_HOME/lib/qpid-bdbstore-$QPID_VERSION.jar:$QPID_HOME/lib/qpid-all.jar
+
+java -Dlog4j.configuration=BDBStoreBindingsWorkaround-log4j.xml $QPID_SYS_PROPS -cp $LIBS org.apache.qpid.server.store.berkeleydb.BDBStoreBindingsWorkaround "$@"
+exitValue=$?
+
+if [ $exitValue != 0 ]
+then
+ exit $exitValue
+fi
+
Added: store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml
===================================================================
--- store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml (rev 0)
+++ store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml 2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.qpid.server.store.berkeleydb.BDBStoreBindingsWorkaround">
+ <priority value="info"/>
+ </category>
+
+ <!-- Only show errors -->
+ <category name="org.apache">
+ <priority value="error"/>
+ </category>
+
+ <root>
+ <priority value="error"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</log4j:configuration>
Added: store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java
===================================================================
--- store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java (rev 0)
+++ store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java 2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.configuration.Configuration.InitException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.store.berkeleydb.DatabaseVisitor;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class BDBStoreBindingsWorkaround
+{
+ private static final Logger _logger = LoggerFactory.getLogger(BDBStoreBindingsWorkaround.class);
+
+ private Configuration _config;
+ private boolean _initialised = false;
+
+ public static void main(String[] args) throws Configuration.InitException
+ {
+ BDBStoreBindingsWorkaround tool = new BDBStoreBindingsWorkaround(args);
+
+ tool.start();
+
+ //Shut down the JVM gracefully, the ShutdownHook will stop the VirtualHosts.
+ System.exit(0);
+ }
+
+ public BDBStoreBindingsWorkaround(String[] args) throws InitException
+ {
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this)));
+
+ loadConfig(args);
+ }
+
+ @SuppressWarnings("static-access")
+ private void loadConfig(String args[]) throws InitException
+ {
+ _config = new Configuration();
+
+ Option configFile =
+ OptionBuilder.withArgName("file").hasArg()
+ .withDescription("use given configuration file By "
+ + "default looks for a file named "
+ + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME)
+ .withLongOpt("config")
+ .create("c");
+
+ _config.setOption(configFile);
+
+ _config.processCommandline(args);
+ }
+
+ /**
+ * Simple ShutdownHook to cleanly shutdown the VirtualHosts on JVM shut down
+ */
+ protected class ShutdownHook implements Runnable
+ {
+ BDBStoreBindingsWorkaround _tool;
+
+ ShutdownHook(BDBStoreBindingsWorkaround bindingsTool)
+ {
+ _tool = bindingsTool;
+ }
+
+ public void run()
+ {
+ _tool.shutdown();
+ }
+ }
+
+ protected void shutdown()
+ {
+ if (_initialised)
+ {
+ ApplicationRegistry.remove(1);
+ }
+ }
+
+ protected void start()
+ {
+ _initialised = false;
+
+ _logger.info("BDBStore BindingsWorkaround process commencing");
+
+ loadVirtualHosts();
+
+ if (!_initialised)
+ {
+ System.exit(1);
+ }
+
+ addDurableExchangesToStoreIfRequired();
+ _logger.info("Workaround process complete");
+ }
+
+ private void loadVirtualHosts()
+ {
+ final File configFile = _config.getConfigFile();
+
+ if (!configFile.exists())
+ {
+ _logger.error("Config file not found:" + configFile.getAbsolutePath());
+ _logger.error("Options: [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\"");
+ return;
+ }
+ else
+ {
+ _logger.info("Using config file :" + configFile.getAbsolutePath());
+ }
+
+ try
+ {
+ _logger.info("Starting the VirtualHosts");
+
+ ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
+
+ disableManagementStartup(registry.getConfiguration());
+
+ ApplicationRegistry.remove(1);
+ ApplicationRegistry.initialise(registry);
+
+ checkMessageStores();
+ _initialised = true;
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.error("Unable to load configuration due to configuration error: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to load configuration due to: " + e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ private void disableManagementStartup(ServerConfiguration config)
+ {
+ /*
+ * Update the server config to indicate management is disabled, which
+ * should prompt usage of the NoopManagedObjectRegistry instead of
+ * the JMXManagedObjectRegistry.
+ */
+ config.setManagementEnabled(false);
+
+ /*
+ * Set the com.sun.management.jmxremote property (to any value)
+ * if it isnt already set.
+ *
+ * The JMXManagedObjectRegistry checks for this system property before
+ * starting its own ConnectorServer programatically as this (usually)
+ * implies that the JVM was started explicitly with properties to enable
+ * its out-of-the-box JMX agent, to which JMXManagedObjectRegistry defers.
+ */
+ if(System.getProperty("com.sun.management.jmxremote") == null)
+ {
+ System.setProperty("com.sun.management.jmxremote", "8999");
+ }
+ }
+
+ private void checkMessageStores()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+
+ for (VirtualHost vhost : vhosts)
+ {
+ if(! (vhost.getMessageStore() instanceof BDBMessageStore) )
+ {
+ _logger.warn("Virtualhost '" + vhost.getName() + "' is not using a BDBMessageStore. "
+ + "No changes will be made to it.");
+ }
+ }
+ }
+
+ private static List<AMQShortString> getBDBStoreExchangeNames(final VirtualHost vhost, final BDBMessageStore store)
+ {
+ final List<AMQShortString> exchanges = new ArrayList<AMQShortString>();
+
+ //Create a visitor to visit the Exchange entries and gather the names
+ DatabaseVisitor exchangeVisitor = new DatabaseVisitor()
+ {
+ @SuppressWarnings("unchecked")
+ public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+ {
+ _logger.debug("Visiting new Exchange store entry");
+
+ TupleBinding binding = new ExchangeTB(vhost);
+ Exchange exchange = (Exchange) binding.entryToObject(value);
+
+ if(exchange != null)
+ {
+ AMQShortString name = exchange.getName();
+ if(name != null)
+ {
+ _logger.debug("Visited store entry for Exchange: " + name);
+ exchanges.add(name);
+ }
+ }
+ }
+ };
+
+ try
+ {
+ store.visitExchanges(exchangeVisitor);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error retrieving exiting Exchange names from the store: " + e.getMessage());
+ e.printStackTrace();
+ }
+
+ return exchanges;
+ }
+
+ private static void addDurableExchangesToStoreIfRequired()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+
+ //For each active VHost, add all durable exchanges in the ExchangeRegistry to the store if they arent already in it.
+ for(VirtualHost vhost : vhosts)
+ {
+ _logger.info("Beginning process for VirtualHost: " + vhost.getName());
+
+ BDBMessageStore store;
+
+ if(!(vhost.getMessageStore() instanceof BDBMessageStore) )
+ {
+ _logger.info("Store is not a BDBMessageStore, skipping");
+ continue;
+ }
+ else
+ {
+ store = (BDBMessageStore) vhost.getMessageStore();
+ }
+
+ List<AMQShortString> bdbExchangeNames = getBDBStoreExchangeNames(vhost, store);
+
+ ExchangeRegistry registry = vhost.getExchangeRegistry();
+ Collection<AMQShortString> exchangeNames = registry.getExchangeNames();
+
+ for(AMQShortString exchangeName : exchangeNames)
+ {
+ Exchange exchange = registry.getExchange(exchangeName);
+ if(exchange.isDurable())
+ {
+ if(bdbExchangeNames.contains(exchangeName))
+ {
+ _logger.info("Store already contains entry for Exchange: " + exchangeName);
+ }
+ else
+ {
+ try
+ {
+ _logger.info("Adding store entry for Exchange: " + exchangeName);
+ store.createExchange(exchange);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error adding entry to store for Exchange '" + exchangeName + "':" + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+ }
+
+}
14 years, 11 months
rhmessaging commits: r3776 - in mgmt/trunk: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-11 09:28:55 -0500 (Mon, 11 Jan 2010)
New Revision: 3776
Removed:
mgmt/trunk/mint/python/mint/poll.py
Modified:
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/mint/python/mint/main.py
Log:
Remove the now unused polling facility
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 22:35:57 UTC (rev 3775)
+++ mgmt/trunk/cumin/python/cumin/model.py 2010-01-11 14:28:55 UTC (rev 3776)
@@ -26,7 +26,6 @@
self.mint = Mint(config)
self.mint.updateEnabled = False
- self.mint.pollEnabled = False
self.mint.expireEnabled = False
self.mint.vacuumEnabled = False
Modified: mgmt/trunk/mint/python/mint/main.py
===================================================================
--- mgmt/trunk/mint/python/mint/main.py 2010-01-08 22:35:57 UTC (rev 3775)
+++ mgmt/trunk/mint/python/mint/main.py 2010-01-11 14:28:55 UTC (rev 3776)
@@ -9,7 +9,6 @@
from database import MintDatabase
from expire import ExpireThread
from model import MintModel
-from poll import PollThread
from update import UpdateThread
from vacuum import VacuumThread
@@ -26,9 +25,6 @@
self.updateEnabled = True
self.updateThread = UpdateThread(self)
- self.pollEnabled = False
- self.pollThread = PollThread(self)
-
self.expireEnabled = True
self.expireFrequency = self.config.expire_frequency
self.expireThreshold = self.config.expire_threshold
@@ -49,11 +45,9 @@
return cond and "enabled" or "disabled"
log.info("Updates are %s", state(self.updateEnabled))
- log.info("Polling is %s", state(self.pollEnabled))
log.info("Expiration is %s", state(self.expireEnabled))
self.updateThread.init()
- self.pollThread.init()
self.expireThread.init()
self.vacuumThread.init()
@@ -63,9 +57,6 @@
if self.updateEnabled:
self.updateThread.start()
- if self.pollEnabled:
- self.pollThread.start()
-
if self.expireEnabled:
self.expireThread.start()
@@ -78,9 +69,6 @@
if self.updateEnabled:
self.updateThread.stop()
- if self.pollEnabled:
- self.pollThread.stop()
-
if self.expireEnabled:
self.expireThread.stop()
Deleted: mgmt/trunk/mint/python/mint/poll.py
===================================================================
--- mgmt/trunk/mint/python/mint/poll.py 2010-01-08 22:35:57 UTC (rev 3775)
+++ mgmt/trunk/mint/python/mint/poll.py 2010-01-11 14:28:55 UTC (rev 3776)
@@ -1,31 +0,0 @@
-import logging
-from time import sleep
-from psycopg2 import OperationalError
-
-from util import MintDaemonThread
-
-log = logging.getLogger("mint.poll")
-
-class PollThread(MintDaemonThread):
- interval = 5
-
- def run(self):
- log.info("Polling for changes every %i seconds", self.interval)
-
- while True:
- try:
- if self.stopRequested:
- break
-
- self.do_run()
- except OperationalError, e:
- log.exception(e)
- if str(e).find("server closed the connection unexpectedly") >= 0:
- sys.exit() # XXX This seems like the wrong thing to do
- except Exception, e:
- log.exception(e)
-
- sleep(self.interval)
-
- def do_run(self):
- pass
14 years, 11 months
rhmessaging commits: r3775 - in mgmt/trunk: mint/python/mint and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 17:35:57 -0500 (Fri, 08 Jan 2010)
New Revision: 3775
Added:
mgmt/trunk/mint/python/mint/vacuum.py
Modified:
mgmt/trunk/cumin/python/cumin/model.py
mgmt/trunk/mint/python/mint/main.py
Log:
Do periodic vacuuming in the update queue
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 22:33:13 UTC (rev 3774)
+++ mgmt/trunk/cumin/python/cumin/model.py 2010-01-08 22:35:57 UTC (rev 3775)
@@ -28,6 +28,7 @@
self.mint.updateEnabled = False
self.mint.pollEnabled = False
self.mint.expireEnabled = False
+ self.mint.vacuumEnabled = False
self.lock = Lock()
Modified: mgmt/trunk/mint/python/mint/main.py
===================================================================
--- mgmt/trunk/mint/python/mint/main.py 2010-01-08 22:33:13 UTC (rev 3774)
+++ mgmt/trunk/mint/python/mint/main.py 2010-01-08 22:35:57 UTC (rev 3775)
@@ -7,10 +7,11 @@
from parsley.threadingex import Lifecycle
from database import MintDatabase
+from expire import ExpireThread
from model import MintModel
+from poll import PollThread
from update import UpdateThread
-from poll import PollThread
-from expire import ExpireThread
+from vacuum import VacuumThread
log = logging.getLogger("mint.main")
@@ -33,6 +34,9 @@
self.expireThreshold = self.config.expire_threshold
self.expireThread = ExpireThread(self)
+ self.vacuumEnabled = True
+ self.vacuumThread = VacuumThread(self)
+
def check(self):
self.database.check()
self.model.check()
@@ -51,6 +55,7 @@
self.updateThread.init()
self.pollThread.init()
self.expireThread.init()
+ self.vacuumThread.init()
def do_start(self):
self.model.start()
@@ -64,6 +69,9 @@
if self.expireEnabled:
self.expireThread.start()
+ if self.vacuumEnabled:
+ self.vacuumThread.start()
+
def do_stop(self):
self.model.stop()
@@ -76,6 +84,9 @@
if self.expireEnabled:
self.expireThread.stop()
+ if self.vacuumEnabled:
+ self.vacuumThread.stop()
+
class MintConfig(Config):
def __init__(self):
super(MintConfig, self).__init__()
Added: mgmt/trunk/mint/python/mint/vacuum.py
===================================================================
--- mgmt/trunk/mint/python/mint/vacuum.py (rev 0)
+++ mgmt/trunk/mint/python/mint/vacuum.py 2010-01-08 22:35:57 UTC (rev 3775)
@@ -0,0 +1,35 @@
+from update import *
+from util import *
+
+log = logging.getLogger("mint.vacuum")
+
+class VacuumThread(MintDaemonThread):
+ def __init__(self, app):
+ super(VacuumThread, self).__init__(app)
+
+ def run(self):
+ while True:
+ if self.stopRequested:
+ break
+
+ up = VacuumUpdate(self.app.model)
+ self.app.updateThread.enqueue(up)
+
+ sleep(60 * 10)
+
+class VacuumUpdate(ModelUpdate):
+ def __init__(self, model):
+ super(VacuumUpdate, self).__init__(model, None, None)
+
+ def process(self, thread):
+ log.info("Vacuuming")
+
+ thread.conn.commit()
+
+ level = thread.conn.isolation_level
+ thread.conn.set_isolation_level(0)
+
+ cursor = thread.cursor()
+ cursor.execute("vacuum")
+
+ thread.conn.set_isolation_level(level)
14 years, 11 months
rhmessaging commits: r3774 - mgmt/trunk/cumin/python/cumin/grid.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2010-01-08 17:33:13 -0500 (Fri, 08 Jan 2010)
New Revision: 3774
Modified:
mgmt/trunk/cumin/python/cumin/grid/submission.py
Log:
submitter_name is gone
Modified: mgmt/trunk/cumin/python/cumin/grid/submission.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/grid/submission.py 2010-01-08 18:54:29 UTC (rev 3773)
+++ mgmt/trunk/cumin/python/cumin/grid/submission.py 2010-01-08 22:33:13 UTC (rev 3774)
@@ -70,7 +70,7 @@
def render_content(self, session, data):
href = self.parent.get_submitter_href \
(session, data["scheduler_id"])
- return fmt_link(href, data["submitter_name"])
+ return fmt_link(href, data["owner"])
class IdleColumn(SqlTableColumn):
def render_title(self, session, data):
14 years, 11 months