[rhmessaging-commits] rhmessaging commits: r4367 - in mgmt/newdata: cumin/etc and 2 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Oct 1 15:51:28 EDT 2010


Author: justi9
Date: 2010-10-01 15:51:28 -0400 (Fri, 01 Oct 2010)
New Revision: 4367

Modified:
   mgmt/newdata/cumin/bin/cumin-data
   mgmt/newdata/cumin/etc/cumin.conf
   mgmt/newdata/cumin/python/cumin/config.py
   mgmt/newdata/mint/python/mint/expire.py
   mgmt/newdata/mint/python/mint/main.py
   mgmt/newdata/mint/python/mint/util.py
   mgmt/newdata/mint/python/mint/vacuum.py
Log:
Move the periodic expire operation out of the update queue, so it
won't block updates.

Also:

 * Use a common periodic processing thread for expire and vacuum

 * Add a vacuum interval config param

 * The config file params weren't hooked up to expire and vacuum;
   fixed that

 * Change the default expire interval from 10 minutes to 1 hour

 * Offset expire and vacuum timing so they don't typically run at the
   same time


Modified: mgmt/newdata/cumin/bin/cumin-data
===================================================================
--- mgmt/newdata/cumin/bin/cumin-data	2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/cumin/bin/cumin-data	2010-10-01 19:51:28 UTC (rev 4367)
@@ -12,9 +12,9 @@
 
 def main():
     config = CuminConfig()
-    values = config.parse()
+    values = config.parse().data
 
-    parser = CuminOptionParser(values.data)
+    parser = CuminOptionParser(values)
     parser.add_option("--print-stats", action="store_true")
     parser.add_option("--print-events", type="int", default=0, metavar="LEVEL")
 
@@ -27,15 +27,21 @@
     broker_uris = [x.strip() for x in opts.brokers.split(",")]
 
     mint = Mint(model_dir, broker_uris, opts.database)
+
     mint.print_event_level = opts.print_events
 
+    mint.expire_thread.interval = values.expire_interval
+    mint.expire_thread.threshold = values.expire_threshold
+
+    mint.vacuum_thread.interval = values.vacuum_interval
+
     mint.check()
     mint.init()
 
-    if values.data.packages:
+    if values.packages:
         packages = list()
 
-        for name in values.data.packages.split(","):
+        for name in values.packages.split(","):
             name = name.strip()
 
             try:

Modified: mgmt/newdata/cumin/etc/cumin.conf
===================================================================
--- mgmt/newdata/cumin/etc/cumin.conf	2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/cumin/etc/cumin.conf	2010-10-01 19:51:28 UTC (rev 4367)
@@ -14,6 +14,7 @@
 
 [data]
 # log-file: $CUMIN_HOME/log/data.log
-# expire-interval: 600
+# expire-interval: 3600
 # expire-threshold: 86400
+# vacuum-interval: 3600
 # packages: [all]

Modified: mgmt/newdata/cumin/python/cumin/config.py
===================================================================
--- mgmt/newdata/cumin/python/cumin/config.py	2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/cumin/python/cumin/config.py	2010-10-01 19:51:28 UTC (rev 4367)
@@ -20,7 +20,7 @@
         web = CuminConfigSection(self, "web")
         web.log_file.default = os.path.join(self.home, "log", "web.log")
 
-        param = ConfigParameter(web, "update_interval", int)
+        param = ConfigParameter(web, "update-interval", int)
         param.default = 10
 
         param = ConfigParameter(web, "host", str)
@@ -35,15 +35,18 @@
 
         data = CuminConfigSection(self, "data")
         data.log_file.default = os.path.join(self.home, "log", "data.log")
-
+        
         param = ConfigParameter(data, "packages", str)
 
         param = ConfigParameter(data, "expire-interval", int)
-        param.default = 600 # 10 minutes
+        param.default = 60 * 60 # 1 hour
 
         param = ConfigParameter(data, "expire-threshold", int)
-        param.default = 24 * 3600 # 1 day
+        param.default = 24 * 60 * 60 # 1 day
 
+        param = ConfigParameter(data, "vacuum-interval", int)
+        param.default = 60 * 60 # 1 hour
+
     def parse(self):
         paths = list()
 

Modified: mgmt/newdata/mint/python/mint/expire.py
===================================================================
--- mgmt/newdata/mint/python/mint/expire.py	2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/mint/python/mint/expire.py	2010-10-01 19:51:28 UTC (rev 4367)
@@ -1,72 +1,50 @@
 from update import *
 from util import *
 
-import mint
-
 log = logging.getLogger("mint.expire")
 
-class ExpireThread(MintDaemonThread):
-    def init(self):
-        log.debug("Initializing %s", self)
+class ExpireThread(MintPeriodicProcessThread):
+    def __init__(self, app):
+        super(ExpireThread, self).__init__(app, 60 * 60)
 
-        interval = self.app.expire_interval
-        threshold = self.app.expire_threshold
+        self.threshold = 24 * 3600
 
-        interval_out, interval_unit = convert_time_units(interval)
-        threshold_out, threshold_unit = convert_time_units(threshold)
+    def init(self):
+        super(ExpireThread, self).init()
 
-        args = (threshold_out, threshold_unit, interval_out, interval_unit)
+        log.debug("Expire interval is %i seconds", self.interval)
+        log.debug("Expire threshold is %i seconds", self.threshold)
 
-        log.debug("Expiring database records older than %d %s, every %d %s" \
-                    % args)
-
     def run(self):
