[jboss-svn-commits] JBL Code SVN: r6082 - in labs/shotoku/trunk/shotoku-base/src: etc java/org/jboss/shotoku java/org/jboss/shotoku/cache java/org/jboss/shotoku/service java/org/jboss/shotoku/tools

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Sep 5 05:40:19 EDT 2006


Author: adamw
Date: 2006-09-05 05:40:13 -0400 (Tue, 05 Sep 2006)
New Revision: 6082

Added:
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThread.java
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThreadData.java
Modified:
   labs/shotoku/trunk/shotoku-base/src/etc/shotoku.properties.sample
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/ContentManager.java
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/ShotokuCacheItem.java
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedService.java
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedServiceImpl.java
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuService.java
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuServiceImpl.java
   labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/tools/Constants.java
Log:
http://jira.jboss.com/jira/browse/JBSHOTOKU-99

Modified: labs/shotoku/trunk/shotoku-base/src/etc/shotoku.properties.sample
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/etc/shotoku.properties.sample	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/etc/shotoku.properties.sample	2006-09-05 09:40:13 UTC (rev 6082)
@@ -44,6 +44,9 @@
 # Interval between service (and cache) updates, in milliseconds
 shotoku.service.interval = 10000
 
+# Number of created update threads (used for updating cache items)
+shotoku.updatethread.count = 10
+
 #
 # Feeds configuration
 # -------------------

Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/ContentManager.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/ContentManager.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/ContentManager.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -450,6 +450,11 @@
     private static int defaultServiceInterval;
 
     /**
+     * Default update thread count.
+     */
+    private static int defaultUpdateThreadCount;
+
+    /**
      * Map (content manager impl class name -> content manager constructor).
      */
     private static Map<String, Constructor> contentManagers;
@@ -549,6 +554,9 @@
         defaultServiceInterval = props.getInt(
                 Constants.PROPERTIES_SERVICE_INTERVAL,
                 Constants.DEFAULT_SERVICE_INTERVAL);
+        defaultUpdateThreadCount = props.getInt(
+                Constants.PROPERTIES_UPDATETHREAD_COUNT,
+                Constants.DEFAULT_UPDATETHREAD_COUNT);
 
         // Getting ids of defined content managers.
         String[] ids = props.getStringArray(Constants.PROPERTIES_IDS);
@@ -813,4 +821,13 @@
     public static int getDefaultServiceInterval() {
         return defaultServiceInterval;
     }
+
+    /**
+     * Gets the default update thread count.
+     *
+     * @return Default update thread count.
+     */
+    public static int getDefaultUpdateThreadCount() {
+        return defaultUpdateThreadCount;
+    }
 }

Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/ShotokuCacheItem.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/ShotokuCacheItem.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/ShotokuCacheItem.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -27,6 +27,7 @@
 import org.jboss.shotoku.tools.ConcurrentHashSet;
 import org.jboss.shotoku.exceptions.CacheException;
 import org.jboss.shotoku.ContentManager;
+import org.jboss.shotoku.service.ShotokuService;
 
 import java.util.*;
 
