[rhmessaging-commits] rhmessaging commits: r3925 - in mgmt/newdata: mint and 2 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Fri Apr 23 14:36:38 EDT 2010
Author: justi9
Date: 2010-04-23 14:36:37 -0400 (Fri, 23 Apr 2010)
New Revision: 3925
Removed:
mgmt/newdata/mint/python/mint/update.py
Modified:
mgmt/newdata/cumin/python/cumin/objectframe.py
mgmt/newdata/mint/Makefile
mgmt/newdata/mint/python/mint/database.py
mgmt/newdata/mint/python/mint/expire.py
mgmt/newdata/mint/python/mint/main.py
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/newupdate.py
mgmt/newdata/mint/python/mint/tools.py
mgmt/newdata/mint/python/mint/util.py
mgmt/newdata/mint/python/mint/vacuum.py
mgmt/newdata/rosemary/python/rosemary/model.py
Log:
* Remove old update framework
* Make --log-level also affect --debug mode in mint tools
* Adapt to v1.1 qmf console changes; a problem with oid reference
  values remains
* Partial adaptation of mint-admin functions away from sqlobject
* Fix mint-bench reporting
* Drop ObjectId and AgentId adapter classes
* Remove unused schema-related makefile rules from mint
* Tweak context-path separator styling
Modified: mgmt/newdata/cumin/python/cumin/objectframe.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objectframe.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/cumin/python/cumin/objectframe.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -190,7 +190,7 @@
links.append(self.link.render(session, frame))
- return " > ".join(reversed(links))
+ return " › ".join(reversed(links))
class ObjectViewContextLink(Link):
def __init__(self, app, name):
Modified: mgmt/newdata/mint/Makefile
===================================================================
--- mgmt/newdata/mint/Makefile 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/Makefile 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,4 +1,4 @@
-.PHONY: build install clean schema schema-clean
+.PHONY: build install clean
include ../etc/Makefile.common
@@ -26,14 +26,4 @@
install -d ${etc}
install -pm 0644 etc/mint-vacuumdb.cron ${etc}
-schema: schema-clean
- $(MAKE) schema -C xml
- $(MAKE) schema -C python/mint
- $(MAKE) schema -C sql
-
-schema-clean:
- $(MAKE) clean -C xml
- $(MAKE) clean -C python/mint
- $(MAKE) clean -C sql
-
clean: clean-python-files
Modified: mgmt/newdata/mint/python/mint/database.py
===================================================================
--- mgmt/newdata/mint/python/mint/database.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/database.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,10 +1,8 @@
-import logging
-import os.path
-
from psycopg2 import ProgrammingError
from sqlobject import connectionForURI, sqlhub
from model import MintInfo, Role
+from util import *
log = logging.getLogger("mint.database")
Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/expire.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,7 +1,6 @@
from newupdate import *
from schema import *
from sql import *
-from update import *
from util import *
import mint
@@ -43,12 +42,9 @@
if self.stop_requested:
break
- up = ExpireUpdate()
+ up = ExpireUpdate(self.app.model)
self.app.update_thread.enqueue(up)
- up = NewExpireUpdate(self.app.model)
- self.app.new_update_thread.enqueue(up)
-
sleep(frequency)
def __convertTimeUnits(self, t):
@@ -68,28 +64,6 @@
class ExpireUpdate(Update):
def do_process(self, conn, stats):
- attrs = self.thread.app.expire_thread.attrs
-
- cursor = conn.cursor()
- total = 0
-
- for op in self.thread.app.expire_thread.ops:
- log.debug("Running expire op %s", op)
-
- count = op.execute(cursor, attrs)
-
- conn.commit()
-
- log.debug("%i records expired", count)
-
- total += count
-
- log.debug("%i total records expired", total)
-
- stats.expired += 1
-
-class NewExpireUpdate(NewUpdate):
- def do_process(self, conn, stats):
seconds = self.model.app.expire_threshold
log.info("Expiring samples older than %i seconds", seconds)
Modified: mgmt/newdata/mint/python/mint/main.py
===================================================================
--- mgmt/newdata/mint/python/mint/main.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/main.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,8 +1,7 @@
from database import MintDatabase
from expire import ExpireThread
from model import MintModel
-from newupdate import NewUpdateThread
-from update import UpdateThread
+from newupdate import UpdateThread
from vacuum import VacuumThread
from util import *
@@ -19,7 +18,6 @@
self.update_enabled = True
self.update_thread = UpdateThread(self)
- self.new_update_thread = NewUpdateThread(self)
self.expire_enabled = True
self.expire_frequency = self.config.expire_frequency
@@ -44,7 +42,6 @@
log.info("Expiration is %s", state(self.expire_enabled))
self.update_thread.init()
- self.new_update_thread.init()
self.expire_thread.init()
self.vacuum_thread.init()
@@ -52,8 +49,7 @@
self.model.start()
if self.update_enabled:
- # XXX self.update_thread.start()
- self.new_update_thread.start()
+ self.update_thread.start()
if self.expire_enabled:
self.expire_thread.start()
@@ -66,7 +62,6 @@
if self.update_enabled:
self.update_thread.stop()
- self.new_update_thread.stop()
if self.expire_enabled:
self.expire_thread.stop()
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/model.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -5,7 +5,6 @@
from newupdate import *
from schema import *
from schemalocal import *
-from update import *
from util import *
import mint.schema
@@ -23,7 +22,7 @@
mint.schema.model = self
self.rosemary = RosemaryModel()
- self.rosemary.sql_logging_enabled = True
+ self.rosemary.sql_logging_enabled = False
self.qmf_session = None
self.qmf_brokers = list()
@@ -54,8 +53,9 @@
# have left behind in the DB; it's basically an unconstrained
# agent disconnect update, for any agent
- up = AgentDisconnectUpdate(None)
- self.app.update_thread.enqueue(up)
+ # XXX
+ #up = AgentDisconnectUpdate(None)
+ #self.app.update_thread.enqueue(up)
uris = [x.strip() for x in self.app.config.qmf.split(",")]
@@ -63,25 +63,34 @@
self.add_broker(uri)
def do_stop(self):
- for qbroker in self.qmf_brokers:
- self.qmf_session.delBroker(qbroker)
+ for qmf_broker in self.qmf_brokers:
+ self.qmf_session.delBroker(qmf_broker)
def add_broker(self, url):
log.info("Adding qmf broker at %s", url)
self.lock.acquire()
try:
- qbroker = self.qmf_session.addBroker(url)
- self.qmf_brokers.append(qbroker)
+ qmf_broker = self.qmf_session.addBroker(url)
+ self.qmf_brokers.append(qmf_broker)
finally:
self.lock.release()
+ def get_agent(self, qmf_agent):
+ id = qmf_agent.getAgentBank()
+
+ self.lock.acquire()
+ try:
+ return self.agents[id]
+ finally:
+ self.lock.release()
+
class MintAgent(object):
def __init__(self, model, qmf_agent):
self.model = model
self.qmf_agent = qmf_agent
- self.id = str(QmfAgentId.fromAgent(self.qmf_agent))
+ self.id = qmf_agent.getAgentBank()
self.last_heartbeat = None
@@ -105,8 +114,12 @@
assert isinstance(obj, RosemaryObject)
class_key = ClassKey(obj._qmf_class_key)
- oid = QmfObjectId.fromString(obj._qmf_object_id).toObjectId()
+ oid_args = {"_agent_name": obj._qmf_agent_id,
+ "_object_name": obj._qmf_object_id}
+
+ oid = ObjectId(oid_args)
+
self.model.lock.acquire()
try:
broker = self.qmf_agent.getBroker()
@@ -137,63 +150,41 @@
def __init__(self, model):
self.model = model
- self.deferred_object_prop_calls = defaultdict(list)
- self.deferred_object_stat_calls = defaultdict(list)
+ def brokerConnected(self, qmf_broker):
+ log.info("Broker at %s:%i is connected",
+ qmf_broker.host, qmf_broker.port)
- def brokerConnected(self, qbroker):
- log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+ def brokerInfo(self, qmf_broker):
+ log.info("Broker info from %s", qmf_broker)
- def brokerInfo(self, qbroker):
- # Now we have a brokerId to use to generate fully qualified agent
- # IDs
+ def brokerDisconnected(self, qmf_broker):
+ log.info("Broker at %s:%i is disconnected",
+ qmf_broker.host, qmf_broker.port)
- for qagent in qbroker.getAgents():
- MintAgent(self.model, qagent)
+ def newAgent(self, qmf_agent):
+ log.info("Creating %s", qmf_agent)
- def brokerDisconnected(self, qbroker):
- log.info("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+ MintAgent(self.model, qmf_agent)
- def newAgent(self, qagent):
- log.info("Creating %s", qagent)
+ def delAgent(self, qmf_agent):
+ log.info("Deleting %s", qmf_agent)
- # Some agents come down without a brokerId, meaning we can't
- # generate a fully qualified agent ID for them. Those we
- # handle in brokerInfo.
+ try:
+ agent = self.model.get_agent(qmf_agent)
+ except KeyError:
+ return
- if qagent.getBroker().brokerId:
- agent = MintAgent(self.model, qagent)
-
- # XXX This business is to handle an agent-vs-agent-data ordering
- # problem
-
- objectPropCalls = self.deferred_object_prop_calls[agent.id]
-
- for broker, object in objectPropCalls:
- self.objectProps(broker, object)
-
- objectStatCalls = self.deferred_object_stat_calls[agent.id]
-
- for broker, object in objectStatCalls:
- self.objectStats(broker, object)
-
- def delAgent(self, qagent):
- log.info("Deleting %s", qagent)
-
- id = str(QmfAgentId.fromAgent(qagent))
-
- agent = self.model.agents[id]
agent.delete()
- up = AgentDisconnectUpdate(agent)
- self.model.app.update_thread.enqueue(up)
+ if self.model.app.update_thread.isAlive():
+ up = AgentDelete(self.model, agent)
+ self.model.app.update_thread.enqueue(up)
- def heartbeat(self, qagent, timestamp):
+ def heartbeat(self, qmf_agent, timestamp):
timestamp = timestamp / 1000000000
- id = str(QmfAgentId.fromAgent(qagent))
-
try:
- agent = self.model.agents[id]
+ agent = self.model.get_agent(qmf_agent)
except KeyError:
return
@@ -208,58 +199,26 @@
# XXX I want to store class keys using this, but I can't,
# because I don't get any agent info; instead
- def objectProps(self, broker, object):
- self.model.lock.acquire()
- try:
- pass
- finally:
- self.model.lock.release()
+ def objectProps(self, broker, obj):
+ agent = self.model.get_agent(obj._agent)
- self.model.lock.acquire()
- try:
- id = str(QmfAgentId.fromObject(object))
-
- try:
- agent = self.model.agents[id]
- except KeyError:
- self.deferred_object_prop_calls[id].append((broker, object))
- return
- finally:
- self.model.lock.release()
-
if self.model.app.update_thread.isAlive():
- up = PropertyUpdate(agent, object)
- self.model.app.update_thread.enqueue(up)
-
- if self.model.app.new_update_thread.isAlive():
- if object.getTimestamps()[2]:
- up = NewObjectDelete(self.model, agent, object)
+ if obj.getTimestamps()[2]:
+ up = ObjectDelete(self.model, agent, obj)
else:
- up = NewObjectUpdate(self.model, agent, object)
+ up = ObjectUpdate(self.model, agent, obj)
- self.model.app.new_update_thread.enqueue(up)
+ self.model.app.update_thread.enqueue(up)
- def objectStats(self, broker, object):
- self.model.lock.acquire()
- try:
- id = str(QmfAgentId.fromObject(object))
+ def objectStats(self, broker, obj):
+ print "objectStats!", broker, obj
- try:
- agent = self.model.agents[id]
- except KeyError:
- self.deferred_object_stat_calls[id].append((broker, object))
- return
- finally:
- self.model.lock.release()
+ agent = self.get_agent(obj._agent)
if self.model.app.update_thread.isAlive():
- up = StatisticUpdate(agent, object)
+ up = ObjectAddSample(self.model, agent, obj)
self.model.app.update_thread.enqueue(up)
- if self.model.app.new_update_thread.isAlive():
- up = NewSampleUpdate(self.model, agent, object)
- self.model.app.new_update_thread.enqueue(up)
-
def event(self, broker, event):
""" Invoked when an event is raised. """
pass
Modified: mgmt/newdata/mint/python/mint/newupdate.py
===================================================================
--- mgmt/newdata/mint/python/mint/newupdate.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/newupdate.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -6,9 +6,9 @@
log = logging.getLogger("mint.newupdate")
-class NewUpdateThread(MintDaemonThread):
+class UpdateThread(MintDaemonThread):
def __init__(self, app):
- super(NewUpdateThread, self).__init__(app)
+ super(UpdateThread, self).__init__(app)
self.conn = None
self.stats = None
@@ -19,7 +19,7 @@
def init(self):
self.conn = self.app.database.get_connection()
- self.stats = NewUpdateStats()
+ self.stats = UpdateStats()
def enqueue(self, update):
update.thread = self
@@ -50,7 +50,7 @@
update.process(self.conn, self.stats)
-class NewUpdateStats(object):
+class UpdateStats(object):
def __init__(self):
self.enqueued = 0
self.dequeued = 0
@@ -63,7 +63,7 @@
self.samples_expired = 0
self.samples_dropped = 0
-class NewUpdate(object):
+class Update(object):
def __init__(self, model):
self.model = model
@@ -83,7 +83,7 @@
conn.rollback()
- if self.model.app.new_update_thread.halt_on_error:
+ if self.model.app.update_thread.halt_on_error:
raise
def do_process(self, conn, stats):
@@ -92,58 +92,20 @@
def __repr__(self):
return self.__class__.__name__
-class NewObjectUpdate(NewUpdate):
- def __init__(self, model, agent, object):
- super(NewObjectUpdate, self).__init__(model)
+class ObjectUpdate(Update):
+ def __init__(self, model, agent, obj):
+ super(ObjectUpdate, self).__init__(model)
self.agent = agent
- self.object = object
- self.object_id = str(QmfObjectId.fromObject(object))
- self.session_id = None
+ self.object = obj
- sequence = object.getObjectId().getSequence()
-
- if sequence != 0:
- self.session_id = str(sequence)
-
- update_time, create_time, delete_time = self.object.getTimestamps()
-
- self.update_time = datetime.fromtimestamp(update_time / 1000000000)
- self.create_time = datetime.fromtimestamp(create_time / 1000000000)
- self.delete_time = datetime.fromtimestamp(delete_time / 1000000000)
-
def do_process(self, conn, stats):
cls = self.get_class()
- obj = self.get_object(cls, self.object_id)
+ obj = self.get_object(cls, self.object.getName())
columns = list()
- update_time, create_time, delete_time = self.object.getTimestamps()
-
- if obj._sync_time:
- # This object is already in the database
-
- obj._qmf_update_time = self.update_time
- columns.append(cls.sql_table._qmf_update_time)
-
- # XXX session_id may have changed too?
- else:
- obj._qmf_agent_id = self.agent.id
- obj._qmf_object_id = self.object_id
- obj._qmf_session_id = self.session_id
- obj._qmf_class_key = str(self.object.getClassKey())
- obj._qmf_update_time = self.update_time
- obj._qmf_create_time = self.create_time
-
- columns.append(cls.sql_table._id)
- columns.append(cls.sql_table._qmf_agent_id)
- columns.append(cls.sql_table._qmf_object_id)
- columns.append(cls.sql_table._qmf_session_id)
- columns.append(cls.sql_table._qmf_class_key)
- columns.append(cls.sql_table._qmf_update_time)
- columns.append(cls.sql_table._qmf_create_time)
-
- self.process_references(obj, columns)
+ self.process_qmf_attributes(obj, columns)
self.process_properties(obj, columns)
cursor = conn.cursor()
@@ -166,7 +128,7 @@
raise PackageUnknown(name)
name = class_key.getClassName()
- name = name[0].upper() + name[1:]
+ name = name[0].upper() + name[1:] # /me shakes fist
try:
cls = pkg._classes_by_name[name]
@@ -198,63 +160,100 @@
return obj
- def process_references(self, obj, columns):
- for prop, value in self.object.getProperties():
- if prop.type != 10:
- continue
+ def process_qmf_attributes(self, obj, columns):
+ table = obj._class.sql_table
- try:
- ref = obj._class._references_by_name[prop.name]
- except KeyError:
- log.debug("Reference %s is unknown", prop.name)
+ update_time, create_time, delete_time = self.object.getTimestamps()
- continue
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
+ create_time = datetime.fromtimestamp(create_time / 1000000000)
- if not ref.sql_column:
- log.warn("Reference %s has no column; skipping it", ref.name)
+ if delete_time:
+ delete_time = datetime.fromtimestamp(delete_time / 1000000000)
- continue
+ if obj._sync_time:
+ # This object is already in the database
- col = ref.sql_column
+ obj._qmf_update_time = update_time
+ columns.append(table._qmf_update_time)
- if value:
- that_id = str(QmfObjectId(value.first, value.second))
- that = self.get_object(ref.that_cls, that_id)
+ # XXX session_id may have changed too?
+ else:
+ obj._qmf_agent_id = self.agent.id
+ obj._qmf_object_id = self.object.getName()
+ obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+ obj._qmf_class_key = str(self.object.getClassKey())
+ obj._qmf_update_time = update_time
+ obj._qmf_create_time = create_time
- if not that._sync_time:
- continue
+ columns.append(table._id)
+ columns.append(table._qmf_agent_id)
+ columns.append(table._qmf_object_id)
+ columns.append(table._qmf_session_id)
+ columns.append(table._qmf_class_key)
+ columns.append(table._qmf_update_time)
+ columns.append(table._qmf_create_time)
- value = that._id
+ def process_properties(self, obj, columns):
+ cls = obj._class
- columns.append(col)
-
- setattr(obj, col.name, value)
-
- def process_properties(self, obj, columns):
for prop, value in self.object.getProperties():
- if prop.type == 10:
- continue
-
try:
- col = obj._class._properties_by_name[prop.name].sql_column
- except KeyError:
- log.debug("Property %s is unknown", prop)
-
+ if prop.type == 10:
+ col, nvalue = self.process_reference(cls, prop, value)
+ else:
+ col, nvalue = self.process_value(cls, prop, value)
+ except MappingException, e:
+ log.debug(e)
continue
- if value is not None:
- value = self.transform_value(prop, value)
-
# XXX This optimization will be obsolete when QMF does it
# instead
- if value == getattr(obj, col.name):
+ if nvalue == getattr(obj, col.name):
continue
+ setattr(obj, col.name, nvalue)
columns.append(col)
- setattr(obj, col.name, value)
+ def process_reference(self, cls, prop, value):
+ try:
+ ref = cls._references_by_name[prop.name]
+ except KeyError:
+ raise MappingException("Reference %s is unknown" % prop.name)
+ if not ref.sql_column:
+ raise MappingException("Reference %s has no column" % ref.name)
+
+ col = ref.sql_column
+
+ if value:
+ try:
+ that_id = str(value.objectName)
+ except:
+ raise MappingException("XXX ref isn't an oid")
+
+ that = self.get_object(ref.that_cls, that_id)
+
+ if not that._sync_time:
+ msg = "Referenced object %s hasn't appeared yet"
+ raise MappingException(msg % that)
+
+ value = that._id
+
+ return col, value
+
+ def process_value(self, cls, prop, value):
+ try:
+ col = cls._properties_by_name[prop.name].sql_column
+ except KeyError:
+ raise MappingException("Property %s is unknown" % prop)
+
+ if value is not None:
+ value = self.transform_value(prop, value)
+
+ return col, value
+
def transform_value(self, attr, value):
if attr.type == 8: # absTime
if value == 0:
@@ -274,13 +273,14 @@
def __repr__(self):
name = self.__class__.__name__
cls = self.object.getClassKey().getClassName()
+ id = self.object.getName()
- return "%s(%s,%s,%s)" % (name, self.agent.id, cls, self.object_id)
+ return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
-class NewObjectDelete(NewObjectUpdate):
+class ObjectDelete(ObjectUpdate):
def do_process(self, conn, stats):
cls = self.get_class()
- obj = self.get_object(cls, self.object_id)
+ obj = self.get_object(cls, self.object.getName())
cursor = conn.cursor()
@@ -290,16 +290,16 @@
cursor.close()
try:
- del self.agent.objects_by_id[self.object_id]
+ del self.agent.objects_by_id[self.object.getName()]
except KeyError:
pass
stats.deleted += 1
-class NewSampleUpdate(NewObjectUpdate):
+class ObjectAddSample(ObjectUpdate):
def do_process(self, conn, stats):
cls = self.get_class()
- obj = self.get_object(cls, self.object_id)
+ obj = self.get_object(cls, self.object.getName())
if not cls._statistics:
stats.samples_dropped += 1; return
@@ -311,13 +311,17 @@
if obj._qmf_update_time > datetime.now() - timedelta(seconds=60):
stats.samples_dropped += 1; return
+ update_time, create_time, delete_time = self.object.getTimestamps()
+
+ update_time = datetime.fromtimestamp(update_time / 1000000000)
+
update_columns = list()
update_columns.append(cls.sql_table._qmf_update_time)
insert_columns = list()
insert_columns.append(cls.sql_samples_table._qmf_update_time)
- obj._qmf_update_time = self.update_time
+ obj._qmf_update_time = update_time
self.process_samples(obj, update_columns, insert_columns)
@@ -345,6 +349,8 @@
if value is not None:
value = self.transform_value(stat, value)
+ # Don't write unchanged values
+ #
# XXX This optimization will be obsolete when QMF does it
# instead
@@ -355,6 +361,28 @@
setattr(obj, col.name, value)
+class AgentDelete(Update):
+ def __init__(self, model, agent):
+ super(AgentDelete, self).__init__(model)
+
+ self.agent = agent
+
+ def do_process(self, conn, stats):
+ print "Ahoy!"
+
+ cursor = conn.cursor()
+
+ id = self.agent.id
+
+ try:
+ for pkg in self.model.rosemary._packages:
+ for cls in pkg._classes:
+ for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+ obj.delete()
+ print "Bam!", obj
+ finally:
+ cursor.close()
+
class UpdateException(Exception):
def __init__(self, name):
self.name = name
@@ -370,3 +398,6 @@
class ObjectUnknown(UpdateException):
pass
+
+class MappingException(Exception):
+ pass
Modified: mgmt/newdata/mint/python/mint/tools.py
===================================================================
--- mgmt/newdata/mint/python/mint/tools.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/tools.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -72,9 +72,12 @@
if self.config.debug:
self.config.prt()
- enable_logging("rosemary", "debug", sys.stderr)
- enable_logging("mint", "debug", sys.stderr)
+ level = getattr(self.config, "log_level", "debug")
+
+ enable_logging("rosemary", level, sys.stderr)
+ enable_logging("mint", level, sys.stderr)
+
self.do_run(opts, args)
def do_run(self, opts, args):
@@ -87,7 +90,7 @@
class DatabaseSubcommand(Command):
def run(self, opts, args):
- conn = self.parent.database.get_connection()
+ conn = self.parent.app.database.get_connection()
cursor = conn.cursor()
try:
@@ -153,6 +156,8 @@
command.description = "Change password of USER"
command.arguments = ("USER",)
+ self.app = None
+
def run(self):
try:
opts, remaining = self.parse_options(sys.argv[1:])
@@ -171,16 +176,13 @@
self.config.prt()
enable_logging("mint", "debug", sys.stderr)
- app = Mint(self.config)
- app.update_enabled = False
- app.expire_enabled = False
+ self.app = Mint(self.config)
+ self.app.update_enabled = False
+ self.app.expire_enabled = False
- #self.cumin = app.model.rosemary.com_redhat_cumin
+ self.app.check()
+ self.app.init()
- self.database = MintDatabase(app)
- self.database.check()
- self.database.init()
-
try:
scommand = remaining[0]
except IndexError:
@@ -214,13 +216,13 @@
class LoadSchema(Command):
def run(self, opts, args):
- self.parent.database.load_schema()
+ self.parent.app.database.load_schema()
print "The schema is loaded"
class DropSchema(Command):
def run(self, opts, args):
if "force" in opts:
- self.parent.database.drop_schema()
+ self.parent.app.database.drop_schema()
print "The schema is dropped"
else:
raise CommandException \
@@ -229,8 +231,8 @@
class ReloadSchema(Command):
def run(self, opts, args):
if "force" in opts:
- self.parent.database.drop_schema()
- self.parent.database.load_schema()
+ self.parent.app.database.drop_schema()
+ self.parent.app.database.load_schema()
print "The schema is reloaded"
else:
raise CommandException \
@@ -238,7 +240,7 @@
class CheckSchema(Command):
def run(self, opts, args):
- self.parent.database.check_schema()
+ self.parent.app.database.check_schema()
class AddUser(DatabaseSubcommand):
def do_run(self, cursor, opts, args):
@@ -247,13 +249,6 @@
except IndexError:
raise CommandException(self, "NAME is required")
- #objs = self.parent.cumin.User.get_selection(cursor, name=name)
- #if objs:
-
- if Subject.selectBy(name=name).count():
- print "Error: a user called '%s' already exists" % name
- sys.exit(1)
-
try:
password = args[2]
except IndexError:
@@ -261,62 +256,61 @@
crypted = crypt_password(password)
- try:
- subject = Subject(name=name, password=crypted)
+ pkg = self.parent.app.model.rosemary.com_redhat_cumin
- for role in Role.selectBy(name="user"):
- subject.addRole(role)
+ for role in pkg.Role.get_selection(cursor, name="user"):
+ break
- assert role
+ assert role, self
- subject.syncUpdate()
+ user = pkg.User.create_object(cursor)
+ user.name = name
+ user.password = crypted
- # user = self.parent.cumin.User.create_object(cursor)
- # user.name = name
- # user.password = crypted
- # user.save(cursor)
-
- # roles = self.parent.cumin.Role.get_selection \
- # (cursor, name="user")
-
- # for role in roles:
- # mapping = self.parent.cumin.UserRoleMapping.create_object \
- # (cursor)
- # mapping.user = user
- # mapping.role = role
- # mapping.save(cursor)
-
- # break
-
- assert role, self
+ try:
+ user.save(cursor)
except IntegrityError:
print "Error: a user called '%s' already exists" % name
sys.exit(1)
+ mapping = pkg.UserRoleMapping.create_object(cursor)
+ mapping._role_id = role._id
+ mapping._user_id = user._id
+ mapping.save(cursor)
+
+ conn.commit()
+
+ assert role, self
+
print "User '%s' is added" % name
- class RemoveUser(Command):
- def run(self, opts, args):
- if "force" in opts:
- if len(args) != 2:
- print "Error: no user name given"
- sys.exit(1)
+ class RemoveUser(DatabaseSubcommand):
+ def do_run(self, cursor, opts, args):
+ if "force" not in opts:
+ msg = "Command remove-user requires --force"
+ raise CommandException(self, msg)
+ try:
name = args[1]
- subjects = Subject.selectBy(name=name)
+ except IndexError:
+ raise CommandException(self, "NAME is required")
- if subjects.count():
- for subject in subjects:
- subject.destroySelf()
- break
- else:
- raise CommandException(self, "User '%s' is unknown" % name)
+ name = args[1]
- print "User '%s' is removed" % name
- else:
- raise CommandException \
- (self, "Command remove-user requires --force yes")
+ cls = self.app.model.rosemary.com_redhat_cumin.User
+ for user in cls.get_selection(cursor, name=name):
+ break
+
+ if not user:
+ raise CommandException(self, "User '%s' is unknown" % name)
+
+ user.delete(cursor)
+
+ conn.commit()
+
+ print "User '%s' is removed" % name
+
class ListUsers(Command):
def run(self, opts, args):
subjects = Subject.select(orderBy='name')
@@ -457,8 +451,34 @@
for arg in args[1:]:
app.model.add_broker(arg)
+ sleep(2)
+
+ cls = app.model.rosemary.org_apache_qpid_broker.Broker
+
+ conn = app.database.get_connection()
+ cursor = conn.cursor()
+
+ for obj in cls.get_selection(cursor):
+ try:
+ agent = app.model.agents[obj._qmf_agent_id]
+ except KeyError:
+ continue
+
+ break
+
+ print "TTT", obj.port, obj, agent
+
+ def completion(status_code, output_args):
+ print "YYY", status_code, output_args
+
+ agent.call_method(completion, obj, "echo", 1, "ggoo!")
+
while True:
sleep(2)
+ except Exception, e:
+ print_exc()
+
+ print e
finally:
app.stop()
@@ -492,6 +512,17 @@
print "Warning: Failed connecting to broker at '%s'" % arg
try:
+ enq = 0
+ deq = 0
+
+ upd = 0
+ dlt = 0
+ drp = 0
+
+ supd = 0
+ sexp = 0
+ sdrp = 0
+
enq_last = 0
deq_last = 0
@@ -513,7 +544,7 @@
sleep(1)
- stats = app.new_update_thread.stats
+ stats = app.update_thread.stats
enq = stats.enqueued
deq = stats.dequeued
@@ -522,9 +553,9 @@
dlt = stats.deleted
drp = stats.dropped
- supd = stats.stats_updated
- sexp = stats.stats_expired
- sdrp = stats.stats_dropped
+ supd = stats.samples_updated
+ sexp = stats.samples_expired
+ sdrp = stats.samples_dropped
print row % (enq - enq_last,
deq - deq_last,
@@ -561,5 +592,6 @@
finally:
#from threading import enumerate
#for item in enumerate():
+ # print item
app.stop()
Deleted: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/update.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,367 +0,0 @@
-import pickle
-import mint
-
-from collections import deque
-from qpid.datatypes import UUID
-
-from schema import *
-from sql import *
-from util import *
-
-log = logging.getLogger("mint.update")
-
-class UpdateThread(MintDaemonThread):
- """
- Only the update thread writes to the database
- """
-
- def __init__(self, app):
- super(UpdateThread, self).__init__(app)
-
- self.conn = None
- self.stats = None
-
- self.updates = UpdateQueue(slotCount=2)
-
- def init(self):
- self.conn = self.app.database.get_connection()
- self.stats = UpdateStats()
-
- def enqueue(self, update):
- update.thread = self
-
- self.updates.put(update)
-
- self.stats.enqueued += 1
-
- # This is an attempt to yield from the enqueueing thread (this
- # method's caller) to the update thread
-
- if self.updates.qsize() > 1000:
- sleep(0.1)
-
- def run(self):
- while True:
- if self.stop_requested:
- break
-
- try:
- update = self.updates.get(True, 1)
- except Empty:
- continue
-
- self.stats.dequeued += 1
-
- update.process()
-
-class UpdateStats(object):
- def __init__(self):
- self.enqueued = 0
- self.dequeued = 0
- self.stat_updated = 0
- self.prop_updated = 0
- self.expired = 0
- self.dropped = 0
- self.deferred = 0
-
-class ReferenceException(Exception):
- def __init__(self, sought):
- self.sought = sought
-
- def __str__(self):
- return repr(self.sought)
-
-class Update(object):
- def __init__(self):
- self.thread = None
- self.priority = 0
-
- def process(self):
- log.debug("Processing %s", self)
-
- assert self.thread
-
- conn = self.thread.conn
- stats = self.thread.stats
-
- try:
- self.do_process(conn, stats)
-
- for notice in conn.notices:
- log.debug("Database: %s", notice)
-
- conn.commit()
- except:
- log.exception("Update failed")
-
- conn.rollback()
-
- def do_process(self, conn, stats):
- raise Exception("Not implemented")
-
- def __repr__(self):
- return "%s(%i)" % (self.__class__.__name__, self.priority)
-
-class ObjectUpdate(Update):
- def __init__(self, agent, object):
- super(ObjectUpdate, self).__init__()
-
- self.agent = agent
- self.object = object
-
- self.object_id = str(QmfObjectId.fromObject(object))
-
- def getClass(self):
- # XXX this is unfortunate
- name = self.object.getClassKey().getClassName()
- name = mint.schema.schemaReservedWordsMap.get(name, name)
- name = name[0].upper() + name[1:]
-
- try:
- return schemaNameToClassMap[name]
- except KeyError:
- raise ReferenceException(name)
-
- def process_attributes(self, attrs, cls):
- results = dict()
-
- for key, value in attrs:
- name = key.__repr__()
- name = mint.schema.schemaReservedWordsMap.get(name, name)
-
- if key.type == 10:
- # XXX don't want oid around much
- self.process_reference(name, value, results)
- continue
-
- if not hasattr(cls, name):
- # Discard attrs that we don't have in our schema
- log.debug("%s has no field '%s'" % (cls, name))
- continue
-
- if key.type == 8:
- self.process_timestamp(name, value, results)
- continue
-
- if key.type == 14:
- # Convert UUIDs into their string representation, to be
- # handled by sqlobject
- results[name] = str(value)
- continue
-
- if key.type == 15:
- #if value:
- results[name] = pickle.dumps(value)
- continue
-
- results[name] = value
-
- return results
-
- # XXX this needs to be a much more straightforward procedure
- def process_reference(self, name, oid, results):
- if name.endswith("Ref"):
- name = name[:-3]
-
- className = name[0].upper() + name[1:]
-
- try:
- otherClass = getattr(mint, className)
- except AttributeError:
- return
-
- foreignKey = name + "_id"
-
- object_id = str(QmfObjectId(oid.first, oid.second))
- id = self.agent.database_ids.get(object_id)
-
- if id is None:
- # XXX don't want oid around much
- raise ReferenceException(oid)
-
- results[foreignKey] = id
-
- def process_timestamp(self, name, tstamp, results):
- if tstamp:
- t = datetime.fromtimestamp(tstamp / 1000000000)
- results[name] = t
-
- def __repr__(self):
- cls = self.object.getClassKey().getClassName()
-
- return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
- self.agent.id,
- cls,
- self.object_id,
- self.priority)
-
-class PropertyUpdate(ObjectUpdate):
- def do_process(self, conn, stats):
- try:
- cls = self.getClass()
- except ReferenceException, e:
- log.info("Referenced class %r not found", e.sought)
-
- stats.dropped += 1
-
- return
-
- try:
- attrs = self.process_attributes(self.object.getProperties(), cls)
- except ReferenceException, e:
- log.info("Referenced object %r not found", e.sought)
-
- self.agent.deferred_updates[self.object_id].append(self)
-
- stats.deferred += 1
-
- return
-
- update, create, delete = self.object.getTimestamps()
-
- self.process_timestamp("qmfUpdateTime", update, attrs)
- self.process_timestamp("qmfCreateTime", create, attrs)
-
- if delete != 0:
- self.process_timestamp("qmfDeleteTime", delete, attrs)
-
- log.debug("%s(%s,%s) marked deleted",
- cls.__name__, self.agent.id, self.object_id)
-
- attrs["qmfAgentId"] = self.agent.id
- attrs["qmfClassKey"] = str(self.object.getClassKey())
- attrs["qmfObjectId"] = str(self.object_id)
- attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
-
- cursor = conn.cursor()
-
- # Cases:
- #
- # 1. Object is utterly new to mint
- # 2. Object is in mint's db, but id is not yet cached
- # 3. Object is in mint's db, and id is cached
-
- id = self.agent.database_ids.get(self.object_id)
-
- if id is None:
- # Case 1 or 2
-
- op = SqlGetId(cls)
- op.execute(cursor, attrs)
-
- rec = cursor.fetchone()
-
- if rec:
- id = rec[0]
-
- if id is None:
- # Case 1
-
- op = SqlInsert(cls, attrs)
- op.execute(cursor, attrs)
-
- id = cursor.fetchone()[0]
-
- log.debug("%s(%i) created", cls.__name__, id)
- else:
- # Case 2
-
- attrs["id"] = id
-
- op = SqlUpdate(cls, attrs)
- op.execute(cursor, attrs)
-
- assert cursor.rowcount == 1
-
- self.agent.database_ids.set(self.object_id, id)
- else:
- # Case 3
-
- attrs["id"] = id
-
- op = SqlUpdate(cls, attrs)
- op.execute(cursor, attrs)
-
- #assert cursor.rowcount == 1
-
- try:
- updates = self.agent.deferred_updates.pop(self.object_id)
-
- if updates:
- log.info("Reenqueueing %i deferred updates", len(updates))
-
- for update in updates:
- self.thread.enqueue(update)
- except KeyError:
- pass
-
- self.agent.database_ids.commit()
-
- stats.prop_updated += 1
-
-class StatisticUpdate(ObjectUpdate):
- def do_process(self, conn, stats):
- try:
- cls = self.getClass()
- except ReferenceException, e:
- log.info("Referenced class %r not found", e.sought)
- return
-
- statsCls = getattr(mint, "%sStats" % cls.__name__)
-
- id = self.agent.database_ids.get(self.object_id)
-
- if id is None:
- stats.dropped += 1
- return
-
- timestamps = self.object.getTimestamps()
-
- tnow = datetime.now()
- t = datetime.fromtimestamp(timestamps[0] / 1000000000)
-
- if t < tnow - timedelta(seconds=30):
- seconds = (tnow - t).seconds
- log.debug("Update is %i seconds old; skipping it", seconds)
-
- stats.dropped += 1
-
- return
-
- try:
- attrs = self.process_attributes \
- (self.object.getStatistics(), statsCls)
- except ReferenceException:
- stats.dropped += 1
-
- return
-
- # XXX do we still want this
- attrs["qmfUpdateTime"] = t > tnow and tnow or t
- attrs["%s_id" % cls.sqlmeta.table] = id
-
- cursor = conn.cursor()
-
- op = SqlInsert(statsCls, attrs)
- op.execute(cursor, attrs)
-
- log.debug("%s(%s) created", statsCls.__name__, id)
-
- stats.stat_updated += 1
-
-class AgentDisconnectUpdate(Update):
- def __init__(self, agent):
- super(AgentDisconnectUpdate, self).__init__()
-
- self.agent = agent
-
- def do_process(self, conn, stats):
- cursor = conn.cursor()
-
- args = dict()
-
- if self.agent:
- args["qmf_agent_id"] = self.agent.id
-
- op = SqlAgentDisconnect(self.agent)
- op.execute(cursor, args)
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/util.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -69,12 +69,16 @@
return cls(brokerId, brokerBank, agentBank)
def fromAgent(cls, agent):
+ return agent.getAgentBank()
+
broker = agent.getBroker()
brokerId = broker.getBrokerId()
brokerBank = broker.getBrokerBank()
agentBank = agent.getAgentBank()
+ print "XXX", brokerId, brokerBank, agentBank
+
return cls(brokerId, brokerBank, agentBank)
def fromString(cls, string):
@@ -90,16 +94,20 @@
fromString = classmethod(fromString)
def __str__(self):
- return "%s.%i.%i" % (self.brokerId, self.brokerBank, self.agentBank)
+ return self.agentBank
class QmfObjectId(object):
- def __init__(self, first, second):
- self.first = first
- self.second = second
+ def __init__(self, id):
+ self.id = id
def fromObject(cls, object):
oid = object.getObjectId()
+ print "XXX", oid
+
+ for k, v in oid.__dict__.items():
+ print " ", k, v
+
return cls(oid.first, oid.second)
def fromString(cls, string):
Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/vacuum.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,5 +1,4 @@
from newupdate import *
-from update import *
from util import *
log = logging.getLogger("mint.vacuum")
@@ -10,30 +9,13 @@
if self.stop_requested:
break
- up = VacuumUpdate()
+ up = VacuumUpdate(self.app.model)
self.app.update_thread.enqueue(up)
- up = NewVacuumUpdate(self.app.model)
- self.app.new_update_thread.enqueue(up)
-
sleep(60 * 60 * 10)
class VacuumUpdate(Update):
def do_process(self, conn, stats):
- log.info("Vacuuming")
-
- conn.commit()
-
- level = conn.isolation_level
- conn.set_isolation_level(0)
-
- cursor = conn.cursor()
- cursor.execute("vacuum")
-
- conn.set_isolation_level(level)
-
-class NewVacuumUpdate(NewUpdate):
- def do_process(self, conn, stats):
log.info("Vacumming tables")
level = conn.isolation_level
@@ -57,4 +39,3 @@
log.debug("Database: %s", notice.replace("\n", " "))
finally:
cursor.close()
-
Modified: mgmt/newdata/rosemary/python/rosemary/model.py
===================================================================
--- mgmt/newdata/rosemary/python/rosemary/model.py 2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/rosemary/python/rosemary/model.py 2010-04-23 18:36:37 UTC (rev 3925)
@@ -410,6 +410,9 @@
self.sql_delete.execute(cursor, (), obj.__dict__)
+ def delete_selection(self, cursor, **kwargs):
+ pass # XXX
+
def __repr__(self):
args = (self.__class__.__name__, self._package._name, self._name)
return "%s(%s,%s)" % args
More information about the rhmessaging-commits mailing list