Author: manik.surtani(a)jboss.com
Date: 2008-08-04 09:27:06 -0400 (Mon, 04 Aug 2008)
New Revision: 6493
Added:
core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java
core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java
Log:
JBCACHE-1108 Move CacheListeners to their own thread pool, optionally enabled using an
attribute on @CacheListener
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-04 12:49:34
UTC (rev 6492)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-04 13:27:06
UTC (rev 6493)
@@ -204,7 +204,7 @@
private List<CustomInterceptorConfig> customInterceptors =
Collections.emptyList();
private boolean writeSkewCheck = false;
private int concurrencyLevel = 500;
- private int listenerAsyncPoolSize = 0;
+ private int listenerAsyncPoolSize = 1;
@Start(priority = 1)
private void correctIsolationLevels()
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2008-08-04
12:49:34 UTC (rev 6492)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2008-08-04
13:27:06 UTC (rev 6493)
@@ -25,6 +25,7 @@
import org.jboss.cache.notifications.event.*;
import static org.jboss.cache.notifications.event.Event.Type.*;
import org.jboss.cache.util.ImmutableMapCopy;
+import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jgroups.View;
import javax.transaction.Transaction;
@@ -39,6 +40,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Helper class that handles all notifications to registered listeners.
@@ -90,6 +95,10 @@
private Cache cache;
private boolean useMarshalledValueMaps;
private Configuration config;
+ // two separate executor services, one for sync and one for async listeners
+ private ExecutorService syncProcessor;
+ private ExecutorService asyncProcessor;
+ private static final AtomicInteger asyncNotifierThreadNumber = new AtomicInteger(0);
public NotifierImpl()
{
@@ -126,6 +135,13 @@
this.config = config;
}
+ @Stop
+ private void stop()
+ {
+ syncProcessor.shutdownNow();
+ asyncProcessor.shutdownNow();
+ }
+
@Destroy
protected void destroy()
{
@@ -136,6 +152,22 @@
protected void start()
{
useMarshalledValueMaps = config.isUseLazyDeserialization();
+ syncProcessor = new WithinThreadExecutor();
+ if (config.getListenerAsyncPoolSize() > 0)
+ {
+ asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(),
new ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "AsyncNotifier-" +
asyncNotifierThreadNumber.getAndIncrement());
+ }
+ });
+ }
+ else
+ {
+ // use the same sync executor
+ asyncProcessor = syncProcessor;
+ }
}
/**
@@ -147,7 +179,7 @@
@SuppressWarnings("unchecked")
private void validateAndAddListenerInvocation(Object listener)
{
- testListenerClassValidity(listener.getClass());
+ boolean sync = testListenerClassValidity(listener.getClass());
boolean foundMethods = false;
// now try all methods on the listener for anything that we like. Note that only
PUBLIC methods are scanned.
@@ -159,7 +191,7 @@
if (m.isAnnotationPresent(allowedMethodAnnotations[i]))
{
testListenerMethodValidity(m, parameterTypes[i],
allowedMethodAnnotations[i].getName());
- addListenerInvocation(allowedMethodAnnotations[i], new
ListenerInvocation(listener, m));
+ addListenerInvocation(allowedMethodAnnotations[i], new
ListenerInvocation(listener, m, sync));
foundMethods = true;
}
}
@@ -169,12 +201,22 @@
log.warn("Attempted to register listener of class " +
listener.getClass() + ", but no valid, public methods annotated with method-level
event annotations found! Ignoring listener.");
}
- private static void testListenerClassValidity(Class<?> listenerClass)
+ /**
+ * Tests if a class is properly annotated as a CacheListener and returns whether
callbacks on this class should be invoked
+ * synchronously or asynchronously.
+ *
+ * @param listenerClass class to inspect
+ * @return true if callbacks on this class should use the syncProcessor; false if it
should use the asyncProcessor.
+ */
+ private static boolean testListenerClassValidity(Class<?> listenerClass)
{
- if (!listenerClass.isAnnotationPresent(CacheListener.class))
+ CacheListener cl = listenerClass.getAnnotation(CacheListener.class);
+ if (cl == null)
throw new IncorrectCacheListenerException("Cache listener class MUST be
annotated with org.jboss.cache.notifications.annotation.CacheListener");
if (!Modifier.isPublic(listenerClass.getModifiers()))
throw new IncorrectCacheListenerException("Cache listener class MUST be
public!");
+ return cl.sync();
+
}
private static void testListenerMethodValidity(Method m, Class allowedParameter,
String annotationName)
@@ -616,36 +658,49 @@
*/
class ListenerInvocation
{
-
private final Object target;
-
private final Method method;
+ private final boolean sync;
- public ListenerInvocation(Object target, Method method)
+ public ListenerInvocation(Object target, Method method, boolean sync)
{
this.target = target;
this.method = method;
+ this.sync = sync;
}
- public void invoke(Event e)
+ public void invoke(final Event e)
{
- try
+ Runnable r = new Runnable()
{
- method.invoke(target, e);
- }
- catch (InvocationTargetException e1)
- {
- Throwable cause = e1.getCause();
- if (cause != null)
- throw new CacheException("Caught exception invoking method " +
method + " on listener instance " + target, cause);
- else
- throw new CacheException("Caught exception invoking method " +
method + " on listener instance " + target, e1);
- }
- catch (IllegalAccessException e1)
- {
- log.warn("Unable to invoke method " + method + " on Object
instance " + target + " - removing this target object from list of
listeners!", e1);
- removeCacheListener(this.target);
- }
+
+ public void run()
+ {
+ try
+ {
+ method.invoke(target, e);
+ }
+ catch (InvocationTargetException exception)
+ {
+ Throwable cause = exception.getCause();
+ if (cause != null)
+ throw new CacheException("Caught exception invoking method
" + method + " on listener instance " + target, cause);
+ else
+ throw new CacheException("Caught exception invoking method
" + method + " on listener instance " + target, exception);
+ }
+ catch (IllegalAccessException exception)
+ {
+ log.warn("Unable to invoke method " + method + " on
Object instance " + target + " - removing this target object from list of
listeners!", exception);
+ removeCacheListener(target);
+ }
+ }
+ };
+
+ if (sync)
+ syncProcessor.execute(r);
+ else
+ asyncProcessor.execute(r);
+
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java 2008-08-04
12:49:34 UTC (rev 6492)
+++
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java 2008-08-04
13:27:06 UTC (rev 6493)
@@ -298,4 +298,5 @@
@Target(ElementType.TYPE)
public @interface CacheListener
{
+ boolean sync() default true;
}
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java 2008-08-04
13:27:06 UTC (rev 6493)
@@ -0,0 +1,89 @@
+package org.jboss.cache.util.concurrent;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An executor that works within the current thread.
+ *
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @see <a
href="http://jcip.net/">Java Concurrency In
Practice</a>
+ * @since 3.0
+ */
+public class WithinThreadExecutor implements ExecutorService
+{
+ boolean shutDown = false;
+
+ public void execute(Runnable command)
+ {
+ command.run();
+ }
+
+ public void shutdown()
+ {
+ shutDown = true;
+ }
+
+ public List<Runnable> shutdownNow()
+ {
+ shutDown = true;
+ return Collections.emptyList();
+ }
+
+ public boolean isShutdown()
+ {
+ return shutDown;
+ }
+
+ public boolean isTerminated()
+ {
+ return shutDown;
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException
+ {
+ return shutDown;
+ }
+
+ public <T> Future<T> submit(Callable<T> task)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> Future<T> submit(Runnable task, T result)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Future<?> submit(Runnable task)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>>
invokeAll(Collection<Callable<T>> tasks) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>>
invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit) throws
InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks) throws
InterruptedException, ExecutionException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java
(rev 0)
+++
core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java 2008-08-04
13:27:06 UTC (rev 6493)
@@ -0,0 +1,87 @@
+package org.jboss.cache.notifications;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeCreated;
+import org.jboss.cache.notifications.event.NodeCreatedEvent;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+@Test(groups = "functional")
+public class AsyncNotificationTest
+{
+ public void testAsyncNotification() throws InterruptedException
+ {
+ Cache<String, String> c = null;
+ try
+ {
+ c = new DefaultCacheFactory<String, String>().createCache();
+ CountDownLatch latch = new CountDownLatch(2);
+ AbstractListener syncListener = new Listener(latch);
+ AbstractListener asyncListener = new AsyncListener(latch);
+ c.addCacheListener(syncListener);
+ c.addCacheListener(asyncListener);
+ c.put("/a", "k", "v");
+ latch.await();
+ assert syncListener.caller == Thread.currentThread();
+ assert asyncListener.caller != Thread.currentThread();
+ }
+ finally
+ {
+ TestingUtil.killCaches(c);
+ }
+ }
+
+ public abstract static class AbstractListener
+ {
+ Thread caller;
+ CountDownLatch latch;
+
+ protected AbstractListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+ }
+
+ @CacheListener(sync = true)
+ public static class Listener extends AbstractListener
+ {
+ public Listener(CountDownLatch latch)
+ {
+ super(latch);
+ }
+
+ @NodeCreated
+ public void handle(NodeCreatedEvent e)
+ {
+ if (e.isPre())
+ {
+ caller = Thread.currentThread();
+ latch.countDown();
+ }
+ }
+ }
+
+ @CacheListener(sync = false)
+ public static class AsyncListener extends AbstractListener
+ {
+ public AsyncListener(CountDownLatch latch)
+ {
+ super(latch);
+ }
+
+ @NodeCreated
+ public void handle(NodeCreatedEvent e)
+ {
+ if (e.isPre())
+ {
+ caller = Thread.currentThread();
+ latch.countDown();
+ }
+ }
+ }
+
+}