@@ -44,10 +45,12 @@
 public abstract class ShotokuCacheItem<K, T> {
     private String keyBase;
     private ConcurrentSet<K> keys;
+    private ConcurrentSet<K> keysDuringUpdate;
     private final Object synchronizer = new Object();
     private final Map<K, ContentManager> cmForKeys = new HashMap<K, ContentManager>();
     private long lastUpdate;
     private long interval;
+    private ShotokuService service;
 
     /**
      *
@@ -61,8 +64,11 @@
 
         lastUpdate = 0;
         keys = new ConcurrentHashSet<K>();
+        keysDuringUpdate = new ConcurrentHashSet<K>();
 
-        Tools.getService().register(this);
+        service = Tools.getService();
+
+        service.register(this);
     }
 
     public ShotokuCacheItem() {
@@ -136,8 +142,19 @@
         long now = Calendar.getInstance().getTimeInMillis();
 
         if (now - lastUpdate >= interval) {
-            for (K key : keys) {
-                update(key, get(key));
+            for (final K key : keys) {
+                if (!keysDuringUpdate.add(key)) {
+                    service.acquireUpdateThread().setAndSignalData(new UpdateThreadData() {
+                        public void execute() {
+                            try {
+                                update(key, get(key));
+                                service.touch(createKey(key));
+                            } finally {
+                                keysDuringUpdate.remove(key);
+                            }
+                        }
+                    });
+                }
             }
 
             lastUpdate = now;
@@ -145,6 +162,16 @@
     }
 
     /**
+     * Notifies the cache item that it should reset its set of keys that
+     * are currently being updated, most probably because the managing
+     * service has been restarted (so any updates that were in progress
+     * got interrupted).
+     */
+    public void resetKeysDuringUpdate() {
+        keysDuringUpdate.clear();
+    }
+
+    /**
      * Called by the service periodically to update the object held.
      * If the object in the cache should be changed, the implementing
      * method must call put(key, newObject).

Added: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThread.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThread.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThread.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -0,0 +1,92 @@
+package org.jboss.shotoku.cache;
+
+import org.jboss.shotoku.service.ShotokuService;
+import org.jboss.shotoku.service.AdministratedService;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Worker thread that executes {@link UpdateThreadData} tasks taken from its own queue.
+ * After each task it hands itself back to the {@link ShotokuService} through
+ * {@code releaseUpdateThread}; once the exit task has run it calls
+ * {@code updateThreadExited} instead and terminates.
+ *
+ * @author Adam Warski (adamw at aster.pl)
+ */
+public class UpdateThread extends Thread {
+    private static final Logger log =
+            Logger.getLogger(UpdateThread.class);
+
+    private final ShotokuService service;
+    private final BlockingQueue<UpdateThreadData> queue;
+    /** Cleared by the exit task, which executes on this thread inside {@link #run()}. */
+    private boolean runnable;
+
+    /**
+     * Creates a daemon thread with an empty task queue. The thread is not started.
+     *
+     * @param service Service that is notified when this thread is free or has exited.
+     */
+    public UpdateThread(ShotokuService service) {
+        this.service = service;
+
+        queue = new LinkedBlockingQueue<UpdateThreadData>();
+        runnable = true;
+
+        setDaemon(true);
+    }
+
+    /**
+     * Queues a task for execution by this thread.
+     *
+     * @param data Task to execute.
+     */
+    public void setAndSignalData(UpdateThreadData data) {
+        queue.offer(data);
+    }
+
+    /**
+     * Queues a task that makes this thread leave its loop; tasks queued
+     * earlier are executed first.
+     */
+    public void signalExit() {
+        setAndSignalData(new UpdateThreadData() {
+            public void execute() { runnable = false; }
+        });
+    }
+
+    /**
+     * Executes queued tasks one at a time until the exit task has been run.
+     */
+    public void run() {
+        while (runnable) {
+            UpdateThreadData data;
+            try {
+                data = queue.take();
+            } catch (InterruptedException e) {
+                // No task was executed, so the thread must not be released here:
+                // that would offer it to the service's pool a second time. The
+                // interrupt is logged and the thread keeps waiting for tasks.
+                log.error("Update thread interrupted.", e);
+                continue;
+            }
+
+            try {
+                data.execute();
+            } catch (Throwable t) {
+                // Making sure that a failing task won't stop the thread.
+                log.error("Exception while executing an update thread data.", t);
+            }
+
+            // The task may have been the exit request; in that case the
+            // thread is not handed back to the service.
+            if (runnable) {
+                service.releaseUpdateThread(this);
+            }
+        }
+
+        service.updateThreadExited(this);
+    }
+}

Added: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThreadData.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThreadData.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThreadData.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -0,0 +1,9 @@
+package org.jboss.shotoku.cache;
+
+/**
+ * Task queued to an {@link UpdateThread}, which calls {@link #execute()} on it from its own thread.
+ * @author Adam Warski (adamw at aster.pl)
+ */
+public abstract class UpdateThreadData {
+    public abstract void execute(); // the work to perform; UpdateThread logs anything thrown here
+}

Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedService.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedService.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedService.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -12,6 +12,9 @@
     public Date getLastUpdateDate();
     public void setLastUpdate(long lastUpdate);
 
+    public void setTimerInterval(long timerInterval);
+    public long getTimerInterval();
+
     public boolean getServiceRunnable();
     public void setServiceRunnable(boolean runnable);
 

Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedServiceImpl.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedServiceImpl.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedServiceImpl.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -15,7 +15,7 @@
             Logger.getLogger(AdministratedService.class);
 
     private long lastUpdate;
-    private long timerInterval;
+    private volatile long timerInterval;
     private boolean serviceRunnable;
 
     public long getLastUpdate() {
@@ -96,14 +96,16 @@
                     try {
                         sleep(getTimerInterval());
                     } catch (InterruptedException e) {
-                        log.error("Sleeping of updater thread interrupted.", e);
+                        log.error("Sleeping of updater thread for " +
+                                getServiceName() + " interrupted.", e);
                     }
 
                     try {
                         update();
                     } catch (Throwable t) {
                         // Making sure that an exception won't stop the thread.
-                        log.error("Update method threw an exception.", t);
+                        log.error("Update method for " + getServiceName() +
+                                " threw an exception.", t);
                     }
 
                     setLastUpdate(Calendar.getInstance().getTimeInMillis());

Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuService.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuService.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuService.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -22,6 +22,7 @@
 package org.jboss.shotoku.service;
 
 import org.jboss.shotoku.cache.ShotokuCacheItem;
+import org.jboss.shotoku.cache.UpdateThread;
 
 import java.util.Set;
 
@@ -36,6 +37,12 @@
 
     public Object get(Object key);
     public void put(Object key, Object o);
+    /**
+     * Touches the last update time of a key, that is, notifies the
+     * service that the given key is up to date.
+     * @param key
+     */
+    public void touch(Object key);
     public void register(ShotokuCacheItem cacheItem);
     public void remove(Object key);
 
@@ -44,6 +51,10 @@
      */
     public String getNextKeyBase();
 
+    public UpdateThread acquireUpdateThread();
+    public void releaseUpdateThread(UpdateThread ut);
+    public void updateThreadExited(UpdateThread ut);
+
     public void addAdministratedService(AdministratedServiceGetter asg);
     public Set<AdministratedServiceGetter> getAdministratedServices();
 }

Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuServiceImpl.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuServiceImpl.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuServiceImpl.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -27,6 +27,7 @@
 import org.jboss.annotation.ejb.Service;
 import org.jboss.shotoku.ContentManager;
 import org.jboss.shotoku.cache.ShotokuCacheItem;
+import org.jboss.shotoku.cache.UpdateThread;
 import org.jboss.shotoku.tools.Constants;
 import org.jboss.shotoku.tools.ConcurrentSet;
 import org.jboss.shotoku.tools.ConcurrentHashSet;
@@ -35,6 +36,7 @@
 
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.Set;
 
 /**
@@ -61,8 +63,8 @@
         ContentManager.setup();
 
         log.info("ContentManager setup completed.  Getting the default service timer interval...");
+        updateThreadCount = ContentManager.getDefaultUpdateThreadCount();
 
-        // Setting the default timer interval.
         setTimerInterval(ContentManager.getDefaultServiceInterval());
 
         super.afterCreate();
@@ -71,12 +73,30 @@
     public void start() throws Exception {
         super.start();
 
-        log.info("Starting update thread...");
+        log.info("Starting main update thread...");
+
         startUpdateThread();
 
+        log.info("Starting update threads...");
+        setUpdateThreadCount(updateThreadCount);
+
+        log.info("Reseting keys during update in cache items...");
+        // Resetting just in case - if any thread unexpectedly died.
+        for (ShotokuCacheItem sci : cacheItems) {
+            sci.resetKeysDuringUpdate();
+        }
+
         super.afterStart();
     }
 
+    public void stop() {
+        super.stop();
+
+        signalExitAllUpdateThreads();
+        toRemove = 0;
+        log.info("All update threads stopped.");
+    }
+
     public void destroy() {
 
     }
@@ -89,6 +109,8 @@
             new ConcurrentHashMap<Object, Object>();
     private final ConcurrentSet<ShotokuCacheItem> cacheItems =
             new ConcurrentHashSet<ShotokuCacheItem>();
+    private final ConcurrentMap<Object, Long> lastUpdates =
+            new ConcurrentHashMap<Object, Long>();
 
     public Object get(Object key) {
         Object ret = cache.get(key);
@@ -100,6 +122,8 @@
     }
 
     public void put(Object key, Object o) {
+        touch(key);
+
         if (o == null) {
             cache.put(key, Null.getInstance());
         } else {
@@ -107,6 +131,10 @@
         }
     }
 
+    public void touch(Object key) {
+        lastUpdates.put(key, System.currentTimeMillis());
+    }
+
     public void remove(Object key) {
         cache.remove(key);
     }
@@ -126,6 +154,77 @@
     }
 
     /*
+     * Update threads management.
+     */
+
+    private final ConcurrentSet<UpdateThread> allUpdateThreads =
+            new ConcurrentHashSet<UpdateThread>();
+    private final LinkedBlockingQueue<UpdateThread> updateThreads =
+            new LinkedBlockingQueue<UpdateThread>();
+    private int updateThreadCount;
+    /**
+     * Number of threads that should be told to exit, instead of being put back into the pool, when they are next released.
+     */
+    private volatile int toRemove = 0;
+
+    public int getUpdateThreadsCount() {
+        return updateThreads.size();
+    }
+
+    public void setUpdateThreadCount(int n) {
+        synchronized (updateThreads) {
+            int current = updateThreads.size();
+
+            if (current < n) {
+                for (int i=current; i<n; i++) {
+                    UpdateThread ut = new UpdateThread(this);
+                    updateThreads.offer(ut);
+                    allUpdateThreads.add(ut);
+                    ut.start();
+                }
+            } else if (n < current) {
+                toRemove += current - n;
+            }
+        }
+
+        log.info("Update thread count set to: " + n + ".");
+        updateThreadCount = n;
+    }
+
+    public UpdateThread acquireUpdateThread() {
+        try {
+            return updateThreads.take();
+        } catch (InterruptedException e) {
+            log.error("Error while acquireing an update thread (interrupted).", e);
+            return null;
+        }
+    }
+
+    public void releaseUpdateThread(UpdateThread ut) {
+        if (toRemove != 0) {
+            synchronized (updateThreads) {
+                if (toRemove != 0) {
+                    ut.signalExit();
+                    toRemove--;
+                    return;
+                }
+            }
+        }
+
+        updateThreads.offer(ut);
+    }
+
+    public void updateThreadExited(UpdateThread ut) {
+        allUpdateThreads.remove(ut);
+    }
+
+    private void signalExitAllUpdateThreads() {
+        for (UpdateThread ut : allUpdateThreads) {
+            ut.signalExit();
+        }
+    }
+
+    /*
      * Update function.
      */
 
@@ -133,8 +232,8 @@
         for (ShotokuCacheItem sci : cacheItems) {
             try {
                 sci.update();
-            } catch (Exception e) {
-                log.error("Exception while updating a cache item.", e);
+            } catch (Throwable t) {
+                log.error("Exception while updating a cache item.", t);
             }
         }
     }
