[jbosscache-commits] JBoss Cache SVN: r7624 - in core/branches/flat/src: main/java/org/horizon/notifications and 4 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Feb 2 09:31:04 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-02-02 09:31:03 -0500 (Mon, 02 Feb 2009)
New Revision: 7624

Added:
   core/branches/flat/src/main/java/org/horizon/notifications/AbstractListenerImpl.java
Modified:
   core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
   core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifier.java
   core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifierImpl.java
   core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifier.java
   core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
   core/branches/flat/src/test/java/org/horizon/notifications/ConcurrentNotificationTest.java
   core/branches/flat/src/test/java/org/horizon/notifications/cachelistener/CacheNotifierImplTest.java
Log:
Finished notification work

Modified: core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java	2009-02-02 13:07:49 UTC (rev 7623)
+++ core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java	2009-02-02 14:31:03 UTC (rev 7624)
@@ -108,15 +108,19 @@
       if (globalComponents.getState() != ComponentStatus.STARTED || globalComponents.getState() != ComponentStatus.STARTING) {
          globalComponents.start();
       }
+      boolean needToNotify = state != ComponentStatus.STARTED && state != ComponentStatus.STARTING;
       super.start();
-      globalComponents.registerNamedComponentRegistry(this, cacheName);
-      cacheManagerNotifier.notifyCacheStarted(cacheName);
+      if (needToNotify && state == ComponentStatus.STARTED) {
+         globalComponents.registerNamedComponentRegistry(this, cacheName);
+         cacheManagerNotifier.notifyCacheStarted(cacheName);
+      }
    }
 
    @Override
    public void stop() {
       if (state.stopAllowed()) globalComponents.unregisterNamedComponentRegistry(cacheName);
+      boolean needToNotify = state == ComponentStatus.STARTED || state == ComponentStatus.STARTING;
       super.stop();
-      cacheManagerNotifier.notifyCacheStopped(cacheName);
+      if (state == ComponentStatus.STOPPED && needToNotify) cacheManagerNotifier.notifyCacheStopped(cacheName);
    }
 }