-        interval = self.app.expire_interval
+        time.sleep(0.25 * self.interval)
 
-        while True:
-            if self.stop_requested:
-                break
+        super(ExpireThread, self).run()
 
-            up = ExpireUpdate(self.app.model)
-            self.app.update_thread.enqueue(up)
+    def process(self):
+        threshold = self.threshold
+        count = 0
 
-            sleep(interval)
+        log.info("Starting expire")
 
-class ExpireUpdate(Update):
-    def do_process(self, cursor, stats):
-        seconds = self.model.app.expire_threshold
+        conn = self.app.database.get_connection()
 
-        log.info("Expiring samples older than %i seconds", seconds)
+        try:
+            cursor = conn.cursor()
 
-        count = 0
+            for pkg in self.app.model._packages:
+                for cls in pkg._classes:
+                    if cls._storage == "none":
+                        continue
 
-        for pkg in self.model._packages:
-            for cls in pkg._classes:
-                if cls._storage == "none":
-                    continue
+                    values = {"seconds": threshold}
+                    cls.sql_samples_delete.execute(cursor, values)
 
-                count += self.delete_samples(cursor, cls, seconds)
+                    conn.commit()
 
-        log.info("Expired %i samples", count)
+                    log.debug("Deleted %i %s", cursor.rowcount, cls)
 
-    def delete_samples(self, cursor, cls, seconds):
-        cls.sql_samples_delete.execute(cursor, {"seconds": seconds})
+                    count += cursor.rowcount
+        finally:
+            conn.close()
 
-        log.debug("Deleted %i %s", cursor.rowcount, cls)
-
-        return cursor.rowcount
-
-def convert_time_units(t):
-    if t / (24 * 3600) >= 1:
-        t_out = t / (24 * 3600)
-        t_unit = "days"
-    elif t / 3600 >= 1:
-        t_out = t / 3600
-        t_unit = "hours"
-    elif t / 60 >= 1:
-        t_out = t / 60
-        t_unit = "minutes"
-    else:
-        t_out = t
-        t_unit = "seconds"
-    return (t_out, t_unit)
+        log.info("Expired %i samples", count)

Modified: mgmt/newdata/mint/python/mint/main.py
===================================================================
--- mgmt/newdata/mint/python/mint/main.py	2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/mint/python/mint/main.py	2010-10-01 19:51:28 UTC (rev 4367)
@@ -20,8 +20,6 @@
         self.update_thread = UpdateThread(self)
 
         self.expire_enabled = True
-        self.expire_interval = 600
-        self.expire_threshold = 24 * 3600
         self.expire_thread = ExpireThread(self)
 
         self.vacuum_enabled = True

Modified: mgmt/newdata/mint/python/mint/util.py
===================================================================
--- mgmt/newdata/mint/python/mint/util.py	2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/mint/python/mint/util.py	2010-10-01 19:51:28 UTC (rev 4367)
@@ -41,6 +41,35 @@
         assert self.stop_requested is False
         self.stop_requested = True
 
+class MintPeriodicProcessThread(MintDaemonThread):
+    def __init__(self, app, interval):
+        super(MintPeriodicProcessThread, self).__init__(app)
+        
+        self.interval = interval
+
+    def run(self):
+        while True:
+            start = time.time()
+
+            try:
+                self.process()
+            except:
+                log.exception("Periodic process failed")
+
+            elapsed = time.time() - start
+            delta = self.interval - elapsed
+
+            if delta < 0:
+                delta = elapsed % interval
+
+            then = datetime.now() + timedelta(seconds=delta)
+            log.debug("Sleeping until %s", then.strftime("%H:%M:%S"))
+
+            time.sleep(delta)
+
+    def process(self):
+        pass
+
 def prompt_password():
     password = None
 

Modified: mgmt/newdata/mint/python/mint/vacuum.py
===================================================================
--- mgmt/newdata/mint/python/mint/vacuum.py	2010-10-01 17:49:05 UTC (rev 4366)
+++ mgmt/newdata/mint/python/mint/vacuum.py	2010-10-01 19:51:28 UTC (rev 4367)
@@ -3,36 +3,31 @@
 
 log = logging.getLogger("mint.vacuum")
 
-class VacuumThread(MintDaemonThread):
-    def run(self):
-        while True:
-            if self.stop_requested:
-                break
+class VacuumThread(MintPeriodicProcessThread):
+    def __init__(self, app):
+        super(VacuumThread, self).__init__(app, 60 * 60)
 
-            try:
-                self.vacuum()
-            except:
-                log.exception("Vacuum failed")
+    def init(self):
+        super(VacuumThread, self).init()
 
-            now = datetime.now()
-            secs = 3600 - ((now.minute * 60) + now.second)
-            then = now + timedelta(seconds=secs)
+        log.debug("Vacuum interval is %i seconds", self.interval)
 
-            log.info("Next vacuum is at %s", then.strftime("%H:%M"))
+    def run(self):
+        time.sleep(30 * 60)
 
-            sleep(secs + 1)
+        super(VacuumThread, self).run()
 
-    def vacuum(self):
+    def process(self):
+        log.info("Starting vacuum")
+
         conn = self.app.database.get_connection()
         conn.set_isolation_level(0)
 
         try:
             cursor = conn.cursor()
 
-            log.info("Starting vacuum")
-
             cursor.execute("vacuum analyze")
-
-            log.info("Finished vacuum")
         finally:
             conn.close()
+
+        log.info("Finished vacuum")



More information about the rhmessaging-commits mailing list