Author: justi9
Date: 2010-07-27 16:31:59 -0400 (Tue, 27 Jul 2010)
New Revision: 4154
Modified:
mgmt/newdata/mint/python/mint/model.py
mgmt/newdata/mint/python/mint/session.py
mgmt/newdata/mint/python/mint/update.py
Log:
* Add delete policies based on new agent state
* Change all agent callbacks into queued agent updates; this fixes
some data errors that arose from ordering problems
Modified: mgmt/newdata/mint/python/mint/model.py
===================================================================
--- mgmt/newdata/mint/python/mint/model.py 2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/model.py 2010-07-27 20:31:59 UTC (rev 4154)
@@ -1,6 +1,5 @@
from rosemary.model import *
-from update import *
from util import *
log = logging.getLogger("mint.model")
Modified: mgmt/newdata/mint/python/mint/session.py
===================================================================
--- mgmt/newdata/mint/python/mint/session.py 2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/session.py 2010-07-27 20:31:59 UTC (rev 4154)
@@ -1,4 +1,4 @@
-from model import *
+from update import *
from util import *
from qmf.console import Console, Session
@@ -73,36 +73,27 @@
def newAgent(self, qmf_agent):
self.model.print_event(3, "Creating %s", qmf_agent)
- MintAgent(self.model, qmf_agent)
+ up = AgentUpdate(self.model, qmf_agent)
+ self.model.app.update_thread.enqueue(up)
def delAgent(self, qmf_agent):
self.model.print_event(3, "Deleting %s", qmf_agent)
- try:
- agent = self.model.get_agent(qmf_agent)
- except KeyError:
- return
-
- agent.delete()
-
if not self.model.app.update_thread.isAlive():
return
- up = AgentDelete(self.model, agent)
+ up = AgentDelete(self.model, qmf_agent)
self.model.app.update_thread.enqueue(up)
def heartbeat(self, qmf_agent, timestamp):
message = "Heartbeat from %s at %s"
self.model.print_event(5, message, qmf_agent, timestamp)
- timestamp = timestamp / 1000000000
-
- try:
- agent = self.model.get_agent(qmf_agent)
- except KeyError:
+ if not self.model.app.update_thread.isAlive():
return
- agent.last_heartbeat = datetime.fromtimestamp(timestamp)
+ up = AgentUpdate(self.model, qmf_agent)
+ self.model.app.update_thread.enqueue(up)
def newPackage(self, name):
self.model.print_event(2, "New package %s", name)
@@ -110,22 +101,18 @@
def newClass(self, kind, classKey):
self.model.print_event(2, "New class %s", classKey)
- def objectProps(self, broker, obj):
- agent = self.model.get_agent(obj.getAgent())
-
+ def objectProps(self, broker, qmf_object):
if not self.model.app.update_thread.isAlive():
return
- up = ObjectUpdate(self.model, agent, obj)
+ up = ObjectUpdate(self.model, qmf_object)
self.model.app.update_thread.enqueue(up)
- def objectStats(self, broker, obj):
- agent = self.model.get_agent(obj.getAgent())
-
+ def objectStats(self, broker, qmf_object):
if not self.model.app.update_thread.isAlive():
return
- up = ObjectUpdate(self.model, agent, obj)
+ up = ObjectUpdate(self.model, qmf_object)
self.model.app.update_thread.enqueue(up)
def event(self, broker, event):
Modified: mgmt/newdata/mint/python/mint/update.py
===================================================================
--- mgmt/newdata/mint/python/mint/update.py 2010-07-27 19:11:04 UTC (rev 4153)
+++ mgmt/newdata/mint/python/mint/update.py 2010-07-27 20:31:59 UTC (rev 4154)
@@ -5,6 +5,8 @@
from psycopg2 import IntegrityError, TimestampFromTicks
from psycopg2.extensions import cursor as Cursor
from rosemary.model import *
+
+from model import *
from util import *
log = logging.getLogger("mint.update")
@@ -156,11 +158,11 @@
thread.stats.errors += 1
+ #print_exc()
+
if thread.halt_on_error:
raise
- #print_exc()
-
def do_process(self, cursor, stats):
raise Exception("Not implemented")
@@ -168,13 +170,18 @@
return self.__class__.__name__
class ObjectUpdate(Update):
- def __init__(self, model, agent, object):
+ def __init__(self, model, qmf_object):
super(ObjectUpdate, self).__init__(model)
- self.agent = agent
- self.object = object
+ self.qmf_object = qmf_object
+ self.agent = None
def do_process(self, cursor, stats):
+ try:
+ self.agent = self.model.get_agent(self.qmf_object.getAgent())
+ except KeyError:
+ raise UpdateDropped()
+
cls = self.get_class()
obj_id = self.get_object_id()
obj = self.agent.get_object_by_id(obj_id)
@@ -185,14 +192,17 @@
except RosemaryNotFound:
pass
- update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time, create_time, delete_time = self.qmf_object.getTimestamps()
if obj:
if delete_time != 0:
self.delete_object(cursor, stats, obj)
return
- if not self.object.getProperties() and self.object.getStatistics():
+ properties = self.qmf_object.getProperties()
+ statistics = self.qmf_object.getStatistics()
+
+ if not properties and statistics:
# Just stats; do we want it?
# if stats.enqueued - stats.dequeued > 500:
@@ -210,7 +220,7 @@
self.update_object(cursor, stats, obj)
else:
- if not self.object.getProperties():
+ if not self.qmf_object.getProperties():
raise UpdateDropped()
if delete_time != 0:
@@ -223,7 +233,7 @@
self.agent.add_object(obj)
def get_class(self):
- class_key = self.object.getClassKey()
+ class_key = self.qmf_object.getClassKey()
name = class_key.getPackageName()
try:
@@ -241,17 +251,17 @@
return cls
def get_object_id(self):
- return self.object.getObjectId().objectName
+ return self.qmf_object.getObjectId().objectName
def create_object(self, cursor, stats, cls):
- update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time, create_time, delete_time = self.qmf_object.getTimestamps()
create_time = datetime.fromtimestamp(create_time / 1000000000)
update_time = datetime.fromtimestamp(update_time / 1000000000)
obj = cls.create_object(cursor)
obj._qmf_agent_id = self.agent.id
obj._qmf_object_id = self.get_object_id()
- obj._qmf_session_id = str(self.object.getObjectId().getSequence())
+ obj._qmf_session_id = str(self.qmf_object.getObjectId().getSequence())
obj._qmf_create_time = create_time
obj._qmf_update_time = update_time
@@ -294,7 +304,7 @@
return obj
def update_object(self, cursor, stats, obj):
- update_time, create_time, delete_time = self.object.getTimestamps()
+ update_time, create_time, delete_time = self.qmf_object.getTimestamps()
update_time = datetime.fromtimestamp(update_time / 1000000000)
obj._qmf_update_time = update_time
@@ -345,7 +355,7 @@
def process_properties(self, obj, columns, cursor):
cls = obj._class
- for prop, value in self.object.getProperties():
+ for prop, value in self.qmf_object.getProperties():
try:
if prop.type == 10:
col, nvalue = self.process_reference \
@@ -409,7 +419,7 @@
return col, value
def process_statistics(self, obj, update_columns, insert_columns):
- for stat, value in self.object.getStatistics():
+ for stat, value in self.qmf_object.getStatistics():
try:
col = obj._class._statistics_by_name[stat.name].sql_column
except KeyError:
@@ -452,43 +462,71 @@
log.error("Qmf properties:")
- for item in sorted(self.object.getProperties()):
+ for item in sorted(self.qmf_object.getProperties()):
log.error(" %-34s %r", *item)
log.error("Qmf statistics:")
- for item in sorted(self.object.getStatistics()):
+ for item in sorted(self.qmf_object.getStatistics()):
log.error(" %-34s %r", *item)
raise
def __repr__(self):
name = self.__class__.__name__
- cls = self.object.getClassKey().getClassName()
- id = self.object.getObjectId().objectName
+ cls = self.qmf_object.getClassKey().getClassName()
+ id = self.qmf_object.getObjectId().objectName
return "%s(%s,%s,%s)" % (name, self.agent.id, cls, id)
-class AgentDelete(Update):
- def __init__(self, model, agent):
- super(AgentDelete, self).__init__(model)
+class AgentUpdate(Update):
+ def __init__(self, model, qmf_agent):
+ super(AgentUpdate, self).__init__(model)
- self.agent = agent
+ self.qmf_agent = qmf_agent
def do_process(self, cursor, stats):
- id = self.agent.id
+ try:
+ agent = self.model.get_agent(self.qmf_agent)
+ except KeyError:
+ agent_id = self.qmf_agent.getAgentBank()
+ self.delete_agent_objects(cursor, stats, agent_id)
+
+ agent = MintAgent(self.model, self.qmf_agent)
+
+ #timestamp = timestamp / 1000000000
+ #agent.last_heartbeat = datetime.fromtimestamp(timestamp)
+
+ agent.last_heartbeat = datetime.now()
+
+ stats.updated += 1
+
+ # XXX Add periodic update of update_time
+
+ def delete_agent_objects(self, cursor, stats, agent_id):
for pkg in self.model._packages:
for cls in pkg._classes:
if cls._storage == "none":
continue
- for obj in cls.get_selection(cursor, _qmf_agent_id=id):
+ for obj in cls.get_selection(cursor, _qmf_agent_id=agent_id):
obj.delete(cursor)
self.model.print_event(3, "Deleted %s", obj)
stats.deleted += 1
+class AgentDelete(AgentUpdate):
+ def do_process(self, cursor, stats):
+ try:
+ agent = self.model.get_agent(qmf_agent)
+ except KeyError:
+ raise UpdateDropped()
+
+ agent.delete()
+
+ self.delete_agent_objects(cursor, stats, agent)
+
class UpdateDropped(Exception):
pass