Added: core/branches/flat/src/main/java/org/horizon/notifications/AbstractListenerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/notifications/AbstractListenerImpl.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/notifications/AbstractListenerImpl.java	2009-02-02 14:31:03 UTC (rev 7624)
@@ -0,0 +1,201 @@
+package org.horizon.notifications;
+
+import org.horizon.CacheException;
+import org.horizon.factories.KnownComponentNames;
+import org.horizon.factories.annotations.ComponentName;
+import org.horizon.factories.annotations.Destroy;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.Start;
+import org.horizon.factories.annotations.Stop;
+import org.horizon.logging.Log;
+import org.horizon.util.ReflectionUtil;
+import org.horizon.util.concurrent.WithinThreadExecutor;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Functionality common to both {@link org.horizon.notifications.cachemanagerlistener.CacheManagerNotifierImpl} and
+ * {@link org.horizon.notifications.cachelistener.CacheNotifierImpl}
+ *
+ * @author Manik Surtani
+ */
+public abstract class AbstractListenerImpl {
+
+   protected final Map<Class<? extends Annotation>, List<ListenerInvocation>> listenersMap = new HashMap<Class<? extends Annotation>, List<ListenerInvocation>>(16, 0.99f);
+
+
+   // two separate executor services, one for sync and one for async listeners
+   protected ExecutorService syncProcessor;
+   protected ExecutorService asyncProcessor;
+
+
+   @Inject
+   void injectExecutor(@ComponentName(KnownComponentNames.ASYNC_NOTIFICATION_EXECUTOR) ExecutorService executor) {
+      this.asyncProcessor = executor;
+   }
+
+   @Destroy
+   void destroy() {
+      removeAllCacheListeners();
+   }
+
+   @Start
+   public void start() {
+      syncProcessor = new WithinThreadExecutor();
+   }
+
+   /**
+    * Removes all listeners from the notifier
+    */
+   @Stop(priority = 99)
+   public void removeAllCacheListeners() {
+      for (List<ListenerInvocation> list : listenersMap.values()) {
+         if (list != null) list.clear();
+      }
+   }
+
+   protected abstract Log getLog();
+
+   protected abstract Map<Class<? extends Annotation>, Class> getAllowedMethodAnnotations();
+
+   protected List<ListenerInvocation> getListenerCollectionForAnnotation(Class<? extends Annotation> annotation) {
+      List<ListenerInvocation> list = listenersMap.get(annotation);
+      if (list == null) throw new CacheException("Unknown listener annotation: " + annotation);
+      return list;
+   }
+
+   public void removeListener(Object listener) {
+      for (Class<? extends Annotation> annotation : getAllowedMethodAnnotations().keySet())
+         removeListenerInvocation(annotation, listener);
+   }
+
+   private void removeListenerInvocation(Class<? extends Annotation> annotation, Object listener) {
+      if (listener == null) return;
+      List<ListenerInvocation> l = getListenerCollectionForAnnotation(annotation);
+      Set<Object> markedForRemoval = new HashSet<Object>();
+      for (ListenerInvocation li : l) {
+         if (listener.equals(li.target)) markedForRemoval.add(li);
+      }
+      l.removeAll(markedForRemoval);
+   }
+
+   public void addListener(Object listener) {
+      validateAndAddListenerInvocation(listener);
+   }
+
+   public Set<Object> getListeners() {
+      Set<Object> result = new HashSet<Object>();
+      for (List<ListenerInvocation> list : listenersMap.values()) {
+         for (ListenerInvocation li : list) result.add(li.target);
+      }
+      return Collections.unmodifiableSet(result);
+   }
+
+   /**
+    * Loops through all valid methods on the object passed in, and caches the relevant methods as {@link
+    * ListenerInvocation} for invocation by reflection.
+    *
+    * @param listener object to be considered as a listener.
+    */
+   @SuppressWarnings("unchecked")
+   private void validateAndAddListenerInvocation(Object listener) {
+      boolean sync = testListenerClassValidity(listener.getClass());
+      boolean foundMethods = false;
+      Map<Class<? extends Annotation>, Class> allowedListeners = getAllowedMethodAnnotations();
+      // now try all methods on the listener for anything that we like.  Note that only PUBLIC methods are scanned.
+      for (Method m : listener.getClass().getMethods()) {
+         // loop through all valid method annotations
+         for (Class<? extends Annotation> annotation : allowedListeners.keySet()) {
+            if (m.isAnnotationPresent(annotation)) {
+               testListenerMethodValidity(m, allowedListeners.get(annotation), annotation.getName());
+               addListenerInvocation(annotation, new ListenerInvocation(listener, m, sync));
+               foundMethods = true;
+            }
+         }
+      }
+
+      if (!foundMethods && getLog().isWarnEnabled())
+         getLog().warn("Attempted to register listener of class " + listener.getClass() + ", but no valid, public methods annotated with method-level event annotations found! Ignoring listener.");
+   }
+
+   private void addListenerInvocation(Class annotation, ListenerInvocation li) {
+      List<ListenerInvocation> result = getListenerCollectionForAnnotation(annotation);
+      result.add(li);
+   }
+
+   /**
+    * Tests if a class is properly annotated as a CacheListener and returns whether callbacks on this class should be
+    * invoked synchronously or asynchronously.
+    *
+    * @param listenerClass class to inspect
+    * @return true if callbacks on this class should use the syncProcessor; false if it should use the asyncProcessor.
+    */
+   protected boolean testListenerClassValidity(Class<?> listenerClass) {
+      Listener l = ReflectionUtil.getAnnotation(listenerClass, Listener.class);
+      if (l == null)
+         throw new IncorrectListenerException("Cache listener class MUST be annotated with org.horizon.notifications.annotation.Listener");
+      if (!Modifier.isPublic(listenerClass.getModifiers()))
+         throw new IncorrectListenerException("Cache listener class MUST be public!");
+      return l.sync();
+   }
+
+   protected void testListenerMethodValidity(Method m, Class allowedParameter, String annotationName) {
+      if (m.getParameterTypes().length != 1 || !m.getParameterTypes()[0].isAssignableFrom(allowedParameter))
+         throw new IncorrectListenerException("Methods annotated with " + annotationName + " must accept exactly one parameter, of assignable from type " + allowedParameter.getName());
+      if (!m.getReturnType().equals(void.class))
+         throw new IncorrectListenerException("Methods annotated with " + annotationName + " should have a return type of void.");
+   }
+
+   /**
+    * Class that encapsulates a valid invocation for a given registered listener - containing a reference to the method
+    * to be invoked as well as the target object.
+    */
+   protected class ListenerInvocation {
+      public final Object target;
+      public final Method method;
+      public final boolean sync;
+
+      public ListenerInvocation(Object target, Method method, boolean sync) {
+         this.target = target;
+         this.method = method;
+         this.sync = sync;
+      }
+
+      public void invoke(final Object event) {
+         Runnable r = new Runnable() {
+
+            public void run() {
+               try {
+                  method.invoke(target, event);
+               }
+               catch (InvocationTargetException exception) {
+                  Throwable cause = exception.getCause();
+                  if (cause != null)
+                     throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, cause);
+                  else
+                     throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, exception);
+               }
+               catch (IllegalAccessException exception) {
+                  getLog().warn("Unable to invoke method " + method + " on Object instance " + target + " - removing this target object from list of listeners!", exception);
+                  removeListener(target);
+               }
+            }
+         };
+
+         if (sync)
+            syncProcessor.execute(r);
+         else
+            asyncProcessor.execute(r);
+      }
+   }
+}

