Author: manik.surtani(a)jboss.com
Date: 2009-02-02 09:31:03 -0500 (Mon, 02 Feb 2009)
New Revision: 7624
Added:
core/branches/flat/src/main/java/org/horizon/notifications/AbstractListenerImpl.java
Modified:
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifier.java
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifierImpl.java
core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifier.java
core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
core/branches/flat/src/test/java/org/horizon/notifications/ConcurrentNotificationTest.java
core/branches/flat/src/test/java/org/horizon/notifications/cachelistener/CacheNotifierImplTest.java
Log:
Finished notification work
Modified: core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-02-02
13:07:49 UTC (rev 7623)
+++
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-02-02
14:31:03 UTC (rev 7624)
@@ -108,15 +108,19 @@
if (globalComponents.getState() != ComponentStatus.STARTED ||
globalComponents.getState() != ComponentStatus.STARTING) {
globalComponents.start();
}
+ boolean needToNotify = state != ComponentStatus.STARTED && state !=
ComponentStatus.STARTING;
super.start();
- globalComponents.registerNamedComponentRegistry(this, cacheName);
- cacheManagerNotifier.notifyCacheStarted(cacheName);
+ if (needToNotify && state == ComponentStatus.STARTED) {
+ globalComponents.registerNamedComponentRegistry(this, cacheName);
+ cacheManagerNotifier.notifyCacheStarted(cacheName);
+ }
}
@Override
public void stop() {
if (state.stopAllowed())
globalComponents.unregisterNamedComponentRegistry(cacheName);
+ boolean needToNotify = state == ComponentStatus.STARTED || state ==
ComponentStatus.STARTING;
super.stop();
- cacheManagerNotifier.notifyCacheStopped(cacheName);
+ if (state == ComponentStatus.STOPPED && needToNotify)
cacheManagerNotifier.notifyCacheStopped(cacheName);
}
}
Added:
core/branches/flat/src/main/java/org/horizon/notifications/AbstractListenerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/notifications/AbstractListenerImpl.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/notifications/AbstractListenerImpl.java 2009-02-02
14:31:03 UTC (rev 7624)
@@ -0,0 +1,201 @@
+package org.horizon.notifications;
+
+import org.horizon.CacheException;
+import org.horizon.factories.KnownComponentNames;
+import org.horizon.factories.annotations.ComponentName;
+import org.horizon.factories.annotations.Destroy;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.Start;
+import org.horizon.factories.annotations.Stop;
+import org.horizon.logging.Log;
+import org.horizon.util.ReflectionUtil;
+import org.horizon.util.concurrent.WithinThreadExecutor;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Functionality common to both {@link
org.horizon.notifications.cachemanagerlistener.CacheManagerNotifierImpl} and
+ * {@link org.horizon.notifications.cachelistener.CacheNotifierImpl}
+ *
+ * @author Manik Surtani
+ */
+public abstract class AbstractListenerImpl {
+
+ protected final Map<Class<? extends Annotation>,
List<ListenerInvocation>> listenersMap = new HashMap<Class<? extends
Annotation>, List<ListenerInvocation>>(16, 0.99f);
+
+
+ // two separate executor services, one for sync and one for async listeners
+ protected ExecutorService syncProcessor;
+ protected ExecutorService asyncProcessor;
+
+
+ @Inject
+ void injectExecutor((a)ComponentName(KnownComponentNames.ASYNC_NOTIFICATION_EXECUTOR)
ExecutorService executor) {
+ this.asyncProcessor = executor;
+ }
+
+ @Destroy
+ void destroy() {
+ removeAllCacheListeners();
+ }
+
+ @Start
+ public void start() {
+ syncProcessor = new WithinThreadExecutor();
+ }
+
+ /**
+ * Removes all listeners from the notifier
+ */
+ @Stop(priority = 99)
+ public void removeAllCacheListeners() {
+ for (List<ListenerInvocation> list : listenersMap.values()) {
+ if (list != null) list.clear();
+ }
+ }
+
+ protected abstract Log getLog();
+
+ protected abstract Map<Class<? extends Annotation>, Class>
getAllowedMethodAnnotations();
+
+ protected List<ListenerInvocation> getListenerCollectionForAnnotation(Class<?
extends Annotation> annotation) {
+ List<ListenerInvocation> list = listenersMap.get(annotation);
+ if (list == null) throw new CacheException("Unknown listener annotation:
" + annotation);
+ return list;
+ }
+
+ public void removeListener(Object listener) {
+ for (Class<? extends Annotation> annotation :
getAllowedMethodAnnotations().keySet())
+ removeListenerInvocation(annotation, listener);
+ }
+
+ private void removeListenerInvocation(Class<? extends Annotation> annotation,
Object listener) {
+ if (listener == null) return;
+ List<ListenerInvocation> l = getListenerCollectionForAnnotation(annotation);
+ Set<Object> markedForRemoval = new HashSet<Object>();
+ for (ListenerInvocation li : l) {
+ if (listener.equals(li.target)) markedForRemoval.add(li);
+ }
+ l.removeAll(markedForRemoval);
+ }
+
+ public void addListener(Object listener) {
+ validateAndAddListenerInvocation(listener);
+ }
+
+ public Set<Object> getListeners() {
+ Set<Object> result = new HashSet<Object>();
+ for (List<ListenerInvocation> list : listenersMap.values()) {
+ for (ListenerInvocation li : list) result.add(li.target);
+ }
+ return Collections.unmodifiableSet(result);
+ }
+
+ /**
+ * Loops through all valid methods on the object passed in, and caches the relevant
methods as {@link
+ * ListenerInvocation} for invocation by reflection.
+ *
+ * @param listener object to be considered as a listener.
+ */
+ @SuppressWarnings("unchecked")
+ private void validateAndAddListenerInvocation(Object listener) {
+ boolean sync = testListenerClassValidity(listener.getClass());
+ boolean foundMethods = false;
+ Map<Class<? extends Annotation>, Class> allowedListeners =
getAllowedMethodAnnotations();
+ // now try all methods on the listener for anything that we like. Note that only
PUBLIC methods are scanned.
+ for (Method m : listener.getClass().getMethods()) {
+ // loop through all valid method annotations
+ for (Class<? extends Annotation> annotation : allowedListeners.keySet())
{
+ if (m.isAnnotationPresent(annotation)) {
+ testListenerMethodValidity(m, allowedListeners.get(annotation),
annotation.getName());
+ addListenerInvocation(annotation, new ListenerInvocation(listener, m,
sync));
+ foundMethods = true;
+ }
+ }
+ }
+
+ if (!foundMethods && getLog().isWarnEnabled())
+ getLog().warn("Attempted to register listener of class " +
listener.getClass() + ", but no valid, public methods annotated with method-level
event annotations found! Ignoring listener.");
+ }
+
+ private void addListenerInvocation(Class annotation, ListenerInvocation li) {
+ List<ListenerInvocation> result =
getListenerCollectionForAnnotation(annotation);
+ result.add(li);
+ }
+
+ /**
+ * Tests if a class is properly annotated as a CacheListener and returns whether
callbacks on this class should be
+ * invoked synchronously or asynchronously.
+ *
+ * @param listenerClass class to inspect
+ * @return true if callbacks on this class should use the syncProcessor; false if it
should use the asyncProcessor.
+ */
+ protected boolean testListenerClassValidity(Class<?> listenerClass) {
+ Listener l = ReflectionUtil.getAnnotation(listenerClass, Listener.class);
+ if (l == null)
+ throw new IncorrectListenerException("Cache listener class MUST be
annotated with org.horizon.notifications.annotation.Listener");
+ if (!Modifier.isPublic(listenerClass.getModifiers()))
+ throw new IncorrectListenerException("Cache listener class MUST be
public!");
+ return l.sync();
+ }
+
+ protected void testListenerMethodValidity(Method m, Class allowedParameter, String
annotationName) {
+ if (m.getParameterTypes().length != 1 ||
!m.getParameterTypes()[0].isAssignableFrom(allowedParameter))
+ throw new IncorrectListenerException("Methods annotated with " +
annotationName + " must accept exactly one parameter, of assignable from type "
+ allowedParameter.getName());
+ if (!m.getReturnType().equals(void.class))
+ throw new IncorrectListenerException("Methods annotated with " +
annotationName + " should have a return type of void.");
+ }
+
+ /**
+ * Class that encapsulates a valid invocation for a given registered listener -
containing a reference to the method
+ * to be invoked as well as the target object.
+ */
+ protected class ListenerInvocation {
+ public final Object target;
+ public final Method method;
+ public final boolean sync;
+
+ public ListenerInvocation(Object target, Method method, boolean sync) {
+ this.target = target;
+ this.method = method;
+ this.sync = sync;
+ }
+
+ public void invoke(final Object event) {
+ Runnable r = new Runnable() {
+
+ public void run() {
+ try {
+ method.invoke(target, event);
+ }
+ catch (InvocationTargetException exception) {
+ Throwable cause = exception.getCause();
+ if (cause != null)
+ throw new CacheException("Caught exception invoking method
" + method + " on listener instance " + target, cause);
+ else
+ throw new CacheException("Caught exception invoking method
" + method + " on listener instance " + target, exception);
+ }
+ catch (IllegalAccessException exception) {
+ getLog().warn("Unable to invoke method " + method + " on
Object instance " + target + " - removing this target object from list of
listeners!", exception);
+ removeListener(target);
+ }
+ }
+ };
+
+ if (sync)
+ syncProcessor.execute(r);
+ else
+ asyncProcessor.execute(r);
+ }
+ }
+}
Modified:
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifier.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifier.java 2009-02-02
13:07:49 UTC (rev 7623)
+++
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifier.java 2009-02-02
14:31:03 UTC (rev 7624)
@@ -22,6 +22,9 @@
package org.horizon.notifications.cachelistener;
import org.horizon.context.InvocationContext;
+import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
import org.horizon.notifications.Listenable;
import javax.transaction.Transaction;
@@ -32,6 +35,8 @@
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
+@NonVolatile
+(a)Scope(Scopes.NAMED_CACHE)
public interface CacheNotifier extends Listenable {
/**
* Notifies all registered listeners of a CacheEntryCreated event.
Modified:
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifierImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifierImpl.java 2009-02-02
13:07:49 UTC (rev 7623)
+++
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/CacheNotifierImpl.java 2009-02-02
14:31:03 UTC (rev 7624)
@@ -22,41 +22,22 @@
package org.horizon.notifications.cachelistener;
import org.horizon.Cache;
-import org.horizon.CacheException;
import org.horizon.CacheSPI;
import org.horizon.context.InvocationContext;
-import org.horizon.factories.KnownComponentNames;
-import org.horizon.factories.annotations.ComponentName;
-import org.horizon.factories.annotations.Destroy;
import org.horizon.factories.annotations.Inject;
-import org.horizon.factories.annotations.NonVolatile;
-import org.horizon.factories.annotations.Start;
-import org.horizon.factories.annotations.Stop;
-import org.horizon.factories.scopes.Scope;
-import org.horizon.factories.scopes.Scopes;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
-import org.horizon.notifications.IncorrectListenerException;
-import org.horizon.notifications.Listener;
+import org.horizon.notifications.AbstractListenerImpl;
import org.horizon.notifications.cachelistener.annotation.*;
import org.horizon.notifications.cachelistener.event.*;
import static org.horizon.notifications.cachelistener.event.Event.Type.*;
-import org.horizon.util.ReflectionUtil;
-import org.horizon.util.concurrent.WithinThreadExecutor;
import javax.transaction.Transaction;
import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
/**
* Helper class that handles all notifications to registered listeners.
@@ -64,25 +45,26 @@
* @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
* @since 1.0
*/
-@NonVolatile
-(a)Scope(Scopes.NAMED_CACHE)
-public class CacheNotifierImpl implements CacheNotifier {
+public class CacheNotifierImpl extends AbstractListenerImpl implements CacheNotifier {
private static final Log log = LogFactory.getLog(CacheNotifierImpl.class);
- private static final Class[] allowedMethodAnnotations =
- {
- CacheEntryCreated.class, CacheEntryRemoved.class, CacheEntryVisited.class,
CacheEntryModified.class,
- CacheEntryActivated.class, CacheEntryPassivated.class,
CacheEntryLoaded.class, CacheEntryEvicted.class, TransactionRegistered.class,
TransactionCompleted.class,
- CacheEntryInvalidated.class
- };
- private static final Class[] parameterTypes =
- {
- CacheEntryCreatedEvent.class, CacheEntryRemovedEvent.class,
CacheEntryVisitedEvent.class, CacheEntryModifiedEvent.class,
- CacheEntryActivatedEvent.class, CacheEntryPassivatedEvent.class,
CacheEntryLoadedEvent.class, CacheEntryEvictedEvent.class,
TransactionRegisteredEvent.class, TransactionCompletedEvent.class,
- CacheEntryInvalidatedEvent.class
- };
+ private static final Map<Class<? extends Annotation>, Class>
allowedListeners = new HashMap<Class<? extends Annotation>, Class>();
- final Map<Class<? extends Annotation>, List<ListenerInvocation>>
listenersMap = new HashMap<Class<? extends Annotation>,
List<ListenerInvocation>>(16, 0.99f);
+ static {
+ allowedListeners.put(CacheEntryCreated.class, CacheEntryCreatedEvent.class);
+ allowedListeners.put(CacheEntryRemoved.class, CacheEntryRemovedEvent.class);
+ allowedListeners.put(CacheEntryVisited.class, CacheEntryVisitedEvent.class);
+ allowedListeners.put(CacheEntryModified.class, CacheEntryModifiedEvent.class);
+ allowedListeners.put(CacheEntryActivated.class, CacheEntryActivatedEvent.class);
+ allowedListeners.put(CacheEntryPassivated.class, CacheEntryPassivatedEvent.class);
+ allowedListeners.put(CacheEntryLoaded.class, CacheEntryLoadedEvent.class);
+ allowedListeners.put(CacheEntryEvicted.class, CacheEntryEvictedEvent.class);
+ allowedListeners.put(TransactionRegistered.class,
TransactionRegisteredEvent.class);
+ allowedListeners.put(TransactionCompleted.class, TransactionCompletedEvent.class);
+ allowedListeners.put(CacheEntryInvalidated.class,
CacheEntryInvalidatedEvent.class);
+
+ }
+
final List<ListenerInvocation> cacheEntryCreatedListeners = new
CopyOnWriteArrayList<ListenerInvocation>();
final List<ListenerInvocation> cacheEntryRemovedListeners = new
CopyOnWriteArrayList<ListenerInvocation>();
final List<ListenerInvocation> cacheEntryVisitedListeners = new
CopyOnWriteArrayList<ListenerInvocation>();
@@ -96,11 +78,9 @@
final List<ListenerInvocation> transactionCompletedListeners = new
CopyOnWriteArrayList<ListenerInvocation>();
private Cache cache;
- // two separate executor services, one for sync and one for async listeners
- private ExecutorService syncProcessor;
- private ExecutorService asyncProcessor;
public CacheNotifierImpl() {
+
listenersMap.put(CacheEntryCreated.class, cacheEntryCreatedListeners);
listenersMap.put(CacheEntryRemoved.class, cacheEntryRemovedListeners);
listenersMap.put(CacheEntryVisited.class, cacheEntryVisitedListeners);
@@ -115,126 +95,18 @@
}
@Inject
- void injectDependencies(CacheSPI cache,
-
@ComponentName(KnownComponentNames.ASYNC_NOTIFICATION_EXECUTOR) ExecutorService executor)
{
+ void injectDependencies(CacheSPI cache) {
this.cache = cache;
- this.asyncProcessor = executor;
}
- @Stop
- void stop() {
- syncProcessor.shutdownNow();
- asyncProcessor.shutdownNow();
+ protected Log getLog() {
+ return log;
}
- @Destroy
- void destroy() {
- removeAllCacheListeners();
+ protected Map<Class<? extends Annotation>, Class>
getAllowedMethodAnnotations() {
+ return allowedListeners;
}
- @Start
- void start() {
- syncProcessor = new WithinThreadExecutor();
- }
-
- /**
- * Loops through all valid methods on the object passed in, and caches the relevant
methods as {@link
- * CacheNotifierImpl.ListenerInvocation} for invocation by reflection.
- *
- * @param listener object to be considered as a listener.
- */
- @SuppressWarnings("unchecked")
- private void validateAndAddListenerInvocation(Object listener) {
- boolean sync = testListenerClassValidity(listener.getClass());
-
- boolean foundMethods = false;
- // now try all methods on the listener for anything that we like. Note that only
PUBLIC methods are scanned.
- for (Method m : listener.getClass().getMethods()) {
- // loop through all valid method annotations
- for (int i = 0; i < allowedMethodAnnotations.length; i++) {
- if (m.isAnnotationPresent(allowedMethodAnnotations[i])) {
- testListenerMethodValidity(m, parameterTypes[i],
allowedMethodAnnotations[i].getName());
- addListenerInvocation(allowedMethodAnnotations[i], new
ListenerInvocation(listener, m, sync));
- foundMethods = true;
- }
- }
- }
-
- if (!foundMethods && log.isWarnEnabled())
- log.warn("Attempted to register listener of class " +
listener.getClass() + ", but no valid, public methods annotated with method-level
event annotations found! Ignoring listener.");
- }
-
- /**
- * Tests if a class is properly annotated as a CacheListener and returns whether
callbacks on this class should be
- * invoked synchronously or asynchronously.
- *
- * @param listenerClass class to inspect
- * @return true if callbacks on this class should use the syncProcessor; false if it
should use the asyncProcessor.
- */
- private static boolean testListenerClassValidity(Class<?> listenerClass) {
- Listener l = ReflectionUtil.getAnnotation(listenerClass, Listener.class);
- if (l == null)
- throw new IncorrectListenerException("Cache listener class MUST be
annotated with org.horizon.notifications.annotation.Listener");
- if (!Modifier.isPublic(listenerClass.getModifiers()))
- throw new IncorrectListenerException("Cache listener class MUST be
public!");
- return l.sync();
- }
-
- private static void testListenerMethodValidity(Method m, Class allowedParameter,
String annotationName) {
- if (m.getParameterTypes().length != 1 ||
!m.getParameterTypes()[0].isAssignableFrom(allowedParameter))
- throw new IncorrectListenerException("Methods annotated with " +
annotationName + " must accept exactly one parameter, of assignable from type "
+ allowedParameter.getName());
- if (!m.getReturnType().equals(void.class))
- throw new IncorrectListenerException("Methods annotated with " +
annotationName + " should have a return type of void.");
- }
-
- private void addListenerInvocation(Class annotation, ListenerInvocation li) {
- List<ListenerInvocation> result =
getListenerCollectionForAnnotation(annotation);
- result.add(li);
- }
-
- public void addListener(Object listener) {
- validateAndAddListenerInvocation(listener);
- }
-
- public void removeListener(Object listener) {
- for (Class annotation : allowedMethodAnnotations)
removeListenerInvocation(annotation, listener);
- }
-
- private void removeListenerInvocation(Class annotation, Object listener) {
- if (listener == null) return;
- List<ListenerInvocation> l = getListenerCollectionForAnnotation(annotation);
- Set<Object> markedForRemoval = new HashSet<Object>();
- for (ListenerInvocation li : l) {
- if (listener.equals(li.target)) markedForRemoval.add(li);
- }
- l.removeAll(markedForRemoval);
- }
-
- /**
- * Removes all listeners from the notifier, including the evictionPolicyListener.
- */
- @Stop(priority = 99)
- public void removeAllCacheListeners() {
- cacheEntryCreatedListeners.clear();
- cacheEntryRemovedListeners.clear();
- cacheEntryVisitedListeners.clear();
- cacheEntryModifiedListeners.clear();
- cacheEntryActivatedListeners.clear();
- cacheEntryPassivatedListeners.clear();
- cacheEntryLoadedListeners.clear();
- cacheEntryEvictedListeners.clear();
- transactionRegisteredListeners.clear();
- transactionCompletedListeners.clear();
- }
-
- public Set<Object> getListeners() {
- Set<Object> result = new HashSet<Object>();
- for (List<ListenerInvocation> list : listenersMap.values()) {
- for (ListenerInvocation li : list) result.add(li.target);
- }
- return Collections.unmodifiableSet(result);
- }
-
public void notifyCacheEntryCreated(Object key, boolean pre, InvocationContext ctx) {
if (!cacheEntryCreatedListeners.isEmpty()) {
boolean originLocal = ctx.isOriginLocal();
@@ -437,53 +309,4 @@
newContext.putLookedUpEntries(ctx.getLookedUpEntries());
return ctx;
}
-
- /**
- * Class that encapsulates a valid invocation for a given registered listener -
containing a reference to the method
- * to be invoked as well as the target object.
- */
- class ListenerInvocation {
- private final Object target;
- private final Method method;
- private final boolean sync;
-
- public ListenerInvocation(Object target, Method method, boolean sync) {
- this.target = target;
- this.method = method;
- this.sync = sync;
- }
-
- public void invoke(final Event e) {
- Runnable r = new Runnable() {
-
- public void run() {
- try {
- method.invoke(target, e);
- }
- catch (InvocationTargetException exception) {
- Throwable cause = exception.getCause();
- if (cause != null)
- throw new CacheException("Caught exception invoking method
" + method + " on listener instance " + target, cause);
- else
- throw new CacheException("Caught exception invoking method
" + method + " on listener instance " + target, exception);
- }
- catch (IllegalAccessException exception) {
- log.warn("Unable to invoke method " + method + " on
Object instance " + target + " - removing this target object from list of
listeners!", exception);
- removeListener(target);
- }
- }
- };
-
- if (sync)
- syncProcessor.execute(r);
- else
- asyncProcessor.execute(r);
- }
- }
-
- private List<ListenerInvocation> getListenerCollectionForAnnotation(Class<?
extends Annotation> annotation) {
- List<ListenerInvocation> list = listenersMap.get(annotation);
- if (list == null) throw new CacheException("Unknown listener annotation:
" + annotation);
- return list;
- }
}
Modified:
core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifier.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifier.java 2009-02-02
13:07:49 UTC (rev 7623)
+++
core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifier.java 2009-02-02
14:31:03 UTC (rev 7624)
@@ -1,5 +1,8 @@
package org.horizon.notifications.cachemanagerlistener;
+import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
import org.horizon.notifications.Listenable;
import org.horizon.remoting.transport.Address;
@@ -11,6 +14,8 @@
* @author Manik Surtani
* @since 1.0
*/
+@NonVolatile
+(a)Scope(Scopes.GLOBAL)
public interface CacheManagerNotifier extends Listenable {
/**
* Notifies all registered listeners of a viewChange event. Note that viewChange
notifications are ALWAYS sent
Modified:
core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java 2009-02-02
13:07:49 UTC (rev 7623)
+++
core/branches/flat/src/main/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java 2009-02-02
14:31:03 UTC (rev 7624)
@@ -1,56 +1,102 @@
package org.horizon.notifications.cachemanagerlistener;
-import org.horizon.factories.annotations.NonVolatile;
-import org.horizon.factories.annotations.Start;
import org.horizon.factories.annotations.Stop;
-import org.horizon.factories.scopes.Scope;
-import org.horizon.factories.scopes.Scopes;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.notifications.AbstractListenerImpl;
+import org.horizon.notifications.cachemanagerlistener.annotation.CacheStarted;
+import org.horizon.notifications.cachemanagerlistener.annotation.CacheStopped;
+import org.horizon.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.horizon.notifications.cachemanagerlistener.event.CacheStartedEvent;
+import org.horizon.notifications.cachemanagerlistener.event.CacheStoppedEvent;
+import org.horizon.notifications.cachemanagerlistener.event.Event;
+import org.horizon.notifications.cachemanagerlistener.event.EventImpl;
+import org.horizon.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.horizon.remoting.transport.Address;
+import java.lang.annotation.Annotation;
+import java.util.HashMap;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
- * Global, shared notifications. Typically on the CacheManager. See
+ * Global, shared notifications on the cache manager.
*
* @author Manik Surtani
* @since 1.0
*/
-@NonVolatile
-(a)Scope(Scopes.GLOBAL)
-public class CacheManagerNotifierImpl implements CacheManagerNotifier {
+public class CacheManagerNotifierImpl extends AbstractListenerImpl implements
CacheManagerNotifier {
- public void notifyViewChange(List<Address> members, Address myAddress) {
- // TODO: Manik: Customise this generated block
- }
+ private static final Log log = LogFactory.getLog(CacheManagerNotifierImpl.class);
- public void notifyCacheStarted(String cacheName) {
- // TODO: Manik: Customise this generated block
+ private static final Map<Class<? extends Annotation>, Class>
allowedListeners = new HashMap<Class<? extends Annotation>, Class>();
+
+ static {
+ allowedListeners.put(CacheStarted.class, CacheStartedEvent.class);
+ allowedListeners.put(CacheStopped.class, CacheStoppedEvent.class);
+ allowedListeners.put(ViewChanged.class, ViewChangedEvent.class);
}
- public void notifyCacheStopped(String cacheName) {
- // TODO: Manik: Customise this generated block
+ final List<ListenerInvocation> cacheStartedListeners = new
CopyOnWriteArrayList<ListenerInvocation>();
+ final List<ListenerInvocation> cacheStoppedListeners = new
CopyOnWriteArrayList<ListenerInvocation>();
+ final List<ListenerInvocation> viewChangedListeners = new
CopyOnWriteArrayList<ListenerInvocation>();
+
+ private CacheManager cacheManager;
+
+ public CacheManagerNotifierImpl() {
+ listenersMap.put(CacheStarted.class, cacheStartedListeners);
+ listenersMap.put(CacheStopped.class, cacheStoppedListeners);
+ listenersMap.put(ViewChanged.class, viewChangedListeners);
}
- public void addListener(Object listener) {
- // TODO: Manik: Customise this generated block
+ public void injectCacheManager(CacheManager cacheManager) {
+ this.cacheManager = cacheManager;
}
- public void removeListener(Object listener) {
- // TODO: Manik: Customise this generated block
+ public void notifyViewChange(List<Address> members, Address myAddress) {
+ if (!viewChangedListeners.isEmpty()) {
+ EventImpl e = new EventImpl();
+ e.setLocalAddress(myAddress);
+ e.setNewMemberList(members);
+ e.setCacheManager(cacheManager);
+ e.setType(Event.Type.VIEW_CHANGED);
+ for (ListenerInvocation listener : viewChangedListeners) listener.invoke(e);
+ }
}
- public Set<Object> getListeners() {
- return null; // TODO: Manik: Customise this generated block
+ public void notifyCacheStarted(String cacheName) {
+ if (!cacheStartedListeners.isEmpty()) {
+ EventImpl e = new EventImpl();
+ e.setCacheName(cacheName);
+ e.setCacheManager(cacheManager);
+ e.setType(Event.Type.CACHE_STARTED);
+ for (ListenerInvocation listener : cacheStartedListeners) listener.invoke(e);
+ }
}
- @Start
- public void start() {
- // TODO: Manik: Customise this generated block
+ public void notifyCacheStopped(String cacheName) {
+ if (!cacheStoppedListeners.isEmpty()) {
+ EventImpl e = new EventImpl();
+ e.setCacheName(cacheName);
+ e.setCacheManager(cacheManager);
+ e.setType(Event.Type.CACHE_STOPPED);
+ for (ListenerInvocation listener : cacheStoppedListeners) listener.invoke(e);
+ }
}
@Stop
- public void stop() {
+ void stop() {
+ if (syncProcessor != null) syncProcessor.shutdownNow();
+ if (asyncProcessor != null) asyncProcessor.shutdownNow();
+ }
+ protected Log getLog() {
+ return log;
}
+
+ protected Map<Class<? extends Annotation>, Class>
getAllowedMethodAnnotations() {
+ return allowedListeners;
+ }
}
Modified:
core/branches/flat/src/test/java/org/horizon/notifications/ConcurrentNotificationTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/notifications/ConcurrentNotificationTest.java 2009-02-02
13:07:49 UTC (rev 7623)
+++
core/branches/flat/src/test/java/org/horizon/notifications/ConcurrentNotificationTest.java 2009-02-02
14:31:03 UTC (rev 7624)
@@ -20,7 +20,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
-@Test(groups = {"functional"}, sequential = true, testName =
"notifications.ConcurrentNotificationTest")
+@Test(groups = "functional", sequential = true, testName =
"notifications.ConcurrentNotificationTest")
public class ConcurrentNotificationTest {
Cache<String, String> cache;
CacheManager cm;
Modified:
core/branches/flat/src/test/java/org/horizon/notifications/cachelistener/CacheNotifierImplTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/notifications/cachelistener/CacheNotifierImplTest.java 2009-02-02
13:07:49 UTC (rev 7623)
+++
core/branches/flat/src/test/java/org/horizon/notifications/cachelistener/CacheNotifierImplTest.java 2009-02-02
14:31:03 UTC (rev 7624)
@@ -29,7 +29,7 @@
mockCache = createNiceMock(CacheSPI.class);
EasyMock.expect(mockCache.getInvocationContext()).andReturn(new
InvocationContextImpl()).anyTimes();
EasyMock.replay(mockCache);
- n.injectDependencies(mockCache, null);
+ n.injectDependencies(mockCache);
cl = new CacheListener();
n.start();
n.addListener(cl);