[jboss-svn-commits] JBL Code SVN: r6082 - in labs/shotoku/trunk/shotoku-base/src: etc java/org/jboss/shotoku java/org/jboss/shotoku/cache java/org/jboss/shotoku/service java/org/jboss/shotoku/tools
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Sep 5 05:40:19 EDT 2006
Author: adamw
Date: 2006-09-05 05:40:13 -0400 (Tue, 05 Sep 2006)
New Revision: 6082
Added:
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThread.java
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThreadData.java
Modified:
labs/shotoku/trunk/shotoku-base/src/etc/shotoku.properties.sample
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/ContentManager.java
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/ShotokuCacheItem.java
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedService.java
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedServiceImpl.java
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuService.java
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuServiceImpl.java
labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/tools/Constants.java
Log:
http://jira.jboss.com/jira/browse/JBSHOTOKU-99
Modified: labs/shotoku/trunk/shotoku-base/src/etc/shotoku.properties.sample
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/etc/shotoku.properties.sample 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/etc/shotoku.properties.sample 2006-09-05 09:40:13 UTC (rev 6082)
@@ -44,6 +44,9 @@
# Interval between service (and cache) updates, in milliseconds
shotoku.service.interval = 10000
+# Number of created update threads (used for updating cache items)
+shotoku.updatethread.count = 10
+
#
# Feeds configuration
# -------------------
Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/ContentManager.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/ContentManager.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/ContentManager.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -450,6 +450,11 @@
private static int defaultServiceInterval;
/**
+ * Default update thread count.
+ */
+ private static int defaultUpdateThreadCount;
+
+ /**
* Map (content manager impl class name -> content manager constructor).
*/
private static Map<String, Constructor> contentManagers;
@@ -549,6 +554,9 @@
defaultServiceInterval = props.getInt(
Constants.PROPERTIES_SERVICE_INTERVAL,
Constants.DEFAULT_SERVICE_INTERVAL);
+ defaultUpdateThreadCount = props.getInt(
+ Constants.PROPERTIES_UPDATETHREAD_COUNT,
+ Constants.DEFAULT_UPDATETHREAD_COUNT);
// Getting ids of defined content managers.
String[] ids = props.getStringArray(Constants.PROPERTIES_IDS);
@@ -813,4 +821,13 @@
public static int getDefaultServiceInterval() {
return defaultServiceInterval;
}
+
+ /**
+ * Getst the default update thread count.
+ *
+ * @return Default update thread count.
+ */
+ public static int getDefaultUpdateThreadCount() {
+ return defaultUpdateThreadCount;
+ }
}
Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/ShotokuCacheItem.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/ShotokuCacheItem.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/ShotokuCacheItem.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -27,6 +27,7 @@
import org.jboss.shotoku.tools.ConcurrentHashSet;
import org.jboss.shotoku.exceptions.CacheException;
import org.jboss.shotoku.ContentManager;
+import org.jboss.shotoku.service.ShotokuService;
import java.util.*;
@@ -44,10 +45,12 @@
public abstract class ShotokuCacheItem<K, T> {
private String keyBase;
private ConcurrentSet<K> keys;
+ private ConcurrentSet<K> keysDuringUpdate;
private final Object synchronizer = new Object();
private final Map<K, ContentManager> cmForKeys = new HashMap<K, ContentManager>();
private long lastUpdate;
private long interval;
+ private ShotokuService service;
/**
*
@@ -61,8 +64,11 @@
lastUpdate = 0;
keys = new ConcurrentHashSet<K>();
+ keysDuringUpdate = new ConcurrentHashSet<K>();
- Tools.getService().register(this);
+ service = Tools.getService();
+
+ service.register(this);
}
public ShotokuCacheItem() {
@@ -136,8 +142,19 @@
long now = Calendar.getInstance().getTimeInMillis();
if (now - lastUpdate >= interval) {
- for (K key : keys) {
- update(key, get(key));
+ for (final K key : keys) {
+ if (!keysDuringUpdate.add(key)) {
+ service.acquireUpdateThread().setAndSignalData(new UpdateThreadData() {
+ public void execute() {
+ try {
+ update(key, get(key));
+ service.touch(createKey(key));
+ } finally {
+ keysDuringUpdate.remove(key);
+ }
+ }
+ });
+ }
}
lastUpdate = now;
@@ -145,6 +162,16 @@
}
/**
+ * Notifies the cache item that it should reset its set of keys that
+ * are during update, most probably beacuse the managing service has
+ * been restarted (so updates of all keys that have been updated got
+ * interrupted).
+ */
+ public void resetKeysDuringUpdate() {
+ keysDuringUpdate.clear();
+ }
+
+ /**
* Called by the service periodically to update the object held.
* If the object in the cache should be changed, the implementing
* method must call put(key, newObject).
Added: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThread.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThread.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThread.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -0,0 +1,66 @@
+package org.jboss.shotoku.cache;
+
+import org.jboss.shotoku.service.ShotokuService;
+import org.jboss.shotoku.service.AdministratedService;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * @author Adam Warski (adamw at aster.pl)
+ */
+public class UpdateThread extends Thread {
+ private static final Logger log =
+ Logger.getLogger(AdministratedService.class);
+
+ private ShotokuService service;
+ private BlockingQueue<UpdateThreadData> queue;
+ private boolean runnable;
+
+ public UpdateThread(ShotokuService service) {
+ this.service = service;
+
+ queue = new LinkedBlockingQueue<UpdateThreadData>();
+ runnable = true;
+
+ setDaemon(true);
+ }
+
+ public void setAndSignalData(UpdateThreadData data) {
+ queue.offer(data);
+ }
+
+ public void signalExit() {
+ setAndSignalData(new UpdateThreadData() {
+ public void execute() { runnable = false; }
+ });
+ }
+
+ public void run() {
+ while (true) {
+ UpdateThreadData data;
+ try {
+ data = queue.take();
+
+ try {
+ data.execute();
+ } catch (Throwable t) {
+ log.error("Exception while executing an update thread data.", t);
+ }
+
+ // Data may have been set and signaled just to notify the thread
+ // to exit.
+ if (!runnable) {
+ break;
+ }
+ } catch (InterruptedException e) {
+ log.error("Update thread interrupted.", e);
+ }
+
+ service.releaseUpdateThread(this);
+ }
+
+ service.updateThreadExited(this);
+ }
+}
Added: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThreadData.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThreadData.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/cache/UpdateThreadData.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -0,0 +1,9 @@
+package org.jboss.shotoku.cache;
+
+/**
+ * Function that should be executed by an update thread.
+ * @author Adam Warski (adamw at aster.pl)
+ */
+public abstract class UpdateThreadData {
+ public abstract void execute();
+}
Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedService.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedService.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedService.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -12,6 +12,9 @@
public Date getLastUpdateDate();
public void setLastUpdate(long lastUpdate);
+ public void setTimerInterval(long timerInterval);
+ public long getTimerInterval();
+
public boolean getServiceRunnable();
public void setServiceRunnable(boolean runnable);
Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedServiceImpl.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedServiceImpl.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/AdministratedServiceImpl.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -15,7 +15,7 @@
Logger.getLogger(AdministratedService.class);
private long lastUpdate;
- private long timerInterval;
+ private volatile long timerInterval;
private boolean serviceRunnable;
public long getLastUpdate() {
@@ -96,14 +96,16 @@
try {
sleep(getTimerInterval());
} catch (InterruptedException e) {
- log.error("Sleeping of updater thread interrupted.", e);
+ log.error("Sleeping of updater thread for " +
+ getServiceName() + " interrupted.", e);
}
try {
update();
} catch (Throwable t) {
// Making sure that an exception won't stop the thread.
- log.error("Update method threw an exception.", t);
+ log.error("Update method for " + getServiceName() +
+ " threw an exception.", t);
}
setLastUpdate(Calendar.getInstance().getTimeInMillis());
Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuService.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuService.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuService.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -22,6 +22,7 @@
package org.jboss.shotoku.service;
import org.jboss.shotoku.cache.ShotokuCacheItem;
+import org.jboss.shotoku.cache.UpdateThread;
import java.util.Set;
@@ -36,6 +37,12 @@
public Object get(Object key);
public void put(Object key, Object o);
+ /**
+ * Touch the last update time of a key, that is, notifies the
+ * service that the given key is up to date.
+ * @param key
+ */
+ public void touch(Object key);
public void register(ShotokuCacheItem cacheItem);
public void remove(Object key);
@@ -44,6 +51,10 @@
*/
public String getNextKeyBase();
+ public UpdateThread acquireUpdateThread();
+ public void releaseUpdateThread(UpdateThread ut);
+ public void updateThreadExited(UpdateThread ut);
+
public void addAdministratedService(AdministratedServiceGetter asg);
public Set<AdministratedServiceGetter> getAdministratedServices();
}
Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuServiceImpl.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuServiceImpl.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/service/ShotokuServiceImpl.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -27,6 +27,7 @@
import org.jboss.annotation.ejb.Service;
import org.jboss.shotoku.ContentManager;
import org.jboss.shotoku.cache.ShotokuCacheItem;
+import org.jboss.shotoku.cache.UpdateThread;
import org.jboss.shotoku.tools.Constants;
import org.jboss.shotoku.tools.ConcurrentSet;
import org.jboss.shotoku.tools.ConcurrentHashSet;
@@ -35,6 +36,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.Set;
/**
@@ -61,8 +63,8 @@
ContentManager.setup();
log.info("ContentManager setup completed. Getting the default service timer interval...");
+ updateThreadCount = ContentManager.getDefaultUpdateThreadCount();
- // Setting the default timer interval.
setTimerInterval(ContentManager.getDefaultServiceInterval());
super.afterCreate();
@@ -71,12 +73,30 @@
public void start() throws Exception {
super.start();
- log.info("Starting update thread...");
+ log.info("Starting main update thread...");
+
startUpdateThread();
+ log.info("Starting update threads...");
+ setUpdateThreadCount(updateThreadCount);
+
+ log.info("Reseting keys during update in cache items...");
+ // Reseting just in case - if any thread unexpectadly died.
+ for (ShotokuCacheItem sci : cacheItems) {
+ sci.resetKeysDuringUpdate();
+ }
+
super.afterStart();
}
+ public void stop() {
+ super.stop();
+
+ signalExitAllUpdateThreads();
+ toRemove = 0;
+ log.info("All update threads stopped.");
+ }
+
public void destroy() {
}
@@ -89,6 +109,8 @@
new ConcurrentHashMap<Object, Object>();
private final ConcurrentSet<ShotokuCacheItem> cacheItems =
new ConcurrentHashSet<ShotokuCacheItem>();
+ private final ConcurrentMap<Object, Long> lastUpdates =
+ new ConcurrentHashMap<Object, Long>();
public Object get(Object key) {
Object ret = cache.get(key);
@@ -100,6 +122,8 @@
}
public void put(Object key, Object o) {
+ touch(key);
+
if (o == null) {
cache.put(key, Null.getInstance());
} else {
@@ -107,6 +131,10 @@
}
}
+ public void touch(Object key) {
+ lastUpdates.put(key, System.currentTimeMillis());
+ }
+
public void remove(Object key) {
cache.remove(key);
}
@@ -126,6 +154,77 @@
}
/*
+ * Update threads management.
+ */
+
+ private final ConcurrentSet<UpdateThread> allUpdateThreads =
+ new ConcurrentHashSet<UpdateThread>();
+ private final LinkedBlockingQueue<UpdateThread> updateThreads =
+ new LinkedBlockingQueue<UpdateThread>();
+ private int updateThreadCount;
+ /**
+ * Number of threads that should be removed, instead of
+ */
+ private volatile int toRemove = 0;
+
+ public int getUpdateThreadsCount() {
+ return updateThreads.size();
+ }
+
+ public void setUpdateThreadCount(int n) {
+ synchronized (updateThreads) {
+ int current = updateThreads.size();
+
+ if (current < n) {
+ for (int i=current; i<n; i++) {
+ UpdateThread ut = new UpdateThread(this);
+ updateThreads.offer(ut);
+ allUpdateThreads.add(ut);
+ ut.start();
+ }
+ } else if (n < current) {
+ toRemove += current - n;
+ }
+ }
+
+ log.info("Update thread count set to: " + n + ".");
+ updateThreadCount = n;
+ }
+
+ public UpdateThread acquireUpdateThread() {
+ try {
+ return updateThreads.take();
+ } catch (InterruptedException e) {
+ log.error("Error while acquireing an update thread (interrupted).", e);
+ return null;
+ }
+ }
+
+ public void releaseUpdateThread(UpdateThread ut) {
+ if (toRemove != 0) {
+ synchronized (updateThreads) {
+ if (toRemove != 0) {
+ ut.signalExit();
+ toRemove--;
+ return;
+ }
+ }
+ }
+
+ updateThreads.offer(ut);
+ }
+
+ public void updateThreadExited(UpdateThread ut) {
+ allUpdateThreads.remove(ut);
+ }
+
+ private void signalExitAllUpdateThreads() {
+ for (UpdateThread ut : allUpdateThreads) {
+ ut.signalExit();
+ }
+ }
+
+ /*
* Update function.
*/
@@ -133,8 +232,8 @@
for (ShotokuCacheItem sci : cacheItems) {
try {
sci.update();
- } catch (Exception e) {
- log.error("Exception while updating a cache item.", e);
+ } catch (Throwable t) {
+ log.error("Exception while updating a cache item.", t);
}
}
}
@@ -156,13 +255,17 @@
}
public String getServiceDescription() {
+ long now = System.currentTimeMillis();
+
StringBuffer sb = new StringBuffer("Cache service.<br />");
sb.append("Currently storing ").append(cache.size()).append(" items in the cache ");
sb.append("and ").append(cacheItems.size()).append(" ShotokuCacheItem objects.<br />");
sb.append("Objects in cache:<br />");
for (Object key : cache.keySet()) {
sb.append(key.toString()).append(" : ").append(
- cache.get(key).getClass().getName()).append("<br />");
+ cache.get(key).getClass().getName()).append(
+ ", last updated ").append((now - lastUpdates.get(key)) / 1000).append(
+ " seconds ago.<br />");
}
sb.append("Cache items:<br />");
Modified: labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/tools/Constants.java
===================================================================
--- labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/tools/Constants.java 2006-09-05 09:14:26 UTC (rev 6081)
+++ labs/shotoku/trunk/shotoku-base/src/java/org/jboss/shotoku/tools/Constants.java 2006-09-05 09:40:13 UTC (rev 6082)
@@ -36,8 +36,9 @@
public static final String PROPERTIES_EMBEDDED = PROPERTIES_PREFIX + ".embedded";
public static final String PROPERTIES_TRANSFER_BUF_SIZE = PROPERTIES_PREFIX + ".transfer.buffer.size";
public static final String PROPERTIES_SERVICE_INTERVAL = PROPERTIES_PREFIX + ".service.interval";
-
- private static final String VELOCITY_RL = "shotoku.resource.loader";
+ public static final String PROPERTIES_UPDATETHREAD_COUNT = PROPERTIES_PREFIX + ".updatethread.count";
+
+ private static final String VELOCITY_RL = "shotoku.resource.loader";
public static final String VELOCITY_RL_PREFIX = VELOCITY_RL + ".prefix";
public static final String VELOCITY_RL_ID = VELOCITY_RL + ".id";
public static final String VELOCITY_PROPERTIES_FILE = "/velocity.properties";
@@ -50,8 +51,9 @@
public static final String DEFAULT_ID = "default";
public static final int DEFAULT_TRANSFER_BUF_SIZE = 1024;
public static final int DEFAULT_SERVICE_INTERVAL = 5000;
-
- /**
+ public static final int DEFAULT_UPDATETHREAD_COUNT = 10;
+
+ /**
* <code>SHOTOKU_SERVICE_NAME</code> - name under which the shotoku service is
* registered.
*/
More information about the jboss-svn-commits
mailing list