[rhmessaging-commits] rhmessaging commits: r3925 - in mgmt/newdata: mint and 2 other directories.

rhmessaging-commits at lists.jboss.org
Fri Apr 23 14:36:38 EDT 2010


Author: justi9
Date: 2010-04-23 14:36:37 -0400 (Fri, 23 Apr 2010)
New Revision: 3925

Removed:
   mgmt/newdata/mint/python/mint/update.py
Modified:
   mgmt/newdata/cumin/python/cumin/objectframe.py
   mgmt/newdata/mint/Makefile
   mgmt/newdata/mint/python/mint/database.py
   mgmt/newdata/mint/python/mint/expire.py
   mgmt/newdata/mint/python/mint/main.py
   mgmt/newdata/mint/python/mint/model.py
   mgmt/newdata/mint/python/mint/newupdate.py
   mgmt/newdata/mint/python/mint/tools.py
   mgmt/newdata/mint/python/mint/util.py
   mgmt/newdata/mint/python/mint/vacuum.py
   mgmt/newdata/rosemary/python/rosemary/model.py
Log:
 * Remove old update framework

 * Make --log-level also affect --debug mode in mint tools

 * Adapt to the v1.1 QMF console API; a problem with oid reference
   values remains (see the sketch after this list)

 * Partially adapt the mint-admin functions away from sqlobject

 * Fix mint-bench reporting

 * Drop ObjectId and AgentId adapter classes

 * Remove unused schema-related makefile rules from mint

 * Tweak context-path separator styling
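
 * For reference, a minimal sketch of the oid construction the updated
   console handling uses (the ObjectId argument names are taken from the
   model.py change below; the qmf import path and the helper name are
   assumptions for illustration, not part of this commit):

       # Sketch only: build an ObjectId from the stored agent and object
       # names instead of parsing a "first.second" string as the old
       # QmfObjectId adapter did.
       from qmf.console import ObjectId  # assumed import location

       def make_object_ref(qmf_agent_id, qmf_object_id):
           oid_args = {"_agent_name": qmf_agent_id,
                       "_object_name": qmf_object_id}
           return ObjectId(oid_args)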


Modified: mgmt/newdata/cumin/python/cumin/objectframe.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/objectframe.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/cumin/python/cumin/objectframe.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -190,7 +190,7 @@
 
             links.append(self.link.render(session, frame))
 
-        return " > ".join(reversed(links))
+        return " › ".join(reversed(links))
 
 class ObjectViewContextLink(Link):
     def __init__(self, app, name):

Modified: mgmt/newdata/mint/Makefile
===================================================================
--- mgmt/newdata/mint/Makefile	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/Makefile	2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,4 +1,4 @@
-.PHONY: build install clean schema schema-clean
+.PHONY: build install clean
 
 include ../etc/Makefile.common
 
@@ -26,14 +26,4 @@
 	install -d ${etc}
 	install -pm 0644 etc/mint-vacuumdb.cron ${etc}
 
-schema: schema-clean
-	$(MAKE) schema -C xml
-	$(MAKE) schema -C python/mint
-	$(MAKE) schema -C sql
-
-schema-clean:
-	$(MAKE) clean -C xml
-	$(MAKE) clean -C python/mint
-	$(MAKE) clean -C sql
-
 clean: clean-python-files

Modified: mgmt/newdata/mint/python/mint/database.py
===================================================================
--- mgmt/newdata/mint/python/mint/database.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/database.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,10 +1,8 @@
-import logging
-import os.path
-
 from psycopg2 import ProgrammingError
 from sqlobject import connectionForURI, sqlhub
 
 from model import MintInfo, Role
+from util import *
 
 log = logging.getLogger("mint.database")
 

Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/expire.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,7 +1,6 @@
 from newupdate import *
 from schema import *
 from sql import *
-from update import *
 from util import *
 
 import mint
@@ -43,12 +42,9 @@
             if self.stop_requested:
                 break
 
-            up = ExpireUpdate()
+            up = ExpireUpdate(self.app.model)
             self.app.update_thread.enqueue(up)
 
-            up = NewExpireUpdate(self.app.model)
-            self.app.new_update_thread.enqueue(up)
-
             sleep(frequency)
 
     def __convertTimeUnits(self, t):
@@ -68,28 +64,6 @@
 
 class ExpireUpdate(Update):
     def do_process(self, conn, stats):
-        attrs = self.thread.app.expire_thread.attrs
-
-        cursor = conn.cursor()
-        total = 0
-
-        for op in self.thread.app.expire_thread.ops:
-            log.debug("Running expire op %s", op)
-
-            count = op.execute(cursor, attrs)
-
-            conn.commit()
-
-            log.debug("%i records expired", count)
-
-            total += count
-
-        log.debug("%i total records expired", total)
-
-        stats.expired += 1
-
-class NewExpireUpdate(NewUpdate):
-    def do_process(self, conn, stats):
         seconds = self.model.app.expire_threshold
 
         log.info("Expiring samples older than %i seconds", seconds)

Modified: mgmt/newdata/mint/python/mint/main.py
===================================================================
--- mgmt/newdata/mint/python/mint/main.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/main.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,8 +1,7 @@
 from database import MintDatabase
 from expire import ExpireThread
 from model import MintModel
-from newupdate import NewUpdateThread
-from update import UpdateThread
+from newupdate import UpdateThread
 from vacuum import VacuumThread
 
 from util import *
@@ -19,7 +18,6 @@
 
         self.update_enabled = True
         self.update_thread = UpdateThread(self)
-        self.new_update_thread = NewUpdateThread(self)
 
         self.expire_enabled = True
         self.expire_frequency = self.config.expire_frequency
@@ -44,7 +42,6 @@
         log.info("Expiration is %s", state(self.expire_enabled))
 
         self.update_thread.init()
-        self.new_update_thread.init()
         self.expire_thread.init()
         self.vacuum_thread.init()
 
@@ -52,8 +49,7 @@
         self.model.start()
 
         if self.update_enabled:
-            # XXX self.update_thread.start()
-            self.new_update_thread.start()
+            self.update_thread.start()
 
         if self.expire_enabled:
             self.expire_thread.start()
@@ -66,7 +62,6 @@
 
         if self.update_enabled:
             self.update_thread.stop()
-            self.new_update_thread.stop()
 
         if self.expire_enabled:
             self.expire_thread.stop()

Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/model.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -5,7 +5,6 @@
 from newupdate import *
 from schema import *
 from schemalocal import *
-from update import *
 from util import *
 
 import mint.schema
@@ -23,7 +22,7 @@
         mint.schema.model = self
 
         self.rosemary = RosemaryModel()
-        self.rosemary.sql_logging_enabled = True
+        self.rosemary.sql_logging_enabled = False
 
         self.qmf_session = None
         self.qmf_brokers = list()
@@ -54,8 +53,9 @@
         # have left behind in the DB; it's basically an unconstrained
         # agent disconnect update, for any agent
 
-        up = AgentDisconnectUpdate(None)
-        self.app.update_thread.enqueue(up)
+        # XXX
+        #up = AgentDisconnectUpdate(None)
+        #self.app.update_thread.enqueue(up)
 
         uris = [x.strip() for x in self.app.config.qmf.split(",")]
 
@@ -63,25 +63,34 @@
             self.add_broker(uri)
 
     def do_stop(self):
-        for qbroker in self.qmf_brokers:
-            self.qmf_session.delBroker(qbroker)
+        for qmf_broker in self.qmf_brokers:
+            self.qmf_session.delBroker(qmf_broker)
 
     def add_broker(self, url):
         log.info("Adding qmf broker at %s", url)
 
         self.lock.acquire()
         try:
-            qbroker = self.qmf_session.addBroker(url)
-            self.qmf_brokers.append(qbroker)
+            qmf_broker = self.qmf_session.addBroker(url)
+            self.qmf_brokers.append(qmf_broker)
         finally:
             self.lock.release()
 
+    def get_agent(self, qmf_agent):
+        id = qmf_agent.getAgentBank()
+
+        self.lock.acquire()
+        try:
+            return self.agents[id]
+        finally:
+            self.lock.release()
+
 class MintAgent(object):
     def __init__(self, model, qmf_agent):
         self.model = model
         self.qmf_agent = qmf_agent
 
-        self.id = str(QmfAgentId.fromAgent(self.qmf_agent))
+        self.id = qmf_agent.getAgentBank()
 
         self.last_heartbeat = None
 
@@ -105,8 +114,12 @@
         assert isinstance(obj, RosemaryObject)
         
         class_key = ClassKey(obj._qmf_class_key)
-        oid = QmfObjectId.fromString(obj._qmf_object_id).toObjectId()
 
+        oid_args = {"_agent_name": obj._qmf_agent_id,
+                    "_object_name": obj._qmf_object_id}
+
+        oid = ObjectId(oid_args)
+
         self.model.lock.acquire()
         try:
             broker = self.qmf_agent.getBroker()
@@ -137,63 +150,41 @@
     def __init__(self, model):
         self.model = model
 
-        self.deferred_object_prop_calls = defaultdict(list)
-        self.deferred_object_stat_calls = defaultdict(list)
+    def brokerConnected(self, qmf_broker):
+        log.info("Broker at %s:%i is connected",
+                 qmf_broker.host, qmf_broker.port)
 
-    def brokerConnected(self, qbroker):
-        log.info("Broker at %s:%i is connected", qbroker.host, qbroker.port)
+    def brokerInfo(self, qmf_broker):
+        log.info("Broker info from %s", qmf_broker)
 
-    def brokerInfo(self, qbroker):
-        # Now we have a brokerId to use to generate fully qualified agent
-        # IDs
+    def brokerDisconnected(self, qmf_broker):
+        log.info("Broker at %s:%i is disconnected",
+                 qmf_broker.host, qmf_broker.port)
 
-        for qagent in qbroker.getAgents():
-            MintAgent(self.model, qagent)
+    def newAgent(self, qmf_agent):
+        log.info("Creating %s", qmf_agent)
 
-    def brokerDisconnected(self, qbroker):
-        log.info("Broker at %s:%i is disconnected", qbroker.host, qbroker.port)
+        MintAgent(self.model, qmf_agent)
 
-    def newAgent(self, qagent):
-        log.info("Creating %s", qagent)
+    def delAgent(self, qmf_agent):
+        log.info("Deleting %s", qmf_agent)
 
-        # Some agents come down without a brokerId, meaning we can't
-        # generate a fully qualified agent ID for them.  Those we
-        # handle in brokerInfo.
+        try:
+            agent = self.model.get_agent(qmf_agent)
+        except KeyError:
+            return
 
-        if qagent.getBroker().brokerId:
-            agent = MintAgent(self.model, qagent)
-
-            # XXX This business is to handle an agent-vs-agent-data ordering
-            # problem
-
-            objectPropCalls = self.deferred_object_prop_calls[agent.id]
-
-            for broker, object in objectPropCalls:
-                self.objectProps(broker, object)
-
-            objectStatCalls = self.deferred_object_stat_calls[agent.id]
-
-            for broker, object in objectStatCalls:
-                self.objectStats(broker, object)
-
-    def delAgent(self, qagent):
-        log.info("Deleting %s", qagent)
-
-        id = str(QmfAgentId.fromAgent(qagent))
-
-        agent = self.model.agents[id]
         agent.delete()
 
-        up = AgentDisconnectUpdate(agent)
-        self.model.app.update_thread.enqueue(up)
+        if self.model.app.update_thread.isAlive():
+            up = AgentDelete(self.model, agent)
+            self.model.app.update_thread.enqueue(up)
 
-    def heartbeat(self, qagent, timestamp):
+    def heartbeat(self, qmf_agent, timestamp):
         timestamp = timestamp / 1000000000
 
-        id = str(QmfAgentId.fromAgent(qagent))
-
         try:
-            agent = self.model.agents[id]
+            agent = self.model.get_agent(qmf_agent)
         except KeyError:
             return
 
@@ -208,58 +199,26 @@
         # XXX I want to store class keys using this, but I can't,
         # because I don't get any agent info; instead
 
-    def objectProps(self, broker, object):
-        self.model.lock.acquire()
-        try:
-            pass
-        finally:
-            self.model.lock.release()
+    def objectProps(self, broker, obj):
+        agent = self.model.get_agent(obj._agent)
 
-        self.model.lock.acquire()
-        try:
-            id = str(QmfAgentId.fromObject(object))
-
-            try:
-                agent = self.model.agents[id]
-            except KeyError:
-                self.deferred_object_prop_calls[id].append((broker, object))
-                return
-        finally:
-            self.model.lock.release()
-
         if self.model.app.update_thread.isAlive():
-            up = PropertyUpdate(agent, object)
-            self.model.app.update_thread.enqueue(up)
-
-        if self.model.app.new_update_thread.isAlive():
-            if object.getTimestamps()[2]:
-                up = NewObjectDelete(self.model, agent, object)
+            if obj.getTimestamps()[2]:
+                up = ObjectDelete(self.model, agent, obj)
             else:
-                up = NewObjectUpdate(self.model, agent, object)
+                up = ObjectUpdate(self.model, agent, obj)
 
-            self.model.app.new_update_thread.enqueue(up)
+            self.model.app.update_thread.enqueue(up)
 
-    def objectStats(self, broker, object):
-        self.model.lock.acquire()
-        try:
-            id = str(QmfAgentId.fromObject(object))
+    def objectStats(self, broker, obj):
+        print "objectStats!", broker, obj
 
-            try:
-                agent = self.model.agents[id]
-            except KeyError:
-                self.deferred_object_stat_calls[id].append((broker, object))
-                return
-        finally:
-            self.model.lock.release()
+        agent = self.get_agent(obj._agent)
 
         if self.model.app.update_thread.isAlive():
-            up = StatisticUpdate(agent, object)
+            up = ObjectAddSample(self.model, agent, obj)
             self.model.app.update_thread.enqueue(up)
 
-        if self.model.app.new_update_thread.isAlive():
-            up = NewSampleUpdate(self.model, agent, object)
-            self.model.app.new_update_thread.enqueue(up)
-
     def event(self, broker, event):
         """ Invoked when an event is raised. """
         pass

Modified: mgmt/newdata/mint/python/mint/newupdate.py
===================================================================
--- mgmt/newdata/mint/python/mint/newupdate.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/newupdate.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -6,9 +6,9 @@
 
 log = logging.getLogger("mint.newupdate")
 
-class NewUpdateThread(MintDaemonThread):
+class UpdateThread(MintDaemonThread):
     def __init__(self, app):
-        super(NewUpdateThread, self).__init__(app)
+        super(UpdateThread, self).__init__(app)
 
         self.conn = None
         self.stats = None
@@ -19,7 +19,7 @@
 
     def init(self):
         self.conn = self.app.database.get_connection()
-        self.stats = NewUpdateStats()
+        self.stats = UpdateStats()
 
     def enqueue(self, update):
         update.thread = self
@@ -50,7 +50,7 @@
 
             update.process(self.conn, self.stats)
 
-class NewUpdateStats(object):
+class UpdateStats(object):
     def __init__(self):
         self.enqueued = 0
         self.dequeued = 0
@@ -63,7 +63,7 @@
         self.samples_expired = 0
         self.samples_dropped = 0
 
-class NewUpdate(object):
+class Update(object):
     def __init__(self, model):
         self.model = model
 
@@ -83,7 +83,7 @@
 
             conn.rollback()
 
-            if self.model.app.new_update_thread.halt_on_error:
+            if self.model.app.update_thread.halt_on_error:
                 raise
 
     def do_process(self, conn, stats):
@@ -92,58 +92,20 @@
     def __repr__(self):
         return self.__class__.__name__
 
-class NewObjectUpdate(NewUpdate):
-    def __init__(self, model, agent, object):
-        super(NewObjectUpdate, self).__init__(model)
+class ObjectUpdate(Update):
+    def __init__(self, model, agent, obj):
+        super(ObjectUpdate, self).__init__(model)
 
         self.agent = agent
-        self.object = object
-        self.object_id = str(QmfObjectId.fromObject(object))
-        self.session_id = None
+        self.object = obj
 
-        sequence = object.getObjectId().getSequence()
-
-        if sequence != 0:
-            self.session_id = str(sequence)
-
-        update_time, create_time, delete_time = self.object.getTimestamps()
-
-        self.update_time = datetime.fromtimestamp(update_time / 1000000000)
-        self.create_time = datetime.fromtimestamp(create_time / 1000000000)
-        self.delete_time = datetime.fromtimestamp(delete_time / 1000000000)
-
     def do_process(self, conn, stats):
         cls = self.get_class()
-        obj = self.get_object(cls, self.object_id)
+        obj = self.get_object(cls, self.object.getName())
 
         columns = list()
 
-        update_time, create_time, delete_time = self.object.getTimestamps()
-
-        if obj._sync_time:
-            # This object is already in the database
-
-            obj._qmf_update_time = self.update_time
-            columns.append(cls.sql_table._qmf_update_time)
-
-            # XXX session_id may have changed too?
-        else:
-            obj._qmf_agent_id = self.agent.id
-            obj._qmf_object_id = self.object_id
-            obj._qmf_session_id = self.session_id
-            obj._qmf_class_key = str(self.object.getClassKey())
-            obj._qmf_update_time = self.update_time
-            obj._qmf_create_time = self.create_time
-
-            columns.append(cls.sql_table._id)
-            columns.append(cls.sql_table._qmf_agent_id)
-            columns.append(cls.sql_table._qmf_object_id)
-            columns.append(cls.sql_table._qmf_session_id)
-            columns.append(cls.sql_table._qmf_class_key)
-            columns.append(cls.sql_table._qmf_update_time)
-            columns.append(cls.sql_table._qmf_create_time)
-
-        self.process_references(obj, columns)
+        self.process_qmf_attributes(obj, columns)
         self.process_properties(obj, columns)
 
         cursor = conn.cursor()
@@ -166,7 +128,7 @@
             raise PackageUnknown(name)
 
         name = class_key.getClassName()
-        name = name[0].upper() + name[1:]
+        name = name[0].upper() + name[1:] # /me shakes fist
 
         try:
             cls = pkg._classes_by_name[name]
@@ -198,63 +160,100 @@
 
             return obj
 
-    def process_references(self, obj, columns):
-        for prop, value in self.object.getProperties():
-            if prop.type != 10:
-                continue
+    def process_qmf_attributes(self, obj, columns):
+        table = obj._class.sql_table
 
-            try:
-                ref = obj._class._references_by_name[prop.name]
-            except KeyError:
-                log.debug("Reference %s is unknown", prop.name)
+        update_time, create_time, delete_time = self.object.getTimestamps()
 
-                continue
+        update_time = datetime.fromtimestamp(update_time / 1000000000)
+        create_time = datetime.fromtimestamp(create_time / 1000000000)
 
-            if not ref.sql_column:
-                log.warn("Reference %s has no column; skipping it", ref.name)
+        if delete_time:
+            delete_time = datetime.fromtimestamp(delete_time / 1000000000)
 
-                continue
+        if obj._sync_time:
+            # This object is already in the database
 
-            col = ref.sql_column
+            obj._qmf_update_time = update_time
+            columns.append(table._qmf_update_time)
 
-            if value:
-                that_id = str(QmfObjectId(value.first, value.second))
-                that = self.get_object(ref.that_cls, that_id)
+            # XXX session_id may have changed too?
+        else:
+            obj._qmf_agent_id = self.agent.id
+            obj._qmf_object_id = self.object.getName()
+            obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+            obj._qmf_class_key = str(self.object.getClassKey())
+            obj._qmf_update_time = update_time
+            obj._qmf_create_time = create_time
 
-                if not that._sync_time:
-                    continue
+            columns.append(table._id)
+            columns.append(table._qmf_agent_id)
+            columns.append(table._qmf_object_id)
+            columns.append(table._qmf_session_id)
+            columns.append(table._qmf_class_key)
+            columns.append(table._qmf_update_time)
+            columns.append(table._qmf_create_time)
 
-                value = that._id
+    def process_properties(self, obj, columns):
+        cls = obj._class
 
-            columns.append(col)
-
-            setattr(obj, col.name, value)
-
-    def process_properties(self, obj, columns):
         for prop, value in self.object.getProperties():
-            if prop.type == 10:
-                continue
-
             try:
-                col = obj._class._properties_by_name[prop.name].sql_column
-            except KeyError:
-                log.debug("Property %s is unknown", prop)
-
+                if prop.type == 10:
+                    col, nvalue = self.process_reference(cls, prop, value)
+                else:
+                    col, nvalue = self.process_value(cls, prop, value)
+            except MappingException, e:
+                log.debug(e)
                 continue
 
-            if value is not None:
-                value = self.transform_value(prop, value)
-
             # XXX This optimization will be obsolete when QMF does it
             # instead
 
-            if value == getattr(obj, col.name):
+            if nvalue == getattr(obj, col.name):
                 continue
 
+            setattr(obj, col.name, nvalue)
             columns.append(col)
 
-            setattr(obj, col.name, value)
+    def process_reference(self, cls, prop, value):
+        try:
+            ref = cls._references_by_name[prop.name]
+        except KeyError:
+            raise MappingException("Reference %s is unknown" % prop.name)
 
+        if not ref.sql_column:
+            raise MappingException("Reference %s has no column" % ref.name)
+
+        col = ref.sql_column
+
+        if value:
+            try:
+                that_id = str(value.objectName)
+            except:
+                raise MappingException("XXX ref isn't an oid")
+
+            that = self.get_object(ref.that_cls, that_id)
+
+            if not that._sync_time:
+                msg = "Referenced object %s hasn't appeared yet"
+                raise MappingException(msg % that)
+
+            value = that._id
+
+        return col, value
+
+    def process_value(self, cls, prop, value):
+        try:
+            col = cls._properties_by_name[prop.name].sql_column
+        except KeyError:
+            raise MappingException("Property %s is unknown" % prop)
+
+        if value is not None:
+            value = self.transform_value(prop, value)
+
+        return col, value
+
     def transform_value(self, attr, value):
         if attr.type == 8: # absTime
             if value == 0:
@@ -274,13 +273,14 @@
     def __repr__(self):
         name = self.__class__.__name__
         cls = self.object.getClassKey().getClassName()
+        id = self.object.getName()
 
-        return "%s(%s,%s,%s)" % (name, self.agent.id, cls, self.object_id)
+        return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
 
-class NewObjectDelete(NewObjectUpdate):
+class ObjectDelete(ObjectUpdate):
     def do_process(self, conn, stats):
         cls = self.get_class()
-        obj = self.get_object(cls, self.object_id)
+        obj = self.get_object(cls, self.object.getName())
 
         cursor = conn.cursor()
 
@@ -290,16 +290,16 @@
             cursor.close()
 
         try:
-            del self.agent.objects_by_id[self.object_id]
+            del self.agent.objects_by_id[self.object.getName()]
         except KeyError:
             pass
 
         stats.deleted += 1
 
-class NewSampleUpdate(NewObjectUpdate):
+class ObjectAddSample(ObjectUpdate):
     def do_process(self, conn, stats):
         cls = self.get_class()
-        obj = self.get_object(cls, self.object_id)
+        obj = self.get_object(cls, self.object.getName())
 
         if not cls._statistics:
             stats.samples_dropped += 1; return
@@ -311,13 +311,17 @@
             if obj._qmf_update_time > datetime.now() - timedelta(seconds=60):
                 stats.samples_dropped += 1; return
 
+        update_time, create_time, delete_time = self.object.getTimestamps()
+
+        update_time = datetime.fromtimestamp(update_time / 1000000000)
+
         update_columns = list()
         update_columns.append(cls.sql_table._qmf_update_time)
 
         insert_columns = list()
         insert_columns.append(cls.sql_samples_table._qmf_update_time)
 
-        obj._qmf_update_time = self.update_time
+        obj._qmf_update_time = update_time
 
         self.process_samples(obj, update_columns, insert_columns)
 
@@ -345,6 +349,8 @@
             if value is not None:
                 value = self.transform_value(stat, value)
 
+            # Don't write unchanged values
+            #
             # XXX This optimization will be obsolete when QMF does it
             # instead
 
@@ -355,6 +361,28 @@
 
             setattr(obj, col.name, value)
 
+class AgentDelete(Update):
+    def __init__(self, model, agent):
+        super(AgentDelete, self).__init__(model)
+
+        self.agent = agent
+
+    def do_process(self, conn, stats):
+        print "Ahoy!"
+        
+        cursor = conn.cursor()
+
+        id = self.agent.id
+
+        try:
+            for pkg in self.model.rosemary._packages:
+                for cls in pkg._classes:
+                    for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+                        obj.delete()
+                        print "Bam!", obj
+        finally:
+            cursor.close()
+
 class UpdateException(Exception):
     def __init__(self, name):
         self.name = name
@@ -370,3 +398,6 @@
 
 class ObjectUnknown(UpdateException):
     pass
+
+class MappingException(Exception):
+    pass

Modified: mgmt/newdata/mint/python/mint/tools.py
===================================================================
--- mgmt/newdata/mint/python/mint/tools.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/tools.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -72,9 +72,12 @@
 
         if self.config.debug:
             self.config.prt()
-            enable_logging("rosemary", "debug", sys.stderr)
-            enable_logging("mint", "debug", sys.stderr)
 
+            level = getattr(self.config, "log_level", "debug")
+
+            enable_logging("rosemary", level, sys.stderr)
+            enable_logging("mint", level, sys.stderr)
+
         self.do_run(opts, args)
 
     def do_run(self, opts, args):
@@ -87,7 +90,7 @@
 
 class DatabaseSubcommand(Command):
     def run(self, opts, args):
-        conn = self.parent.database.get_connection()
+        conn = self.parent.app.database.get_connection()
         cursor = conn.cursor()
 
         try:
@@ -153,6 +156,8 @@
         command.description = "Change password of USER"
         command.arguments = ("USER",)
 
+        self.app = None
+
     def run(self):
         try:
             opts, remaining = self.parse_options(sys.argv[1:])
@@ -171,16 +176,13 @@
             self.config.prt()
             enable_logging("mint", "debug", sys.stderr)
 
-        app = Mint(self.config)
-        app.update_enabled = False
-        app.expire_enabled = False
+        self.app = Mint(self.config)
+        self.app.update_enabled = False
+        self.app.expire_enabled = False
 
-        #self.cumin = app.model.rosemary.com_redhat_cumin
+        self.app.check()
+        self.app.init()
 
-        self.database = MintDatabase(app)
-        self.database.check()
-        self.database.init()
-
         try:
             scommand = remaining[0]
         except IndexError:
@@ -214,13 +216,13 @@
 
     class LoadSchema(Command):
         def run(self, opts, args):
-            self.parent.database.load_schema()
+            self.parent.app.database.load_schema()
             print "The schema is loaded"
 
     class DropSchema(Command):
         def run(self, opts, args):
             if "force" in opts:
-                self.parent.database.drop_schema()
+                self.parent.app.database.drop_schema()
                 print "The schema is dropped"
             else:
                 raise CommandException \
@@ -229,8 +231,8 @@
     class ReloadSchema(Command):
         def run(self, opts, args):
             if "force" in opts:
-                self.parent.database.drop_schema()
-                self.parent.database.load_schema()
+                self.parent.app.database.drop_schema()
+                self.parent.app.database.load_schema()
                 print "The schema is reloaded"
             else:
                 raise CommandException \
@@ -238,7 +240,7 @@
 
     class CheckSchema(Command):
         def run(self, opts, args):
-            self.parent.database.check_schema()
+            self.parent.app.database.check_schema()
 
     class AddUser(DatabaseSubcommand):
         def do_run(self, cursor, opts, args):
@@ -247,13 +249,6 @@
             except IndexError:
                 raise CommandException(self, "NAME is required")
 
-            #objs = self.parent.cumin.User.get_selection(cursor, name=name)
-            #if objs:
-
-            if Subject.selectBy(name=name).count():
-                print "Error: a user called '%s' already exists" % name
-                sys.exit(1)
-
             try:
                 password = args[2]
             except IndexError:
@@ -261,62 +256,61 @@
 
             crypted = crypt_password(password)
 
-            try:
-                subject = Subject(name=name, password=crypted)
+            pkg = self.parent.app.model.rosemary.com_redhat_cumin
 
-                for role in Role.selectBy(name="user"):
-                    subject.addRole(role)
+            for role in pkg.Role.get_selection(cursor, name="user"):
+                break
 
-                assert role
+            assert role, self
 
-                subject.syncUpdate()
+            user = pkg.User.create_object(cursor)
+            user.name = name
+            user.password = crypted
 
-                # user = self.parent.cumin.User.create_object(cursor)
-                # user.name = name
-                # user.password = crypted
-                # user.save(cursor)
-
-                # roles = self.parent.cumin.Role.get_selection \
-                #     (cursor, name="user")
-
-                # for role in roles:
-                #     mapping = self.parent.cumin.UserRoleMapping.create_object \
-                #         (cursor)
-                #     mapping.user = user
-                #     mapping.role = role
-                #     mapping.save(cursor)
-
-                #     break
-
-                assert role, self
+            try:
+                user.save(cursor)
             except IntegrityError:
                 print "Error: a user called '%s' already exists" % name
                 sys.exit(1)
 
+            mapping = pkg.UserRoleMapping.create_object(cursor)
+            mapping._role_id = role._id
+            mapping._user_id = user._id
+            mapping.save(cursor)
+            
+            conn.commit()
+
+            assert role, self
+
             print "User '%s' is added" % name
 
-    class RemoveUser(Command):
-        def run(self, opts, args):
-            if "force" in opts:
-                if len(args) != 2:
-                    print "Error: no user name given"
-                    sys.exit(1)
+    class RemoveUser(DatabaseSubcommand):
+        def do_run(self, cursor, opts, args):
+            if "force" not in opts:
+                msg = "Command remove-user requires --force"
+                raise CommandException(self, msg)
 
+            try:
                 name = args[1]
-                subjects = Subject.selectBy(name=name)
+            except IndexError:
+                raise CommandException(self, "NAME is required")
 
-                if subjects.count():
-                    for subject in subjects:
-                        subject.destroySelf()
-                        break
-                else:
-                    raise CommandException(self, "User '%s' is unknown" % name)
+            name = args[1]
 
-                print "User '%s' is removed" % name
-            else:
-                raise CommandException \
-                    (self, "Command remove-user requires --force yes")
+            cls = self.app.model.rosemary.com_redhat_cumin.User
 
+            for user in cls.get_selection(cursor, name=name):
+                break
+
+            if not user:
+                raise CommandException(self, "User '%s' is unknown" % name)
+
+            user.delete(cursor)
+
+            conn.commit()
+
+            print "User '%s' is removed" % name
+
     class ListUsers(Command):
         def run(self, opts, args):
             subjects = Subject.select(orderBy='name')
@@ -457,8 +451,34 @@
             for arg in args[1:]:
                 app.model.add_broker(arg)
 
+            sleep(2)
+
+            cls = app.model.rosemary.org_apache_qpid_broker.Broker
+
+            conn = app.database.get_connection()
+            cursor = conn.cursor()
+
+            for obj in cls.get_selection(cursor):
+                try:
+                    agent = app.model.agents[obj._qmf_agent_id]
+                except KeyError:
+                    continue
+
+                break
+
+            print "TTT", obj.port, obj, agent
+
+            def completion(status_code, output_args):
+                print "YYY", status_code, output_args
+
+            agent.call_method(completion, obj, "echo", 1, "ggoo!")
+
             while True:
                 sleep(2)
+        except Exception, e:
+            print_exc()
+
+            print e
         finally:
             app.stop()
 
@@ -492,6 +512,17 @@
                     print "Warning: Failed connecting to broker at '%s'" % arg
 
             try:
+                enq = 0
+                deq = 0
+
+                upd = 0
+                dlt = 0
+                drp = 0
+
+                supd = 0
+                sexp = 0
+                sdrp = 0
+
                 enq_last = 0
                 deq_last = 0
 
@@ -513,7 +544,7 @@
 
                     sleep(1)
 
-                    stats = app.new_update_thread.stats
+                    stats = app.update_thread.stats
 
                     enq = stats.enqueued
                     deq = stats.dequeued
@@ -522,9 +553,9 @@
                     dlt = stats.deleted
                     drp = stats.dropped
 
-                    supd = stats.stats_updated
-                    sexp = stats.stats_expired
-                    sdrp = stats.stats_dropped
+                    supd = stats.samples_updated
+                    sexp = stats.samples_expired
+                    sdrp = stats.samples_dropped
 
                     print row % (enq - enq_last,
                                  deq - deq_last,
@@ -561,5 +592,6 @@
         finally:
             #from threading import enumerate
             #for item in enumerate():
+            #    print item
 
             app.stop()

Deleted: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/update.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,367 +0,0 @@
-import pickle
-import mint
-
-from collections import deque
-from qpid.datatypes import UUID
-
-from schema import *
-from sql import *
-from util import *
-
-log = logging.getLogger("mint.update")
-
-class UpdateThread(MintDaemonThread):
-    """
-    Only the update thread writes to the database
-    """
-
-    def __init__(self, app):
-        super(UpdateThread, self).__init__(app)
-
-        self.conn = None
-        self.stats = None
-
-        self.updates = UpdateQueue(slotCount=2)
-
-    def init(self):
-        self.conn = self.app.database.get_connection()
-        self.stats = UpdateStats()
-
-    def enqueue(self, update):
-        update.thread = self
-
-        self.updates.put(update)
-
-        self.stats.enqueued += 1
-
-        # This is an attempt to yield from the enqueueing thread (this
-        # method's caller) to the update thread
-
-        if self.updates.qsize() > 1000:
-            sleep(0.1)
-
-    def run(self):
-        while True:
-            if self.stop_requested:
-                break
-
-            try:
-                update = self.updates.get(True, 1)
-            except Empty:
-                continue
-
-            self.stats.dequeued += 1
-
-            update.process()
-
-class UpdateStats(object):
-    def __init__(self):
-        self.enqueued = 0
-        self.dequeued = 0
-        self.stat_updated = 0
-        self.prop_updated = 0
-        self.expired = 0
-        self.dropped = 0
-        self.deferred = 0
-
-class ReferenceException(Exception):
-    def __init__(self, sought):
-        self.sought = sought
-
-    def __str__(self):
-        return repr(self.sought)
-
-class Update(object):
-    def __init__(self):
-        self.thread = None
-        self.priority = 0
-
-    def process(self):
-        log.debug("Processing %s", self)
-
-        assert self.thread
-
-        conn = self.thread.conn
-        stats = self.thread.stats
-
-        try:
-            self.do_process(conn, stats)
-
-            for notice in conn.notices:
-                log.debug("Database: %s", notice)
-
-            conn.commit()
-        except:
-            log.exception("Update failed")
-
-            conn.rollback()
-
-    def do_process(self, conn, stats):
-        raise Exception("Not implemented")
-
-    def __repr__(self):
-        return "%s(%i)" % (self.__class__.__name__, self.priority)
-
-class ObjectUpdate(Update):
-    def __init__(self, agent, object):
-        super(ObjectUpdate, self).__init__()
-
-        self.agent = agent
-        self.object = object
-
-        self.object_id = str(QmfObjectId.fromObject(object))
-
-    def getClass(self):
-        # XXX this is unfortunate
-        name = self.object.getClassKey().getClassName()
-        name = mint.schema.schemaReservedWordsMap.get(name, name)
-        name = name[0].upper() + name[1:]
-
-        try:
-            return schemaNameToClassMap[name]
-        except KeyError:
-            raise ReferenceException(name)
-
-    def process_attributes(self, attrs, cls):
-        results = dict()
-
-        for key, value in attrs:
-            name = key.__repr__()
-            name = mint.schema.schemaReservedWordsMap.get(name, name)
-
-            if key.type == 10:
-                # XXX don't want oid around much
-                self.process_reference(name, value, results)
-                continue
-
-            if not hasattr(cls, name):
-                # Discard attrs that we don't have in our schema
-                log.debug("%s has no field '%s'" % (cls, name))
-                continue
-
-            if key.type == 8:
-                self.process_timestamp(name, value, results)
-                continue
-
-            if key.type == 14:
-                # Convert UUIDs into their string representation, to be
-                # handled by sqlobject
-                results[name] = str(value)
-                continue
-
-            if key.type == 15:
-                #if value:
-                results[name] = pickle.dumps(value)
-                continue
-
-            results[name] = value
-
-        return results
-
-    # XXX this needs to be a much more straightforward procedure
-    def process_reference(self, name, oid, results):
-        if name.endswith("Ref"):
-            name = name[:-3]
-
-        className = name[0].upper() + name[1:]
-
-        try:
-            otherClass = getattr(mint, className)
-        except AttributeError:
-            return
-
-        foreignKey = name + "_id"
-
-        object_id = str(QmfObjectId(oid.first, oid.second))
-        id = self.agent.database_ids.get(object_id)
-
-        if id is None:
-            # XXX don't want oid around much
-            raise ReferenceException(oid)
-
-        results[foreignKey] = id
-
-    def process_timestamp(self, name, tstamp, results):
-        if tstamp:
-            t = datetime.fromtimestamp(tstamp / 1000000000)
-            results[name] = t
-
-    def __repr__(self):
-        cls = self.object.getClassKey().getClassName()
-
-        return "%s(%s,%s,%s,%i)" % (self.__class__.__name__,
-                                    self.agent.id,
-                                    cls,
-                                    self.object_id,
-                                    self.priority)
-
-class PropertyUpdate(ObjectUpdate):
-    def do_process(self, conn, stats):
-        try:
-            cls = self.getClass()
-        except ReferenceException, e:
-            log.info("Referenced class %r not found", e.sought)
-
-            stats.dropped += 1
-
-            return
-
-        try:
-            attrs = self.process_attributes(self.object.getProperties(), cls)
-        except ReferenceException, e:
-            log.info("Referenced object %r not found", e.sought)
-
-            self.agent.deferred_updates[self.object_id].append(self)
-
-            stats.deferred += 1
-
-            return
-
-        update, create, delete = self.object.getTimestamps()
-
-        self.process_timestamp("qmfUpdateTime", update, attrs)
-        self.process_timestamp("qmfCreateTime", create, attrs)
-
-        if delete != 0:
-            self.process_timestamp("qmfDeleteTime", delete, attrs)
-
-            log.debug("%s(%s,%s) marked deleted",
-                      cls.__name__, self.agent.id, self.object_id)
-
-        attrs["qmfAgentId"] = self.agent.id
-        attrs["qmfClassKey"] = str(self.object.getClassKey())
-        attrs["qmfObjectId"] = str(self.object_id)
-        attrs["qmfPersistent"] = self.object.getObjectId().isDurable()
-
-        cursor = conn.cursor()
-
-        # Cases:
-        #
-        # 1. Object is utterly new to mint
-        # 2. Object is in mint's db, but id is not yet cached
-        # 3. Object is in mint's db, and id is cached
-
-        id = self.agent.database_ids.get(self.object_id)
-
-        if id is None:
-            # Case 1 or 2
-
-            op = SqlGetId(cls)
-            op.execute(cursor, attrs)
-
-            rec = cursor.fetchone()
-
-            if rec:
-                id = rec[0]
-
-            if id is None:
-                # Case 1
-
-                op = SqlInsert(cls, attrs)
-                op.execute(cursor, attrs)
-
-                id = cursor.fetchone()[0]
-
-                log.debug("%s(%i) created", cls.__name__, id)
-            else:
-                # Case 2
-
-                attrs["id"] = id
-
-                op = SqlUpdate(cls, attrs)
-                op.execute(cursor, attrs)
-
-                assert cursor.rowcount == 1
-
-            self.agent.database_ids.set(self.object_id, id)
-        else:
-            # Case 3
-
-            attrs["id"] = id
-
-            op = SqlUpdate(cls, attrs)
-            op.execute(cursor, attrs)
-
-            #assert cursor.rowcount == 1
-
-        try:
-            updates = self.agent.deferred_updates.pop(self.object_id)
-
-            if updates:
-                log.info("Reenqueueing %i deferred updates", len(updates))
-
-                for update in updates:
-                    self.thread.enqueue(update)
-        except KeyError:
-            pass
-
-        self.agent.database_ids.commit()
-
-        stats.prop_updated += 1
-
-class StatisticUpdate(ObjectUpdate):
-    def do_process(self, conn, stats):
-        try:
-            cls = self.getClass()
-        except ReferenceException, e:
-            log.info("Referenced class %r not found", e.sought)
-            return
-
-        statsCls = getattr(mint, "%sStats" % cls.__name__)
-
-        id = self.agent.database_ids.get(self.object_id)
-
-        if id is None:
-            stats.dropped += 1
-            return
-
-        timestamps = self.object.getTimestamps()
-
-        tnow = datetime.now()
-        t = datetime.fromtimestamp(timestamps[0] / 1000000000)
-
-        if t < tnow - timedelta(seconds=30):
-            seconds = (tnow - t).seconds
-            log.debug("Update is %i seconds old; skipping it", seconds)
-
-            stats.dropped += 1
-
-            return
-
-        try:
-            attrs = self.process_attributes \
-                (self.object.getStatistics(), statsCls)
-        except ReferenceException:
-            stats.dropped += 1
-
-            return
-
-        # XXX do we still want this
-        attrs["qmfUpdateTime"] = t > tnow and tnow or t
-        attrs["%s_id" % cls.sqlmeta.table] = id
-
-        cursor = conn.cursor()
-
-        op = SqlInsert(statsCls, attrs)
-        op.execute(cursor, attrs)
-
-        log.debug("%s(%s) created", statsCls.__name__, id)
-
-        stats.stat_updated += 1
-
-class AgentDisconnectUpdate(Update):
-    def __init__(self, agent):
-        super(AgentDisconnectUpdate, self).__init__()
-
-        self.agent = agent
-
-    def do_process(self, conn, stats):
-        cursor = conn.cursor()
-
-        args = dict()
-
-        if self.agent:
-            args["qmf_agent_id"] = self.agent.id
-
-        op = SqlAgentDisconnect(self.agent)
-        op.execute(cursor, args)

Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/util.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -69,12 +69,16 @@
         return cls(brokerId, brokerBank, agentBank)
 
     def fromAgent(cls, agent):
+        return agent.getAgentBank()
+
         broker = agent.getBroker()
 
         brokerId = broker.getBrokerId()
         brokerBank = broker.getBrokerBank()
         agentBank = agent.getAgentBank()
 
+        print "XXX", brokerId, brokerBank, agentBank
+
         return cls(brokerId, brokerBank, agentBank)
 
     def fromString(cls, string):
@@ -90,16 +94,20 @@
     fromString = classmethod(fromString)
 
     def __str__(self):
-        return "%s.%i.%i" % (self.brokerId, self.brokerBank, self.agentBank)
+        return self.agentBank
 
 class QmfObjectId(object):
-    def __init__(self, first, second):
-        self.first = first
-        self.second = second
+    def __init__(self, id):
+        self.id = id
 
     def fromObject(cls, object):
         oid = object.getObjectId()
 
+        print "XXX", oid
+
+        for k, v in oid.__dict__.items():
+            print "  ", k, v
+
         return cls(oid.first, oid.second)
 
     def fromString(cls, string):

Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/mint/python/mint/vacuum.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -1,5 +1,4 @@
 from newupdate import *
-from update import *
 from util import *
 
 log = logging.getLogger("mint.vacuum")
@@ -10,30 +9,13 @@
             if self.stop_requested:
                 break
 
-            up = VacuumUpdate()
+            up = VacuumUpdate(self.app.model)
             self.app.update_thread.enqueue(up)
 
-            up = NewVacuumUpdate(self.app.model)
-            self.app.new_update_thread.enqueue(up)
-
             sleep(60 * 60 * 10)
 
 class VacuumUpdate(Update):
     def do_process(self, conn, stats):
-        log.info("Vacuuming")
-
-        conn.commit()
-
-        level = conn.isolation_level
-        conn.set_isolation_level(0)
-
-        cursor = conn.cursor()
-        cursor.execute("vacuum")
-
-        conn.set_isolation_level(level)
-
-class NewVacuumUpdate(NewUpdate):
-    def do_process(self, conn, stats):
         log.info("Vacumming tables")
 
         level = conn.isolation_level
@@ -57,4 +39,3 @@
                 log.debug("Database: %s", notice.replace("\n", " "))
         finally:
             cursor.close()
-

Modified: mgmt/newdata/rosemary/python/rosemary/model.py
===================================================================
--- mgmt/newdata/rosemary/python/rosemary/model.py	2010-04-22 15:03:17 UTC (rev 3924)
+++ mgmt/newdata/rosemary/python/rosemary/model.py	2010-04-23 18:36:37 UTC (rev 3925)
@@ -410,6 +410,9 @@
 
         self.sql_delete.execute(cursor, (), obj.__dict__)
 
+    def delete_selection(self, cursor, **kwargs):
+        pass # XXX
+
     def __repr__(self):
         args = (self.__class__.__name__, self._package._name, self._name)
         return "%s(%s,%s)" % args


