[jboss-cvs] JBossCache/src/org/jboss/cache/notifications ...
Manik Surtani
manik at jboss.org
Thu Jun 28 12:53:36 EDT 2007
User: msurtani
Date: 07/06/28 12:53:36
Modified: src/org/jboss/cache/notifications Notifier.java
Added: src/org/jboss/cache/notifications
IncorrectCacheListenerException.java
Log:
Notification changes
Revision Changes Path
1.33 +379 -260 JBossCache/src/org/jboss/cache/notifications/Notifier.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: Notifier.java
===================================================================
RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/notifications/Notifier.java,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -b -r1.32 -r1.33
--- Notifier.java 15 Jun 2007 13:18:59 -0000 1.32
+++ Notifier.java 28 Jun 2007 16:53:36 -0000 1.33
@@ -9,25 +9,24 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.Cache;
-import org.jboss.cache.CacheListener;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
-import org.jboss.cache.marshall.MethodCall;
-import org.jboss.cache.marshall.MethodCallFactory;
+import org.jboss.cache.notifications.annotation.*;
+import org.jboss.cache.notifications.event.*;
+import static org.jboss.cache.notifications.event.Event.Type.*;
import org.jboss.cache.util.MapCopy;
-import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import org.jgroups.View;
+import javax.transaction.Transaction;
import java.lang.reflect.Method;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
* Helper class that handles all notifications to registered listeners.
@@ -36,122 +35,112 @@
*/
public class Notifier<K, V>
{
-
- // calling iterator on a Concurrent Set is expensive due to synchronization - same problem
- // with calling isEmpty so hasListeners is an optimization to indicate whether or not listeners
- // is empty
- //
- private boolean hasListeners = false;
- private boolean hasListenersOrEvictionListener = false;
private Cache cache;
- // store this seperately from other listeners to avoid concurrency penalty of
- // iterating through Concurrent Set - eviction listener is always there (or almost always)
- // and there are less frequently other listeners so optimization is justified
- private CacheListener evictionPolicyListener;
-
- private final Set<CacheListener> listeners = new ConcurrentHashSet<CacheListener>();
private static final Log log = LogFactory.getLog(Notifier.class);
- // --- the java.lang.reflect.Methods of CacheListener - only the ones that are put on the notification queue
- private static Method nodeCreated, nodeModified, nodeRemoved, nodeVisited, nodeLoaded, nodePassivated, nodeActivated, nodeMoved;
-
private static Class emptyMap = Collections.emptyMap().getClass();
private static Class singletonMap = Collections.singletonMap(null, null).getClass();
-
- private ExecutorService executor;
- private int numExecutors;
-
- static
+ private static final Class[] allowedMethodAnnotations =
{
- try
- {
- nodeCreated = CacheListener.class.getMethod("nodeCreated", Fqn.class, boolean.class, boolean.class);
- nodeModified = CacheListener.class.getMethod("nodeModified", Fqn.class, boolean.class, boolean.class, CacheListener.ModificationType.class, Map.class);
- nodeRemoved = CacheListener.class.getMethod("nodeRemoved", Fqn.class, boolean.class, boolean.class, Map.class);
- nodeVisited = CacheListener.class.getMethod("nodeVisited", Fqn.class, boolean.class);
- nodeLoaded = CacheListener.class.getMethod("nodeLoaded", Fqn.class, boolean.class, Map.class);
- nodePassivated = CacheListener.class.getMethod("nodePassivated", Fqn.class, boolean.class, Map.class);
- nodeActivated = CacheListener.class.getMethod("nodeActivated", Fqn.class, boolean.class, Map.class);
- nodeMoved = CacheListener.class.getMethod("nodeMoved", Fqn.class, Fqn.class, boolean.class, boolean.class);
- }
- catch (Exception e)
- {
- log.error("Unable to initialise Notifier - unable to get Methods on CacheListener.class", e);
- }
- }
+ CacheStarted.class, CacheStopped.class, CacheBlocked.class, CacheUnblocked.class, NodeCreated.class, NodeRemoved.class, NodeVisited.class, NodeModified.class, NodeMoved.class,
+ NodeActivated.class, NodePassivated.class, NodeLoaded.class, NodeEvicted.class, TransactionRegistered.class, TransactionCompleted.class, ViewChanged.class
+ };
+ private static final Class[] parameterTypes =
+ {
+ CacheStartedEvent.class, CacheStoppedEvent.class, CacheBlockedEvent.class, CacheUnblockedEvent.class, NodeCreatedEvent.class, NodeRemovedEvent.class, NodeVisitedEvent.class, NodeModifiedEvent.class, NodeMovedEvent.class,
+ NodeActivatedEvent.class, NodePassivatedEvent.class, NodeLoadedEvent.class, NodeEvictedEvent.class, TransactionRegisteredEvent.class, TransactionCompletedEvent.class, ViewChangedEvent.class
+ };
+ private final Map<Class, List<ListenerInvocation>> listenerInvocations = new ConcurrentHashMap<Class, List<ListenerInvocation>>();
public Notifier(Cache cache)
{
this.cache = cache;
- numExecutors = cache.getConfiguration().getNumberOfNotifierThreads();
}
/**
- * Drains any pending notifications. Won't accept any new notifications while this method is blocking. It shuts down
- * and nullifies the executor service used to deliver notifications.
- * <p/>
- * This method does not destroy the Notifier instance - it can still be reused as the executor service is re-initialised
- * lazily when used next.
+ * Loops through all valid methods on the object passed in, and caches the relevant methods as {@link org.jboss.cache.notifications.Notifier.ListenerInvocation}
+ * for invocation by reflection.
+ *
+ * @param listener object to be considered as a listener.
*/
- public void drainNotificationQueue()
+ private void validateAndAddListenerInvocation(Object listener)
{
- if (executor != null)
+ if (!listener.getClass().isAnnotationPresent(CacheListener.class))
+ throw new IncorrectCacheListenerException("An Object must have the org.jboss.cache.notifications.annotation.CacheListener annotation to be considered a cache listener!");
+
+ // now try all methods on the listener for anything that we like:
+ for (Method m : listener.getClass().getMethods())
{
- executor.shutdown();
- try
+ // loop through all valid method annotations
+ for (int i = 0; i < allowedMethodAnnotations.length; i++)
+ {
+ if (m.isAnnotationPresent(allowedMethodAnnotations[i]))
+ {
+ if (m.getParameterTypes().length == 1 && m.getParameterTypes()[0].isAssignableFrom(parameterTypes[i]) && m.getReturnType().equals(void.class))
{
- while (!executor.isTerminated()) executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ addListenerInvocation(allowedMethodAnnotations[i], new ListenerInvocation(listener, m));
}
- catch (InterruptedException ie)
+ else
{
- Thread.currentThread().interrupt();
+ throw new IncorrectCacheListenerException("Methods annotated with " + allowedMethodAnnotations[i].getName() + " need to accept one, and only one, parameter, assignable from " + parameterTypes[i].getName());
}
- finally
+ }
+ }
+ }
+ }
+
+ private void addListenerInvocation(Class annotation, ListenerInvocation li)
{
- executor = null;
+ synchronized (listenerInvocations)
+ {
+ List<ListenerInvocation> l = listenerInvocations.get(annotation);
+ if (l == null)
+ {
+ l = new CopyOnWriteArrayList<ListenerInvocation>();
+ listenerInvocations.put(annotation, l);
}
+ l.add(li);
}
}
/**
- * Sets an eviction policy listener.
+ * Adds a cache listener to the list of cache listeners registered.
*
- * @param l
+ * @param listener
*/
- public void setEvictionPolicyListener(CacheListener l)
+ public void addCacheListener(Object listener)
{
- evictionPolicyListener = l;
- if (evictionPolicyListener != null) hasListenersOrEvictionListener = true;
+ validateAndAddListenerInvocation(listener);
}
/**
- * Adds a cache listener to the list of cache listeners registered.
+ * Removes a cache listener from the list of cache listeners registered.
*
- * @param l
+ * @param listener
*/
- public void addCacheListener(CacheListener l)
+ public void removeCacheListener(Object listener)
{
- synchronized (listeners)
+ synchronized (listenerInvocations)
{
- listeners.add(l);
- hasListeners = true;
- hasListenersOrEvictionListener = true;
+ for (Class annotation : allowedMethodAnnotations) removeListenerInvocation(annotation, listener);
}
}
- /**
- * Removes a cache listener from the list of cache listeners registered.
- *
- * @param l
- */
- public void removeCacheListener(CacheListener l)
+ private void removeListenerInvocation(Class annotation, Object listener)
+ {
+ List<ListenerInvocation> l = listenerInvocations.get(annotation);
+ Set<Object> markedForRemoval = new HashSet<Object>();
+ if (l != null)
{
- synchronized (listeners)
+ for (ListenerInvocation li : l)
{
- listeners.remove(l);
- hasListeners = !listeners.isEmpty();
- hasListenersOrEvictionListener = hasListeners || evictionPolicyListener != null;
+ if (listener.equals(li.target)) markedForRemoval.add(li);
+ }
+
+ l.removeAll(markedForRemoval);
+
+ if (l.size() == 0) listenerInvocations.remove(annotation);
}
}
@@ -160,20 +149,30 @@
*/
public void removeAllCacheListeners()
{
- synchronized (listeners)
+ synchronized (listenerInvocations)
{
- listeners.clear();
- hasListeners = false;
- hasListenersOrEvictionListener = evictionPolicyListener != null;
+ listenerInvocations.clear();
}
}
/**
- * Retrieves an (unmodifiable) set of cache listeners registered.
+ * @return an (unmodifiable) set of the cache listeners currently registered.
*/
- public Set<CacheListener> getCacheListeners()
+ public Set<Object> getCacheListeners()
+ {
+ Set<Object> s = new HashSet<Object>();
+ synchronized (listenerInvocations)
+ {
+ for (Class annotation : allowedMethodAnnotations)
{
- return Collections.unmodifiableSet(listeners);
+ List<ListenerInvocation> l = listenerInvocations.get(annotation);
+ if (l != null)
+ {
+ for (ListenerInvocation li : l) s.add(li.target);
+ }
+ }
+ }
+ return Collections.unmodifiableSet(s);
}
/**
@@ -186,8 +185,23 @@
public void notifyNodeCreated(Fqn fqn, boolean pre, InvocationContext ctx)
{
boolean originLocal = ctx.isOriginLocal();
- MethodCall call = MethodCallFactory.createWithNullId(nodeCreated, fqn, pre, originLocal);
- ctx.addCacheListenerEvent(call);
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodeCreated.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(originLocal);
+ e.setPre(pre);
+ e.setFqn(fqn);
+ e.setTransaction(tx);
+ e.setType(NODE_CREATED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
}
/**
@@ -199,12 +213,29 @@
* @param data
* @param ctx context of invocation
*/
- public void notifyNodeModified(Fqn fqn, boolean pre, CacheListener.ModificationType modificationType, Map<K, V> data, InvocationContext ctx)
+ public void notifyNodeModified(Fqn fqn, boolean pre, NodeModifiedEvent.ModificationType modificationType, Map<K, V> data, InvocationContext ctx)
{
boolean originLocal = ctx.isOriginLocal();
Map dataCopy = copy(data);
- MethodCall call = MethodCallFactory.createWithNullId(nodeModified, fqn, pre, originLocal, modificationType, dataCopy);
- ctx.addCacheListenerEvent(call);
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodeModified.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(originLocal);
+ e.setPre(pre);
+ e.setFqn(fqn);
+ e.setTransaction(tx);
+ e.setModificationType(modificationType);
+ e.setData(dataCopy);
+ e.setType(NODE_MODIFIED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
}
/**
@@ -219,8 +250,24 @@
{
boolean originLocal = ctx.isOriginLocal();
Map dataCopy = copy(data);
- MethodCall call = MethodCallFactory.createWithNullId(nodeRemoved, fqn, pre, originLocal, dataCopy);
- ctx.addCacheListenerEvent(call);
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodeRemoved.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(originLocal);
+ e.setPre(pre);
+ e.setFqn(fqn);
+ e.setTransaction(tx);
+ e.setData(dataCopy);
+ e.setType(NODE_REMOVED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
}
/**
@@ -232,15 +279,45 @@
*/
public void notifyNodeVisited(Fqn fqn, boolean pre, InvocationContext ctx)
{
- MethodCall call = MethodCallFactory.createWithNullId(nodeVisited, fqn, pre);
- ctx.addCacheListenerEvent(call);
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodeVisited.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setPre(pre);
+ e.setFqn(fqn);
+ e.setTransaction(tx);
+ e.setType(NODE_VISITED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
}
public void notifyNodeMoved(Fqn originalFqn, Fqn newFqn, boolean pre, InvocationContext ctx)
{
boolean originLocal = ctx.isOriginLocal();
- MethodCall call = MethodCallFactory.createWithNullId(nodeMoved, originalFqn, newFqn, pre, originLocal);
- ctx.addCacheListenerEvent(call);
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodeMoved.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(originLocal);
+ e.setPre(pre);
+ e.setFqn(originalFqn);
+ e.setTargetFqn(newFqn);
+ e.setTransaction(tx);
+ e.setType(NODE_MOVED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
}
@@ -254,28 +331,22 @@
public void notifyNodeEvicted(final Fqn fqn, final boolean pre, InvocationContext ctx)
{
final boolean originLocal = ctx.isOriginLocal();
- // always send eviction calls immediately - they do not go on the queue
- if (hasListenersOrEvictionListener)
- {
-// InvocationContext backup = resetInvocationContext(ctx);
- if (evictionPolicyListener != null)
- {
- evictionPolicyListener.nodeEvicted(fqn, pre, originLocal);
- }
- if (hasListeners)
- {
- getExecutor().execute(new Runnable()
- {
- public void run()
- {
- for (CacheListener listener : listeners)
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodeEvicted.class);
+
+ if (listeners != null && listeners.size() > 0)
{
- listener.nodeEvicted(fqn, pre, originLocal);
- }
- }
- });
- }
-// restoreInvocationContext(backup);
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(originLocal);
+ e.setPre(pre);
+ e.setFqn(fqn);
+ e.setTransaction(tx);
+ e.setType(NODE_EVICTED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
}
}
@@ -289,9 +360,26 @@
*/
public void notifyNodeLoaded(Fqn fqn, boolean pre, Map<K, V> data, InvocationContext ctx)
{
- Map<K, V> dataCopy = copy(data);
- MethodCall call = MethodCallFactory.createWithNullId(nodeLoaded, fqn, pre, dataCopy);
- ctx.addCacheListenerEvent(call);
+ boolean originLocal = ctx.isOriginLocal();
+ Map dataCopy = copy(data);
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodeLoaded.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(originLocal);
+ e.setPre(pre);
+ e.setFqn(fqn);
+ e.setTransaction(tx);
+ e.setData(dataCopy);
+ e.setType(NODE_LOADED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
}
/**
@@ -304,9 +392,26 @@
*/
public void notifyNodeActivated(Fqn fqn, boolean pre, Map<K, V> data, InvocationContext ctx)
{
- Map<K, V> dataCopy = copy(data);
- MethodCall call = MethodCallFactory.createWithNullId(nodeActivated, fqn, pre, dataCopy);
- ctx.addCacheListenerEvent(call);
+ boolean originLocal = ctx.isOriginLocal();
+ Map dataCopy = copy(data);
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodeActivated.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(originLocal);
+ e.setPre(pre);
+ e.setFqn(fqn);
+ e.setTransaction(tx);
+ e.setData(dataCopy);
+ e.setType(NODE_ACTIVATED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
}
/**
@@ -316,13 +421,27 @@
* @param pre
* @param data
* @param ctx context of invocation
- * @param sendImmediately whether notifications are deferred to the NotificationInterceptor (sendImmediately = false)
*/
- public void notifyNodePassivated(Fqn fqn, boolean pre, Map<K, V> data, InvocationContext ctx, boolean sendImmediately)
+ public void notifyNodePassivated(Fqn fqn, boolean pre, Map<K, V> data, InvocationContext ctx)
+ {
+ Map dataCopy = copy(data);
+ Transaction tx = ctx.getTransaction();
+
+ List<ListenerInvocation> listeners = listenerInvocations.get(NodePassivated.class);
+
+ if (listeners != null && listeners.size() > 0)
{
- Map<K, V> dataCopy = copy(data);
- MethodCall call = MethodCallFactory.createWithNullId(nodePassivated, fqn, pre, dataCopy);
- ctx.addCacheListenerEvent(call);
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setPre(pre);
+ e.setFqn(fqn);
+ e.setTransaction(tx);
+ e.setData(dataCopy);
+ e.setType(NODE_PASSIVATED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
}
/**
@@ -333,27 +452,16 @@
*/
public void notifyCacheStarted(final CacheSPI cache, InvocationContext ctx)
{
- if (hasListenersOrEvictionListener)
- {
-// InvocationContext backup = resetInvocationContext(ctx);
- if (evictionPolicyListener != null)
- {
- evictionPolicyListener.cacheStarted(cache);
- }
- if (hasListeners)
- {
- getExecutor().execute(new Runnable()
- {
- public void run()
- {
- for (CacheListener listener : listeners)
+ List<ListenerInvocation> listeners = listenerInvocations.get(CacheStarted.class);
+
+ if (listeners != null && listeners.size() > 0)
{
- listener.cacheStarted(cache);
- }
- }
- });
- }
-// restoreInvocationContext(backup);
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setType(CACHE_STARTED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
}
}
@@ -365,28 +473,16 @@
*/
public void notifyCacheStopped(final CacheSPI cache, InvocationContext ctx)
{
- if (hasListenersOrEvictionListener)
- {
-// InvocationContext backup = resetInvocationContext(ctx);
- if (evictionPolicyListener != null)
- {
- evictionPolicyListener.cacheStopped(cache);
- }
- if (hasListeners)
- {
- getExecutor().execute(new Runnable()
- {
- public void run()
- {
- for (CacheListener listener : listeners)
- {
- listener.cacheStopped(cache);
- }
- }
- });
+ List<ListenerInvocation> listeners = listenerInvocations.get(CacheStopped.class);
- }
-// restoreInvocationContext(backup);
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setType(CACHE_STOPPED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
}
}
@@ -399,31 +495,99 @@
*/
public void notifyViewChange(final View new_view, InvocationContext ctx)
{
- if (hasListenersOrEvictionListener)
+ List<ListenerInvocation> listeners = listenerInvocations.get(ViewChanged.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setNewView(new_view);
+ e.setType(VIEW_CHANGED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
+ }
+
+ /**
+ * Notifies all registered listeners of a transaction completion event.
+ *
+ * @param transaction the transaction that has just completed
+ * @param successful if true, the transaction committed. If false, this is a rollback event
+ */
+ public void notifyTransactionCompleted(Transaction transaction, boolean successful, InvocationContext ctx)
{
-// InvocationContext backup = resetInvocationContext(ctx);
- if (evictionPolicyListener != null)
+ Transaction tx = ctx.getTransaction();
+ boolean isOriginLocal = ctx.isOriginLocal();
+ List<ListenerInvocation> listeners = listenerInvocations.get(TransactionCompleted.class);
+
+ if (listeners != null && listeners.size() > 0)
{
- evictionPolicyListener.viewChange(new_view);
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(isOriginLocal);
+ e.setTransaction(tx);
+ e.setSuccessful(successful);
+ e.setType(TRANSACTION_COMPLETED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
}
- if (hasListeners)
+ }
+
+ /**
+ * Notifies all registered listeners of a transaction registration event.
+ *
+ * @param transaction the transaction that has just been registered
+ */
+ public void notifyTransactionRegistered(Transaction transaction, InvocationContext ctx)
{
- // this happens in realtime in the same thread
-// getExecutor().execute(new Runnable()
-// {
-// public void run()
-// {
- for (CacheListener listener : listeners)
+ Transaction tx = ctx.getTransaction();
+ boolean isOriginLocal = ctx.isOriginLocal();
+ List<ListenerInvocation> listeners = listenerInvocations.get(TransactionRegistered.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setOriginLocal(isOriginLocal);
+ e.setTransaction(tx);
+ e.setType(TRANSACTION_REGISTERED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+ restoreInvocationContext(backup);
+ }
+ }
+
+ public void notifyCacheBlocked(CacheSPI cache, boolean pre)
{
- listener.viewChange(new_view);
+ List<ListenerInvocation> listeners = listenerInvocations.get(CacheBlocked.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setPre(pre);
+ e.setType(CACHE_BLOCKED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
}
-// }
-// });
}
-// restoreInvocationContext(backup);
+
+ public void notifyCacheUnblocked(CacheSPI cache, boolean pre)
+ {
+ List<ListenerInvocation> listeners = listenerInvocations.get(CacheUnblocked.class);
+
+ if (listeners != null && listeners.size() > 0)
+ {
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setPre(pre);
+ e.setType(CACHE_UNBLOCKED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
}
}
+
private Map<K, V> copy(Map<K, V> data)
{
if (safe(data)) return data;
@@ -467,77 +631,32 @@
}
/**
- * Fires off all notifications for a given queue.
- *
- * @param ctx
- * @param queue queue to process.
+ * Class that encapsulates a valid invocation for a given registered listener - containing a reference to the
+ * method to be invoked as well as the target object.
*/
- public void invokeQueuedNotifications(InvocationContext ctx, final List<MethodCall> queue)
+ class ListenerInvocation
{
-// InvocationContext backup = resetInvocationContext(ctx);
+ private Object target;
+ private Method method;
- for (MethodCall c : queue)
- {
- if (log.isTraceEnabled()) log.trace("Invoking queued notification " + c);
- if (evictionPolicyListener != null)
- {
- try
- {
- c.invoke(evictionPolicyListener);
- }
- catch (Throwable throwable)
+ public ListenerInvocation(Object target, Method method)
{
- log.error("Unable to deliver queued notification " + c + " to eviction policy listener", throwable);
- }
- }
+ this.target = target;
+ this.method = method;
}
- if (hasListeners)
- {
- getExecutor().execute(new Runnable()
- {
- public void run()
- {
- for (CacheListener listener : listeners)
- {
- for (MethodCall c : queue)
+ public void invoke(Event e)
{
try
{
- c.invoke(listener);
+ method.invoke(target, e);
}
- catch (Throwable throwable)
+ catch (Exception ite)
{
- log.error("Unable to deliver queued notification " + c + " to listener " + listener, throwable);
- }
+ log.warn("Unable to invoke method " + method + " on Object instance " + target + " - removing this target object from list of listeners!", ite);
+ removeCacheListener(this.target);
}
}
}
- });
- }
-
-// restoreInvocationContext(backup);
- }
-
- /**
- * Fires off all notifications that have been registered within the current invocation, with sendImmediately set to false.
- *
- * @param ctx context of the invocation
- */
- public void invokeQueuedNotifications(InvocationContext ctx)
- {
- invokeQueuedNotifications(ctx, ctx.getCacheListenerEvents());
- }
- private Executor getExecutor()
- {
- if (executor == null)
- {
- synchronized (this)
- {
- if (executor == null) executor = Executors.newFixedThreadPool(numExecutors);
- }
- }
- return executor;
- }
}
1.1 date: 2007/06/28 16:53:36; author: msurtani; state: Exp;JBossCache/src/org/jboss/cache/notifications/IncorrectCacheListenerException.java
Index: IncorrectCacheListenerException.java
===================================================================
package org.jboss.cache.notifications;
import org.jboss.cache.CacheException;
/**
 * Signals that an object handed to {@link org.jboss.cache.Cache#addCacheListener(Object)} cannot be used as a cache
 * listener because it is not annotated correctly.
 * <p/>
 * The {@link Notifier} raises this when the listener's class lacks the
 * {@link org.jboss.cache.notifications.annotation.CacheListener} annotation, or when one of its annotated callback
 * methods does not take exactly one parameter of the matching event type and return void.
 *
 * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
 * @since 2.0.0
 */
public class IncorrectCacheListenerException extends CacheException
{
   /**
    * Creates the exception with a description of what is wrong with the listener.
    *
    * @param message explanation of why the object was rejected as a cache listener
    */
   public IncorrectCacheListenerException(String message)
   {
      super(message);
   }
}
More information about the jboss-cvs-commits
mailing list