[jbosscache-commits] JBoss Cache SVN: r6493 - in core/trunk/src: main/java/org/jboss/cache/notifications and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Aug 4 09:27:06 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-08-04 09:27:06 -0400 (Mon, 04 Aug 2008)
New Revision: 6493

Added:
   core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java
   core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
   core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
   core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java
Log:
JBCACHE-1108 Move CacheListeners to their own thread pool, optionally enabled using an attribute on @CacheListener

Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-08-04 12:49:34 UTC (rev 6492)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2008-08-04 13:27:06 UTC (rev 6493)
@@ -204,7 +204,7 @@
    private List<CustomInterceptorConfig> customInterceptors = Collections.emptyList();
    private boolean writeSkewCheck = false;
    private int concurrencyLevel = 500;
-   private int listenerAsyncPoolSize = 0;
+   private int listenerAsyncPoolSize = 1;
 
    @Start(priority = 1)
    private void correctIsolationLevels()

Modified: core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java	2008-08-04 12:49:34 UTC (rev 6492)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java	2008-08-04 13:27:06 UTC (rev 6493)
@@ -25,6 +25,7 @@
 import org.jboss.cache.notifications.event.*;
 import static org.jboss.cache.notifications.event.Event.Type.*;
 import org.jboss.cache.util.ImmutableMapCopy;
+import org.jboss.cache.util.concurrent.WithinThreadExecutor;
 import org.jgroups.View;
 
 import javax.transaction.Transaction;
@@ -39,6 +40,10 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Helper class that handles all notifications to registered listeners.
@@ -90,6 +95,10 @@
    private Cache cache;
    private boolean useMarshalledValueMaps;
    private Configuration config;
+   // two separate executor services, one for sync and one for async listeners
+   private ExecutorService syncProcessor;
+   private ExecutorService asyncProcessor;
+   private static final AtomicInteger asyncNotifierThreadNumber = new AtomicInteger(0);
 
    public NotifierImpl()
    {
@@ -126,6 +135,13 @@
       this.config = config;
    }
 
+   @Stop
+   private void stop()
+   {
+      syncProcessor.shutdownNow();
+      asyncProcessor.shutdownNow();
+   }
+
    @Destroy
    protected void destroy()
    {
@@ -136,6 +152,22 @@
    protected void start()
    {
       useMarshalledValueMaps = config.isUseLazyDeserialization();
+      syncProcessor = new WithinThreadExecutor();
+      if (config.getListenerAsyncPoolSize() > 0)
+      {
+         asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
+         {
+            public Thread newThread(Runnable r)
+            {
+               return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
+            }
+         });
+      }
+      else
+      {
+         // use the same sync executor
+         asyncProcessor = syncProcessor;
+      }
    }
 
    /**
@@ -147,7 +179,7 @@
    @SuppressWarnings("unchecked")
    private void validateAndAddListenerInvocation(Object listener)
    {
-      testListenerClassValidity(listener.getClass());
+      boolean sync = testListenerClassValidity(listener.getClass());
 
       boolean foundMethods = false;
       // now try all methods on the listener for anything that we like.  Note that only PUBLIC methods are scanned.
@@ -159,7 +191,7 @@
             if (m.isAnnotationPresent(allowedMethodAnnotations[i]))
             {
                testListenerMethodValidity(m, parameterTypes[i], allowedMethodAnnotations[i].getName());
-               addListenerInvocation(allowedMethodAnnotations[i], new ListenerInvocation(listener, m));
+               addListenerInvocation(allowedMethodAnnotations[i], new ListenerInvocation(listener, m, sync));
                foundMethods = true;
             }
          }
@@ -169,12 +201,22 @@
          log.warn("Attempted to register listener of class " + listener.getClass() + ", but no valid, public methods annotated with method-level event annotations found! Ignoring listener.");
    }
 
-   private static void testListenerClassValidity(Class<?> listenerClass)
+   /**
+    * Tests if a class is properly annotated as a CacheListener and returns whether callbacks on this class should be invoked
+    * synchronously or asynchronously.
+    *
+    * @param listenerClass class to inspect
+    * @return true if callbacks on this class should use the syncProcessor; false if it should use the asyncProcessor.
+    */
+   private static boolean testListenerClassValidity(Class<?> listenerClass)
    {
-      if (!listenerClass.isAnnotationPresent(CacheListener.class))
+      CacheListener cl = listenerClass.getAnnotation(CacheListener.class);
+      if (cl == null)
          throw new IncorrectCacheListenerException("Cache listener class MUST be annotated with org.jboss.cache.notifications.annotation.CacheListener");
       if (!Modifier.isPublic(listenerClass.getModifiers()))
          throw new IncorrectCacheListenerException("Cache listener class MUST be public!");
+      return cl.sync();
+
    }
 
    private static void testListenerMethodValidity(Method m, Class allowedParameter, String annotationName)