@@ -156,13 +255,17 @@
     }
 
     public String getServiceDescription() {
+        long now = System.currentTimeMillis();
+
         StringBuffer sb = new StringBuffer("Cache service.<br />");
         sb.append("Currently storing ").append(cache.size()).append(" items in the cache ");
         sb.append("and ").append(cacheItems.size()).append(" ShotokuCacheItem objects.<br />");
         sb.append("Objects in cache:<br />");
         for (Object key : cache.keySet()) {
             sb.append(key.toString()).append(" : ").append(
-                    cache.get(key).getClass().getName()).append("<br />");
+                    cache.get(key).getClass().getName()).append(
+                    ", last updated ").append((now - lastUpdates.get(key)) / 1000).append(
+                    " seconds ago.<br />");
         }
 
         sb.append("Cache items:<br />");

Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/tools/Constants.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/tools/Constants.java	2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/tools/Constants.java	2006-09-05 09:40:13 UTC (rev 6082)
@@ -36,8 +36,9 @@
 	public static final String PROPERTIES_EMBEDDED = PROPERTIES_PREFIX + ".embedded";
 	public static final String PROPERTIES_TRANSFER_BUF_SIZE = PROPERTIES_PREFIX + ".transfer.buffer.size";
 	public static final String PROPERTIES_SERVICE_INTERVAL = PROPERTIES_PREFIX + ".service.interval";
-	
-	private static final String VELOCITY_RL = "shotoku.resource.loader";
+    public static final String PROPERTIES_UPDATETHREAD_COUNT = PROPERTIES_PREFIX + ".updatethread.count";
+
+    private static final String VELOCITY_RL = "shotoku.resource.loader";
 	public static final String VELOCITY_RL_PREFIX = VELOCITY_RL + ".prefix";
 	public static final String VELOCITY_RL_ID = VELOCITY_RL + ".id";
 	public static final String VELOCITY_PROPERTIES_FILE = "/velocity.properties";
@@ -50,8 +51,9 @@
 	public static final String DEFAULT_ID 				= "default";
 	public static final int DEFAULT_TRANSFER_BUF_SIZE	= 1024;
 	public static final int DEFAULT_SERVICE_INTERVAL	= 5000;
-	
-	/**
+    public static final int DEFAULT_UPDATETHREAD_COUNT  = 10;
+
+    /**
 	 * <code>SHOTOKU_SERVICE_NAME</code> - name under which the shotoku service is
 	 * registered.
 	 */




More information about the jboss-svn-commits mailing list