Modified: core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifier.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifier.java	2009-02-02 13:07:49 UTC (rev 7623)
+++ core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifier.java	2009-02-02 14:31:03 UTC (rev 7624)
@@ -22,6 +22,9 @@
 package org.horizon.notifications.cachelistener;
 
 import org.horizon.context.InvocationContext;
+import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
 import org.horizon.notifications.Listenable;
 
 import javax.transaction.Transaction;
@@ -32,6 +35,8 @@
  * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
+@NonVolatile
+@Scope(Scopes.NAMED_CACHE)
 public interface CacheNotifier extends Listenable {
    /**
     * Notifies all registered listeners of a CacheEntryCreated event.

Modified: core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifierImpl.java	2009-02-02 13:07:49 UTC (rev 7623)
+++ core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifierImpl.java	2009-02-02 14:31:03 UTC (rev 7624)
@@ -22,41 +22,22 @@
 package org.horizon.notifications.cachelistener;
 
 import org.horizon.Cache;
-import org.horizon.CacheException;
 import org.horizon.CacheSPI;
 import org.horizon.context.InvocationContext;
-import org.horizon.factories.KnownComponentNames;
-import org.horizon.factories.annotations.ComponentName;
-import org.horizon.factories.annotations.Destroy;
 import org.horizon.factories.annotations.Inject;
-import org.horizon.factories.annotations.NonVolatile;
-import org.horizon.factories.annotations.Start;
-import org.horizon.factories.annotations.Stop;
-import org.horizon.factories.scopes.Scope;
-import org.horizon.factories.scopes.Scopes;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
-import org.horizon.notifications.IncorrectListenerException;
-import org.horizon.notifications.Listener;
+import org.horizon.notifications.AbstractListenerImpl;
 import org.horizon.notifications.cachelistener.annotation.*;
 import org.horizon.notifications.cachelistener.event.*;
 import static org.horizon.notifications.cachelistener.event.Event.Type.*;
-import org.horizon.util.ReflectionUtil;
-import org.horizon.util.concurrent.WithinThreadExecutor;
 
 import javax.transaction.Transaction;
 import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
 
 /**
  * Helper class that handles all notifications to registered listeners.
@@ -64,25 +45,26 @@
  * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
  * @since 1.0
  */
-@NonVolatile
-@Scope(Scopes.NAMED_CACHE)
-public class CacheNotifierImpl implements CacheNotifier {
+public class CacheNotifierImpl extends AbstractListenerImpl implements CacheNotifier {
    private static final Log log = LogFactory.getLog(CacheNotifierImpl.class);
 
-   private static final Class[] allowedMethodAnnotations =
-         {
-               CacheEntryCreated.class, CacheEntryRemoved.class, CacheEntryVisited.class, CacheEntryModified.class,
-               CacheEntryActivated.class, CacheEntryPassivated.class, CacheEntryLoaded.class, CacheEntryEvicted.class, TransactionRegistered.class, TransactionCompleted.class,
-               CacheEntryInvalidated.class
-         };
-   private static final Class[] parameterTypes =
-         {
-               CacheEntryCreatedEvent.class, CacheEntryRemovedEvent.class, CacheEntryVisitedEvent.class, CacheEntryModifiedEvent.class,
-               CacheEntryActivatedEvent.class, CacheEntryPassivatedEvent.class, CacheEntryLoadedEvent.class, CacheEntryEvictedEvent.class, TransactionRegisteredEvent.class, TransactionCompletedEvent.class,
-               CacheEntryInvalidatedEvent.class
-         };
+   private static final Map<Class<? extends Annotation>, Class> allowedListeners = new HashMap<Class<? extends Annotation>, Class>();
 
-   final Map<Class<? extends Annotation>, List<ListenerInvocation>> listenersMap = new HashMap<Class<? extends Annotation>, List<ListenerInvocation>>(16, 0.99f);
+   static {
+      allowedListeners.put(CacheEntryCreated.class, CacheEntryCreatedEvent.class);
+      allowedListeners.put(CacheEntryRemoved.class, CacheEntryRemovedEvent.class);
+      allowedListeners.put(CacheEntryVisited.class, CacheEntryVisitedEvent.class);
+      allowedListeners.put(CacheEntryModified.class, CacheEntryModifiedEvent.class);
+      allowedListeners.put(CacheEntryActivated.class, CacheEntryActivatedEvent.class);
+      allowedListeners.put(CacheEntryPassivated.class, CacheEntryPassivatedEvent.class);
+      allowedListeners.put(CacheEntryLoaded.class, CacheEntryLoadedEvent.class);
+      allowedListeners.put(CacheEntryEvicted.class, CacheEntryEvictedEvent.class);
+      allowedListeners.put(TransactionRegistered.class, TransactionRegisteredEvent.class);
+      allowedListeners.put(TransactionCompleted.class, TransactionCompletedEvent.class);
+      allowedListeners.put(CacheEntryInvalidated.class, CacheEntryInvalidatedEvent.class);
+
+   }
+
    final List<ListenerInvocation> cacheEntryCreatedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
    final List<ListenerInvocation> cacheEntryRemovedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
    final List<ListenerInvocation> cacheEntryVisitedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
@@ -96,11 +78,9 @@
    final List<ListenerInvocation> transactionCompletedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
 
    private Cache cache;
-   // two separate executor services, one for sync and one for async listeners
-   private ExecutorService syncProcessor;
-   private ExecutorService asyncProcessor;
 
    public CacheNotifierImpl() {
+
       listenersMap.put(CacheEntryCreated.class, cacheEntryCreatedListeners);
       listenersMap.put(CacheEntryRemoved.class, cacheEntryRemovedListeners);
       listenersMap.put(CacheEntryVisited.class, cacheEntryVisitedListeners);
@@ -115,126 +95,18 @@
    }
 
    @Inject
-   void injectDependencies(CacheSPI cache,
-                           @ComponentName(KnownComponentNames.ASYNC_NOTIFICATION_EXECUTOR) ExecutorService executor) {
+   void injectDependencies(CacheSPI cache) {
       this.cache = cache;
-      this.asyncProcessor = executor;
    }
 
-   @Stop
-   void stop() {
-      syncProcessor.shutdownNow();
-      asyncProcessor.shutdownNow();
+   protected Log getLog() {
+      return log;
    }
 
-   @Destroy
-   void destroy() {
-      removeAllCacheListeners();
+   protected Map<Class<? extends Annotation>, Class> getAllowedMethodAnnotations() {
+      return allowedListeners;
    }
 
-   @Start
-   void start() {
-      syncProcessor = new WithinThreadExecutor();
-   }
-
-   /**
-    * Loops through all valid methods on the object passed in, and caches the relevant methods as {@link
-    * CacheNotifierImpl.ListenerInvocation} for invocation by reflection.
-    *
-    * @param listener object to be considered as a listener.
-    */
-   @SuppressWarnings("unchecked")
-   private void validateAndAddListenerInvocation(Object listener) {
-      boolean sync = testListenerClassValidity(listener.getClass());
-
-      boolean foundMethods = false;
-      // now try all methods on the listener for anything that we like.  Note that only PUBLIC methods are scanned.
-      for (Method m : listener.getClass().getMethods()) {
-         // loop through all valid method annotations
-         for (int i = 0; i < allowedMethodAnnotations.length; i++) {
-            if (m.isAnnotationPresent(allowedMethodAnnotations[i])) {
-               testListenerMethodValidity(m, parameterTypes[i], allowedMethodAnnotations[i].getName());
-               addListenerInvocation(allowedMethodAnnotations[i], new ListenerInvocation(listener, m, sync));
-               foundMethods = true;
-            }
-         }
-      }
-
-      if (!foundMethods && log.isWarnEnabled())
-         log.warn("Attempted to register listener of class " + listener.getClass() + ", but no valid, public methods annotated with method-level event annotations found! Ignoring listener.");
-   }
-
-   /**
-    * Tests if a class is properly annotated as a CacheListener and returns whether callbacks on this class should be
-    * invoked synchronously or asynchronously.
-    *
-    * @param listenerClass class to inspect
-    * @return true if callbacks on this class should use the syncProcessor; false if it should use the asyncProcessor.
-    */
-   private static boolean testListenerClassValidity(Class<?> listenerClass) {
-      Listener l = ReflectionUtil.getAnnotation(listenerClass, Listener.class);
-      if (l == null)
-         throw new IncorrectListenerException("Cache listener class MUST be annotated with org.horizon.notifications.annotation.Listener");
-      if (!Modifier.isPublic(listenerClass.getModifiers()))
-         throw new IncorrectListenerException("Cache listener class MUST be public!");
-      return l.sync();
-   }
-
-   private static void testListenerMethodValidity(Method m, Class allowedParameter, String annotationName) {
-      if (m.getParameterTypes().length != 1 || !m.getParameterTypes()[0].isAssignableFrom(allowedParameter))
-         throw new IncorrectListenerException("Methods annotated with " + annotationName + " must accept exactly one parameter, of assignable from type " + allowedParameter.getName());
-      if (!m.getReturnType().equals(void.class))
-         throw new IncorrectListenerException("Methods annotated with " + annotationName + " should have a return type of void.");
-   }
-
-   private void addListenerInvocation(Class annotation, ListenerInvocation li) {
-      List<ListenerInvocation> result = getListenerCollectionForAnnotation(annotation);
-      result.add(li);
-   }
-
-   public void addListener(Object listener) {
-      validateAndAddListenerInvocation(listener);
-   }
-
-   public void removeListener(Object listener) {
-      for (Class annotation : allowedMethodAnnotations) removeListenerInvocation(annotation, listener);
-   }
-
-   private void removeListenerInvocation(Class annotation, Object listener) {
-      if (listener == null) return;
-      List<ListenerInvocation> l = getListenerCollectionForAnnotation(annotation);
-      Set<Object> markedForRemoval = new HashSet<Object>();
-      for (ListenerInvocation li : l) {
-         if (listener.equals(li.target)) markedForRemoval.add(li);
-      }
-      l.removeAll(markedForRemoval);
-   }
-
-   /**
-    * Removes all listeners from the notifier, including the evictionPolicyListener.
-    */
-   @Stop(priority = 99)
-   public void removeAllCacheListeners() {
-      cacheEntryCreatedListeners.clear();
-      cacheEntryRemovedListeners.clear();
-      cacheEntryVisitedListeners.clear();
-      cacheEntryModifiedListeners.clear();
-      cacheEntryActivatedListeners.clear();
-      cacheEntryPassivatedListeners.clear();
-      cacheEntryLoadedListeners.clear();
-      cacheEntryEvictedListeners.clear();
-      transactionRegisteredListeners.clear();
-      transactionCompletedListeners.clear();
-   }
-
-   public Set<Object> getListeners() {
-      Set<Object> result = new HashSet<Object>();
-      for (List<ListenerInvocation> list : listenersMap.values()) {
-         for (ListenerInvocation li : list) result.add(li.target);
-      }
-      return Collections.unmodifiableSet(result);
-   }
-
    public void notifyCacheEntryCreated(Object key, boolean pre, InvocationContext ctx) {
       if (!cacheEntryCreatedListeners.isEmpty()) {
          boolean originLocal = ctx.isOriginLocal();
@@ -437,53 +309,4 @@
       newContext.putLookedUpEntries(ctx.getLookedUpEntries());
       return ctx;
    }
-
-   /**
-    * Class that encapsulates a valid invocation for a given registered listener - containing a reference to the method
-    * to be invoked as well as the target object.
-    */
-   class ListenerInvocation {
-      private final Object target;
-      private final Method method;
-      private final boolean sync;
-
-      public ListenerInvocation(Object target, Method method, boolean sync) {
-         this.target = target;
-         this.method = method;
-         this.sync = sync;
-      }
-
-      public void invoke(final Event e) {
-         Runnable r = new Runnable() {
-
-            public void run() {
-               try {
-                  method.invoke(target, e);
-               }
-               catch (InvocationTargetException exception) {
-                  Throwable cause = exception.getCause();
-                  if (cause != null)
-                     throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, cause);
-                  else
-                     throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, exception);
-               }
-               catch (IllegalAccessException exception) {
-                  log.warn("Unable to invoke method " + method + " on Object instance " + target + " - removing this target object from list of listeners!", exception);
-                  removeListener(target);
-               }
-            }
-         };
-
-         if (sync)
-            syncProcessor.execute(r);
-         else
-            asyncProcessor.execute(r);
-      }
-   }
-
-   private List<ListenerInvocation> getListenerCollectionForAnnotation(Class<? extends Annotation> annotation) {
-      List<ListenerInvocation> list = listenersMap.get(annotation);
-      if (list == null) throw new CacheException("Unknown listener annotation: " + annotation);
-      return list;
-   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifier.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifier.java	2009-02-02 13:07:49 UTC (rev 7623)
+++ core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifier.java	2009-02-02 14:31:03 UTC (rev 7624)
@@ -1,5 +1,8 @@
 package org.horizon.notifications.cachemanagerlistener;
 
+import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
 import org.horizon.notifications.Listenable;
 import org.horizon.remoting.transport.Address;
 
@@ -11,6 +14,8 @@
  * @author Manik Surtani
  * @since 1.0
  */
+@NonVolatile
+@Scope(Scopes.GLOBAL)
 public interface CacheManagerNotifier extends Listenable {
    /**
     * Notifies all registered listeners of a viewChange event.  Note that viewChange notifications are ALWAYS sent

Modified: core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java	2009-02-02 13:07:49 UTC (rev 7623)
+++ core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java	2009-02-02 14:31:03 UTC (rev 7624)
@@ -1,56 +1,102 @@
 package org.horizon.notifications.cachemanagerlistener;
 
-import org.horizon.factories.annotations.NonVolatile;
-import org.horizon.factories.annotations.Start;
 import org.horizon.factories.annotations.Stop;
-import org.horizon.factories.scopes.Scope;
-import org.horizon.factories.scopes.Scopes;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.notifications.AbstractListenerImpl;
+import org.horizon.notifications.cachemanagerlistener.annotation.CacheStarted;
+import org.horizon.notifications.cachemanagerlistener.annotation.CacheStopped;
+import org.horizon.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.horizon.notifications.cachemanagerlistener.event.CacheStartedEvent;
+import org.horizon.notifications.cachemanagerlistener.event.CacheStoppedEvent;
+import org.horizon.notifications.cachemanagerlistener.event.Event;
+import org.horizon.notifications.cachemanagerlistener.event.EventImpl;
+import org.horizon.notifications.cachemanagerlistener.event.ViewChangedEvent;
 import org.horizon.remoting.transport.Address;
 
+import java.lang.annotation.Annotation;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
- * Global, shared notifications.  Typically on the CacheManager.  See
+ * Global, shared notifications on the cache manager.
  *
  * @author Manik Surtani
  * @since 1.0
  */
-@NonVolatile
-@Scope(Scopes.GLOBAL)
-public class CacheManagerNotifierImpl implements CacheManagerNotifier {
+public class CacheManagerNotifierImpl extends AbstractListenerImpl implements CacheManagerNotifier {
 
-   public void notifyViewChange(List<Address> members, Address myAddress) {
-      // TODO: Manik: Customise this generated block
-   }
+   private static final Log log = LogFactory.getLog(CacheManagerNotifierImpl.class);
 
-   public void notifyCacheStarted(String cacheName) {
-      // TODO: Manik: Customise this generated block
+   private static final Map<Class<? extends Annotation>, Class> allowedListeners = new HashMap<Class<? extends Annotation>, Class>();
+
+   static {
+      allowedListeners.put(CacheStarted.class, CacheStartedEvent.class);
+      allowedListeners.put(CacheStopped.class, CacheStoppedEvent.class);
+      allowedListeners.put(ViewChanged.class, ViewChangedEvent.class);
    }
 
-   public void notifyCacheStopped(String cacheName) {
-      // TODO: Manik: Customise this generated block
+   final List<ListenerInvocation> cacheStartedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
+   final List<ListenerInvocation> cacheStoppedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
+   final List<ListenerInvocation> viewChangedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
+
+   private CacheManager cacheManager;
+
+   public CacheManagerNotifierImpl() {
+      listenersMap.put(CacheStarted.class, cacheStartedListeners);
+      listenersMap.put(CacheStopped.class, cacheStoppedListeners);
+      listenersMap.put(ViewChanged.class, viewChangedListeners);
    }
 
-   public void addListener(Object listener) {
-      // TODO: Manik: Customise this generated block
+   public void injectCacheManager(CacheManager cacheManager) {
+      this.cacheManager = cacheManager;
    }
 
-   public void removeListener(Object listener) {
-      // TODO: Manik: Customise this generated block
+   public void notifyViewChange(List<Address> members, Address myAddress) {
+      if (!viewChangedListeners.isEmpty()) {
+         EventImpl e = new EventImpl();
+         e.setLocalAddress(myAddress);
+         e.setNewMemberList(members);
+         e.setCacheManager(cacheManager);
+         e.setType(Event.Type.VIEW_CHANGED);
+         for (ListenerInvocation listener : viewChangedListeners) listener.invoke(e);
+      }
    }
 
-   public Set<Object> getListeners() {
-      return null;  // TODO: Manik: Customise this generated block
+   public void notifyCacheStarted(String cacheName) {
+      if (!cacheStartedListeners.isEmpty()) {
+         EventImpl e = new EventImpl();
+         e.setCacheName(cacheName);
+         e.setCacheManager(cacheManager);
+         e.setType(Event.Type.CACHE_STARTED);
+         for (ListenerInvocation listener : cacheStartedListeners) listener.invoke(e);
+      }
    }
 
-   @Start
-   public void start() {
-      // TODO: Manik: Customise this generated block
+   public void notifyCacheStopped(String cacheName) {
+      if (!cacheStoppedListeners.isEmpty()) {
+         EventImpl e = new EventImpl();
+         e.setCacheName(cacheName);
+         e.setCacheManager(cacheManager);
+         e.setType(Event.Type.CACHE_STOPPED);
+         for (ListenerInvocation listener : cacheStoppedListeners) listener.invoke(e);
+      }
    }
 
    @Stop
-   public void stop() {
+   void stop() {
+      if (syncProcessor != null) syncProcessor.shutdownNow();
+      if (asyncProcessor != null) asyncProcessor.shutdownNow();
+   }
 
+   protected Log getLog() {
+      return log;
    }
+
+   protected Map<Class<? extends Annotation>, Class> getAllowedMethodAnnotations() {
+      return allowedListeners;
+   }
 }

Modified: core/branches/flat/src/test/java/org/horizon/notifications/ConcurrentNotificationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/notifications/ConcurrentNotificationTest.java	2009-02-02 13:07:49 UTC (rev 7623)
+++ core/branches/flat/src/test/java/org/horizon/notifications/ConcurrentNotificationTest.java	2009-02-02 14:31:03 UTC (rev 7624)
@@ -20,7 +20,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
-@Test(groups = {"functional"}, sequential = true, testName = "notifications.ConcurrentNotificationTest")
+@Test(groups = "functional", sequential = true, testName = "notifications.ConcurrentNotificationTest")
 public class ConcurrentNotificationTest {
    Cache<String, String> cache;
    CacheManager cm;

Modified: core/branches/flat/src/test/java/org/horizon/notifications/cachelistener/CacheNotifierImplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/notifications/cachelistener/CacheNotifierImplTest.java	2009-02-02 13:07:49 UTC (rev 7623)
+++ core/branches/flat/src/test/java/org/horizon/notifications/cachelistener/CacheNotifierImplTest.java	2009-02-02 14:31:03 UTC (rev 7624)
@@ -29,7 +29,7 @@
       mockCache = createNiceMock(CacheSPI.class);
       EasyMock.expect(mockCache.getInvocationContext()).andReturn(new InvocationContextImpl()).anyTimes();
       EasyMock.replay(mockCache);
-      n.injectDependencies(mockCache, null);
+      n.injectDependencies(mockCache);
       cl = new CacheListener();
       n.start();
       n.addListener(cl);




More information about the jbosscache-commits mailing list