@@ -616,36 +658,49 @@
     */
    class ListenerInvocation
    {
-
       private final Object target;
-
       private final Method method;
+      private final boolean sync;
 
-      public ListenerInvocation(Object target, Method method)
+      public ListenerInvocation(Object target, Method method, boolean sync)
       {
          this.target = target;
          this.method = method;
+         this.sync = sync;
       }
 
-      public void invoke(Event e)
+      public void invoke(final Event e)
       {
-         try
+         Runnable r = new Runnable()
          {
-            method.invoke(target, e);
-         }
-         catch (InvocationTargetException e1)
-         {
-            Throwable cause = e1.getCause();
-            if (cause != null)
-               throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, cause);
-            else
-               throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, e1);
-         }
-         catch (IllegalAccessException e1)
-         {
-            log.warn("Unable to invoke method " + method + " on Object instance " + target + " - removing this target object from list of listeners!", e1);
-            removeCacheListener(this.target);
-         }
+
+            public void run()
+            {
+               try
+               {
+                  method.invoke(target, e);
+               }
+               catch (InvocationTargetException exception)
+               {
+                  Throwable cause = exception.getCause();
+                  if (cause != null)
+                     throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, cause);
+                  else
+                     throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, exception);
+               }
+               catch (IllegalAccessException exception)
+               {
+                  log.warn("Unable to invoke method " + method + " on Object instance " + target + " - removing this target object from list of listeners!", exception);
+                  removeCacheListener(target);
+               }
+            }
+         };
+
+         if (sync)
+            syncProcessor.execute(r);
+         else
+            asyncProcessor.execute(r);
+
       }
 
    }

Modified: core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java	2008-08-04 12:49:34 UTC (rev 6492)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java	2008-08-04 13:27:06 UTC (rev 6493)
@@ -298,4 +298,5 @@
 @Target(ElementType.TYPE)
 public @interface CacheListener
 {
+   boolean sync() default true;
 }

Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java	2008-08-04 13:27:06 UTC (rev 6493)
@@ -0,0 +1,89 @@
+package org.jboss.cache.util.concurrent;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An executor that works within the current thread.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @see <a href="http://jcip.net/">Java Concurrency In Practice</a>
+ * @since 3.0
+ */
+public class WithinThreadExecutor implements ExecutorService
+{
+   boolean shutDown = false;
+
+   public void execute(Runnable command)
+   {
+      command.run();
+   }
+
+   public void shutdown()
+   {
+      shutDown = true;
+   }
+
+   public List<Runnable> shutdownNow()
+   {
+      shutDown = true;
+      return Collections.emptyList();
+   }
+
+   public boolean isShutdown()
+   {
+      return shutDown;
+   }
+
+   public boolean isTerminated()
+   {
+      return shutDown;
+   }
+
+   public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+   {
+      return shutDown;
+   }
+
+   public <T> Future<T> submit(Callable<T> task)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public <T> Future<T> submit(Runnable task, T result)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public Future<?> submit(Runnable task)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks) throws InterruptedException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public <T> T invokeAny(Collection<Callable<T>> tasks) throws InterruptedException, ExecutionException
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   public <T> T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+   {
+      throw new UnsupportedOperationException();
+   }
+}

Added: core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java	2008-08-04 13:27:06 UTC (rev 6493)
@@ -0,0 +1,87 @@
+package org.jboss.cache.notifications;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeCreated;
+import org.jboss.cache.notifications.event.NodeCreatedEvent;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+ at Test(groups = "functional")
+public class AsyncNotificationTest
+{
+   public void testAsyncNotification() throws InterruptedException
+   {
+      Cache<String, String> c = null;
+      try
+      {
+         c = new DefaultCacheFactory<String, String>().createCache();
+         CountDownLatch latch = new CountDownLatch(2);
+         AbstractListener syncListener = new Listener(latch);
+         AbstractListener asyncListener = new AsyncListener(latch);
+         c.addCacheListener(syncListener);
+         c.addCacheListener(asyncListener);
+         c.put("/a", "k", "v");
+         latch.await();
+         assert syncListener.caller == Thread.currentThread();
+         assert asyncListener.caller != Thread.currentThread();
+      }
+      finally
+      {
+         TestingUtil.killCaches(c);
+      }
+   }
+
+   public abstract static class AbstractListener
+   {
+      Thread caller;
+      CountDownLatch latch;
+
+      protected AbstractListener(CountDownLatch latch)
+      {
+         this.latch = latch;
+      }
+   }
+
+   @CacheListener(sync = true)
+   public static class Listener extends AbstractListener
+   {
+      public Listener(CountDownLatch latch)
+      {
+         super(latch);
+      }
+
+      @NodeCreated
+      public void handle(NodeCreatedEvent e)
+      {
+         if (e.isPre())
+         {
+            caller = Thread.currentThread();
+            latch.countDown();
+         }
+      }
+   }
+
+   @CacheListener(sync = false)
+   public static class AsyncListener extends AbstractListener
+   {
+      public AsyncListener(CountDownLatch latch)
+      {
+         super(latch);
+      }
+
+      @NodeCreated
+      public void handle(NodeCreatedEvent e)
+      {
+         if (e.isPre())
+         {
+            caller = Thread.currentThread();
+            latch.countDown();
+         }
+      }
+   }
+
+}




More information about the jbosscache-commits mailing list