Author: justi9
Date: 2010-10-01 15:51:28 -0400 (Fri, 01 Oct 2010)
New Revision: 4367
Modified:
mgmt/newdata/cumin/bin/cumin-data
mgmt/newdata/cumin/etc/cumin.conf
mgmt/newdata/cumin/python/cumin/config.py
mgmt/newdata/mint/python/mint/expire.py
mgmt/newdata/mint/python/mint/main.py
mgmt/newdata/mint/python/mint/util.py
mgmt/newdata/mint/python/mint/vacuum.py
Log:
Run the periodic expire operation directly in the expire thread, on
its own database connection, instead of enqueueing it on the update
queue, so it won't block updates.
Also:
* Use a common periodic processing thread for expire and vacuum
* Add a vacuum interval config param
* Hook up the expire and vacuum config file params, which were
previously not applied to the expire and vacuum threads
* Change the default expire interval from 10 minutes to 1 hour
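* Offset expire and vacuum timing so they don't typically run at the
same time: the first expire is delayed by a quarter of the expire
interval and the first vacuum by 30 minutes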
Modified: mgmt/newdata/cumin/bin/cumin-data
===================================================================
--- mgmt/newdata/cumin/bin/cumin-data 2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/cumin/bin/cumin-data 2010-10-01 19:51:28 UTC (rev 4367)
@@ -12,9 +12,9 @@
def main():
config = CuminConfig()
- values = config.parse()
+ values = config.parse().data
- parser = CuminOptionParser(values.data)
+ parser = CuminOptionParser(values)
parser.add_option("--print-stats", action="store_true")
parser.add_option("--print-events", type="int", default=0,
metavar="LEVEL")
@@ -27,15 +27,21 @@
broker_uris = [x.strip() for x in opts.brokers.split(",")]
mint = Mint(model_dir, broker_uris, opts.database)
+
mint.print_event_level = opts.print_events
+ mint.expire_thread.interval = values.expire_interval
+ mint.expire_thread.threshold = values.expire_threshold
+
+ mint.vacuum_thread.interval = values.vacuum_interval
+
mint.check()
mint.init()
- if values.data.packages:
+ if values.packages:
packages = list()
- for name in values.data.packages.split(","):
+ for name in values.packages.split(","):
name = name.strip()
try:
Modified: mgmt/newdata/cumin/etc/cumin.conf
===================================================================
--- mgmt/newdata/cumin/etc/cumin.conf 2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/cumin/etc/cumin.conf 2010-10-01 19:51:28 UTC (rev 4367)
@@ -14,6 +14,7 @@
[data]
# log-file: $CUMIN_HOME/log/data.log
-# expire-interval: 600
+# expire-interval: 3600
# expire-threshold: 86400
+# vacuum-interval: 3600
# packages: [all]
Modified: mgmt/newdata/cumin/python/cumin/config.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/config.py 2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/cumin/python/cumin/config.py 2010-10-01 19:51:28 UTC (rev 4367)
@@ -20,7 +20,7 @@
web = CuminConfigSection(self, "web")
web.log_file.default = os.path.join(self.home, "log",
"web.log")
- param = ConfigParameter(web, "update_interval", int)
+ param = ConfigParameter(web, "update-interval", int)
param.default = 10
param = ConfigParameter(web, "host", str)
@@ -35,15 +35,18 @@
data = CuminConfigSection(self, "data")
data.log_file.default = os.path.join(self.home, "log",
"data.log")
-
+
param = ConfigParameter(data, "packages", str)
param = ConfigParameter(data, "expire-interval", int)
- param.default = 600 # 10 minutes
+ param.default = 60 * 60 # 1 hour
param = ConfigParameter(data, "expire-threshold", int)
- param.default = 24 * 3600 # 1 day
+ param.default = 24 * 60 * 60 # 1 day
+ param = ConfigParameter(data, "vacuum-interval", int)
+ param.default = 60 * 60 # 1 hour
+
def parse(self):
paths = list()
Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py 2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/mint/python/mint/expire.py 2010-10-01 19:51:28 UTC (rev 4367)
@@ -1,72 +1,50 @@
from update import *
from util import *
-import mint
-
log = logging.getLogger("mint.expire")
-class ExpireThread(MintDaemonThread):
- def init(self):
- log.debug("Initializing %s", self)
+class ExpireThread(MintPeriodicProcessThread):
+ def __init__(self, app):
+ super(ExpireThread, self).__init__(app, 60 * 60)
- interval = self.app.expire_interval
- threshold = self.app.expire_threshold
+ self.threshold = 24 * 3600
- interval_out, interval_unit = convert_time_units(interval)
- threshold_out, threshold_unit = convert_time_units(threshold)
+ def init(self):
+ super(ExpireThread, self).init()
- args = (threshold_out, threshold_unit, interval_out, interval_unit)
+ log.debug("Expire interval is %i seconds", self.interval)
+ log.debug("Expire threshold is %i seconds", self.threshold)
- log.debug("Expiring database records older than %d %s, every %d %s" \
- % args)
-
def run(self):
- interval = self.app.expire_interval
+ time.sleep(0.25 * self.interval)
- while True:
- if self.stop_requested:
- break
+ super(ExpireThread, self).run()
- up = ExpireUpdate(self.app.model)
- self.app.update_thread.enqueue(up)
+ def process(self):
+ threshold = self.threshold
+ count = 0
- sleep(interval)
+ log.info("Starting expire")
-class ExpireUpdate(Update):
- def do_process(self, cursor, stats):
- seconds = self.model.app.expire_threshold
+ conn = self.app.database.get_connection()
- log.info("Expiring samples older than %i seconds", seconds)
+ try:
+ cursor = conn.cursor()
- count = 0
+ for pkg in self.app.model._packages:
+ for cls in pkg._classes:
+ if cls._storage == "none":
+ continue
- for pkg in self.model._packages:
- for cls in pkg._classes:
- if cls._storage == "none":
- continue
+ values = {"seconds": threshold}
+ cls.sql_samples_delete.execute(cursor, values)
- count += self.delete_samples(cursor, cls, seconds)
+ conn.commit()
- log.info("Expired %i samples", count)
+ log.debug("Deleted %i %s", cursor.rowcount, cls)
- def delete_samples(self, cursor, cls, seconds):
- cls.sql_samples_delete.execute(cursor, {"seconds": seconds})
+ count += cursor.rowcount
+ finally:
+ conn.close()
- log.debug("Deleted %i %s", cursor.rowcount, cls)
-
- return cursor.rowcount
-
-def convert_time_units(t):
- if t / (24 * 3600) >= 1:
- t_out = t / (24 * 3600)
- t_unit = "days"
- elif t / 3600 >= 1:
- t_out = t / 3600
- t_unit = "hours"
- elif t / 60 >= 1:
- t_out = t / 60
- t_unit = "minutes"
- else:
- t_out = t
- t_unit = "seconds"
- return (t_out, t_unit)
+ log.info("Expired %i samples", count)
Modified: mgmt/newdata/mint/python/mint/main.py
===================================================================
--- mgmt/newdata/mint/python/mint/main.py 2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/mint/python/mint/main.py 2010-10-01 19:51:28 UTC (rev 4367)
@@ -20,8 +20,6 @@
self.update_thread = UpdateThread(self)
self.expire_enabled = True
- self.expire_interval = 600
- self.expire_threshold = 24 * 3600
self.expire_thread = ExpireThread(self)
self.vacuum_enabled = True
Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py 2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/mint/python/mint/util.py 2010-10-01 19:51:28 UTC (rev 4367)
@@ -41,6 +41,35 @@
assert self.stop_requested is False
self.stop_requested = True
+class MintPeriodicProcessThread(MintDaemonThread):
+ def __init__(self, app, interval):
+ super(MintPeriodicProcessThread, self).__init__(app)
+
+ self.interval = interval
+
+ def run(self):
+ while True:
+ start = time.time()
+
+ try:
+ self.process()
+ except:
+ log.exception("Periodic process failed")
+
+ elapsed = time.time() - start
+ delta = self.interval - elapsed
+
+ if delta < 0:
+ delta = elapsed % interval
+
+ then = datetime.now() + timedelta(seconds=delta)
+ log.debug("Sleeping until %s",
then.strftime("%H:%M:%S"))
+
+ time.sleep(delta)
+
+ def process(self):
+ pass
+
def prompt_password():
password = None
Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py 2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/mint/python/mint/vacuum.py 2010-10-01 19:51:28 UTC (rev 4367)
@@ -3,36 +3,31 @@
log = logging.getLogger("mint.vacuum")
-class VacuumThread(MintDaemonThread):
- def run(self):
- while True:
- if self.stop_requested:
- break
+class VacuumThread(MintPeriodicProcessThread):
+ def __init__(self, app):
+ super(VacuumThread, self).__init__(app, 60 * 60)
- try:
- self.vacuum()
- except:
- log.exception("Vacuum failed")
+ def init(self):
+ super(VacuumThread, self).init()
- now = datetime.now()
- secs = 3600 - ((now.minute * 60) + now.second)
- then = now + timedelta(seconds=secs)
+ log.debug("Vacuum interval is %i seconds", self.interval)
- log.info("Next vacuum is at %s", then.strftime("%H:%M"))
+ def run(self):
+ time.sleep(30 * 60)
- sleep(secs + 1)
+ super(VacuumThread, self).run()
- def vacuum(self):
+ def process(self):
+ log.info("Starting vacuum")
+
conn = self.app.database.get_connection()
conn.set_isolation_level(0)
try:
cursor = conn.cursor()
- log.info("Starting vacuum")
-
cursor.execute("vacuum analyze")
-
- log.info("Finished vacuum")
finally:
conn.close()
+
+ log.info("Finished vacuum")