[jboss-cvs] JBossCache/src/org/jboss/cache/notifications ...

Manik Surtani manik at jboss.org
Thu Jun 28 12:53:36 EDT 2007


  User: msurtani
  Date: 07/06/28 12:53:36

  Modified:    src/org/jboss/cache/notifications   Notifier.java
  Added:       src/org/jboss/cache/notifications  
                        IncorrectCacheListenerException.java
  Log:
  Notification changes
  
  Revision  Changes    Path
  1.33      +379 -260  JBossCache/src/org/jboss/cache/notifications/Notifier.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: Notifier.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/notifications/Notifier.java,v
  retrieving revision 1.32
  retrieving revision 1.33
  diff -u -b -r1.32 -r1.33
  --- Notifier.java	15 Jun 2007 13:18:59 -0000	1.32
  +++ Notifier.java	28 Jun 2007 16:53:36 -0000	1.33
  @@ -9,25 +9,24 @@
   import org.apache.commons.logging.Log;
   import org.apache.commons.logging.LogFactory;
   import org.jboss.cache.Cache;
  -import org.jboss.cache.CacheListener;
   import org.jboss.cache.CacheSPI;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.InvocationContext;
  -import org.jboss.cache.marshall.MethodCall;
  -import org.jboss.cache.marshall.MethodCallFactory;
  +import org.jboss.cache.notifications.annotation.*;
  +import org.jboss.cache.notifications.event.*;
  +import static org.jboss.cache.notifications.event.Event.Type.*;
   import org.jboss.cache.util.MapCopy;
  -import org.jboss.cache.util.concurrent.ConcurrentHashSet;
   import org.jgroups.View;
   
  +import javax.transaction.Transaction;
   import java.lang.reflect.Method;
   import java.util.Collections;
  +import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
  -import java.util.concurrent.Executor;
  -import java.util.concurrent.ExecutorService;
  -import java.util.concurrent.Executors;
  -import java.util.concurrent.TimeUnit;
  +import java.util.concurrent.ConcurrentHashMap;
  +import java.util.concurrent.CopyOnWriteArrayList;
   
   /**
    * Helper class that handles all notifications to registered listeners.
  @@ -36,122 +35,112 @@
    */
   public class Notifier<K, V>
   {
  -
  -   // calling iterator on a Concurrent Set is expensive due to synchronization - same problem
  -   // with calling isEmpty so hasListeners is an optimization to indicate whether or not listeners
  -   // is empty
  -   //
  -   private boolean hasListeners = false;
  -   private boolean hasListenersOrEvictionListener = false;
      private Cache cache;
   
  -   // store this seperately from other listeners to avoid concurrency penalty of
  -   // iterating through Concurrent Set - eviction listener is always there (or almost always)
  -   // and there are less frequently other listeners so optimization is justified
  -   private CacheListener evictionPolicyListener;
  -
  -   private final Set<CacheListener> listeners = new ConcurrentHashSet<CacheListener>();
      private static final Log log = LogFactory.getLog(Notifier.class);
   
  -   // --- the java.lang.reflect.Methods of CacheListener - only the ones that are put on the notification queue
  -   private static Method nodeCreated, nodeModified, nodeRemoved, nodeVisited, nodeLoaded, nodePassivated, nodeActivated, nodeMoved;
  -
      private static Class emptyMap = Collections.emptyMap().getClass();
      private static Class singletonMap = Collections.singletonMap(null, null).getClass();
  -
  -   private ExecutorService executor;
  -   private int numExecutors;
  -
  -   static
  +   private static final Class[] allowedMethodAnnotations =
      {
  -      try
  -      {
  -         nodeCreated = CacheListener.class.getMethod("nodeCreated", Fqn.class, boolean.class, boolean.class);
  -         nodeModified = CacheListener.class.getMethod("nodeModified", Fqn.class, boolean.class, boolean.class, CacheListener.ModificationType.class, Map.class);
  -         nodeRemoved = CacheListener.class.getMethod("nodeRemoved", Fqn.class, boolean.class, boolean.class, Map.class);
  -         nodeVisited = CacheListener.class.getMethod("nodeVisited", Fqn.class, boolean.class);
  -         nodeLoaded = CacheListener.class.getMethod("nodeLoaded", Fqn.class, boolean.class, Map.class);
  -         nodePassivated = CacheListener.class.getMethod("nodePassivated", Fqn.class, boolean.class, Map.class);
  -         nodeActivated = CacheListener.class.getMethod("nodeActivated", Fqn.class, boolean.class, Map.class);
  -         nodeMoved = CacheListener.class.getMethod("nodeMoved", Fqn.class, Fqn.class, boolean.class, boolean.class);
  -      }
  -      catch (Exception e)
  -      {
  -         log.error("Unable to initialise Notifier - unable to get Methods on CacheListener.class", e);
  -      }
  -   }
  +                   CacheStarted.class, CacheStopped.class, CacheBlocked.class, CacheUnblocked.class, NodeCreated.class, NodeRemoved.class, NodeVisited.class, NodeModified.class, NodeMoved.class,
  +                   NodeActivated.class, NodePassivated.class, NodeLoaded.class, NodeEvicted.class, TransactionRegistered.class, TransactionCompleted.class, ViewChanged.class
  +           };
  +   private static final Class[] parameterTypes =
  +           {
  +                   CacheStartedEvent.class, CacheStoppedEvent.class, CacheBlockedEvent.class, CacheUnblockedEvent.class, NodeCreatedEvent.class, NodeRemovedEvent.class, NodeVisitedEvent.class, NodeModifiedEvent.class, NodeMovedEvent.class,
  +                   NodeActivatedEvent.class, NodePassivatedEvent.class, NodeLoadedEvent.class, NodeEvictedEvent.class, TransactionRegisteredEvent.class, TransactionCompletedEvent.class, ViewChangedEvent.class
  +           };
  +   private final Map<Class, List<ListenerInvocation>> listenerInvocations = new ConcurrentHashMap<Class, List<ListenerInvocation>>();
   
      public Notifier(Cache cache)
      {
         this.cache = cache;
  -      numExecutors = cache.getConfiguration().getNumberOfNotifierThreads();
      }
   
      /**
  -    * Drains any pending notifications.  Won't accept any new notifications while this method is blocking.  It shuts down
  -    * and nullifies the executor service used to deliver notifications.
  -    * <p/>
  -    * This method does not destroy the Notifier instance - it can still be reused as the executor service is re-initialised
  -    * lazily when used next.
  +    * Loops through all valid methods on the object passed in, and caches the relevant methods as {@link org.jboss.cache.notifications.Notifier.ListenerInvocation}
  +    * for invocation by reflection.
  +    *
  +    * @param listener object to be considered as a listener.
       */
  -   public void drainNotificationQueue()
  +   private void validateAndAddListenerInvocation(Object listener)
      {
  -      if (executor != null)
  +      if (!listener.getClass().isAnnotationPresent(CacheListener.class))
  +         throw new IncorrectCacheListenerException("An Object must have the org.jboss.cache.notifications.annotation.CacheListener annotation to be considered a cache listener!");
  +
  +      // now try all methods on the listener for anything that we like:
  +      for (Method m : listener.getClass().getMethods())
         {
  -         executor.shutdown();
  -         try
  +         // loop through all valid method annotations
  +         for (int i = 0; i < allowedMethodAnnotations.length; i++)
  +         {
  +            if (m.isAnnotationPresent(allowedMethodAnnotations[i]))
  +            {
  +               if (m.getParameterTypes().length == 1 && m.getParameterTypes()[0].isAssignableFrom(parameterTypes[i]) && m.getReturnType().equals(void.class))
            {
  -            while (!executor.isTerminated()) executor.awaitTermination(500, TimeUnit.MILLISECONDS);
  +                  addListenerInvocation(allowedMethodAnnotations[i], new ListenerInvocation(listener, m));
            }
  -         catch (InterruptedException ie)
  +               else
            {
  -            Thread.currentThread().interrupt();
  +                  throw new IncorrectCacheListenerException("Methods annotated with " + allowedMethodAnnotations[i].getName() + " need to accept one, and only one, parameter, assignable from " + parameterTypes[i].getName());
            }
  -         finally
  +            }
  +         }
  +      }
  +   }
  +
  +   private void addListenerInvocation(Class annotation, ListenerInvocation li)
            {
  -            executor = null;
  +      synchronized (listenerInvocations)
  +      {
  +         List<ListenerInvocation> l = listenerInvocations.get(annotation);
  +         if (l == null)
  +         {
  +            l = new CopyOnWriteArrayList<ListenerInvocation>();
  +            listenerInvocations.put(annotation, l);
            }
  +         l.add(li);
         }
      }
   
      /**
  -    * Sets an eviction policy listener.
  +    * Adds a cache listener to the list of cache listeners registered.
       *
  -    * @param l
  +    * @param listener
       */
  -   public void setEvictionPolicyListener(CacheListener l)
  +   public void addCacheListener(Object listener)
      {
  -      evictionPolicyListener = l;
  -      if (evictionPolicyListener != null) hasListenersOrEvictionListener = true;
  +      validateAndAddListenerInvocation(listener);
      }
   
      /**
  -    * Adds a cache listener to the list of cache listeners registered.
  +    * Removes a cache listener from the list of cache listeners registered.
       *
  -    * @param l
  +    * @param listener
       */
  -   public void addCacheListener(CacheListener l)
  +   public void removeCacheListener(Object listener)
      {
  -      synchronized (listeners)
  +      synchronized (listenerInvocations)
         {
  -         listeners.add(l);
  -         hasListeners = true;
  -         hasListenersOrEvictionListener = true;
  +         for (Class annotation : allowedMethodAnnotations) removeListenerInvocation(annotation, listener);
         }
      }
   
  -   /**
  -    * Removes a cache listener from the list of cache listeners registered.
  -    *
  -    * @param l
  -    */
  -   public void removeCacheListener(CacheListener l)
  +   private void removeListenerInvocation(Class annotation, Object listener)
  +   {
  +      List<ListenerInvocation> l = listenerInvocations.get(annotation);
  +      Set<Object> markedForRemoval = new HashSet<Object>();
  +      if (l != null)
      {
  -      synchronized (listeners)
  +         for (ListenerInvocation li : l)
         {
  -         listeners.remove(l);
  -         hasListeners = !listeners.isEmpty();
  -         hasListenersOrEvictionListener = hasListeners || evictionPolicyListener != null;
  +            if (listener.equals(li.target)) markedForRemoval.add(li);
  +         }
  +
  +         l.removeAll(markedForRemoval);
  +
  +         if (l.size() == 0) listenerInvocations.remove(annotation);
         }
      }
   
  @@ -160,20 +149,30 @@
       */
      public void removeAllCacheListeners()
      {
  -      synchronized (listeners)
  +      synchronized (listenerInvocations)
         {
  -         listeners.clear();
  -         hasListeners = false;
  -         hasListenersOrEvictionListener = evictionPolicyListener != null;
  +         listenerInvocations.clear();
         }
      }
   
      /**
  -    * Retrieves an (unmodifiable) set of cache listeners registered.
  +    * @return Retrieves an (unmodifiable) set of cache listeners registered.
       */
  -   public Set<CacheListener> getCacheListeners()
  +   public Set<Object> getCacheListeners()
  +   {
  +      Set<Object> s = new HashSet<Object>();
  +      synchronized (listenerInvocations)
  +      {
  +         for (Class annotation : allowedMethodAnnotations)
      {
  -      return Collections.unmodifiableSet(listeners);
  +            List<ListenerInvocation> l = listenerInvocations.get(annotation);
  +            if (l != null)
  +            {
  +               for (ListenerInvocation li : l) s.add(li.target);
  +            }
  +         }
  +      }
  +      return Collections.unmodifiableSet(s);
      }
   
      /**
  @@ -186,8 +185,23 @@
      public void notifyNodeCreated(Fqn fqn, boolean pre, InvocationContext ctx)
      {
         boolean originLocal = ctx.isOriginLocal();
  -      MethodCall call = MethodCallFactory.createWithNullId(nodeCreated, fqn, pre, originLocal);
  -      ctx.addCacheListenerEvent(call);
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodeCreated.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(originLocal);
  +         e.setPre(pre);
  +         e.setFqn(fqn);
  +         e.setTransaction(tx);
  +         e.setType(NODE_CREATED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
      }
   
      /**
  @@ -199,12 +213,29 @@
       * @param data
       * @param ctx              context of invocation
       */
  -   public void notifyNodeModified(Fqn fqn, boolean pre, CacheListener.ModificationType modificationType, Map<K, V> data, InvocationContext ctx)
  +   public void notifyNodeModified(Fqn fqn, boolean pre, NodeModifiedEvent.ModificationType modificationType, Map<K, V> data, InvocationContext ctx)
      {
         boolean originLocal = ctx.isOriginLocal();
         Map dataCopy = copy(data);
  -      MethodCall call = MethodCallFactory.createWithNullId(nodeModified, fqn, pre, originLocal, modificationType, dataCopy);
  -      ctx.addCacheListenerEvent(call);
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodeModified.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(originLocal);
  +         e.setPre(pre);
  +         e.setFqn(fqn);
  +         e.setTransaction(tx);
  +         e.setModificationType(modificationType);
  +         e.setData(dataCopy);
  +         e.setType(NODE_MODIFIED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
      }
   
      /**
  @@ -219,8 +250,24 @@
      {
         boolean originLocal = ctx.isOriginLocal();
         Map dataCopy = copy(data);
  -      MethodCall call = MethodCallFactory.createWithNullId(nodeRemoved, fqn, pre, originLocal, dataCopy);
  -      ctx.addCacheListenerEvent(call);
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodeRemoved.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(originLocal);
  +         e.setPre(pre);
  +         e.setFqn(fqn);
  +         e.setTransaction(tx);
  +         e.setData(dataCopy);
  +         e.setType(NODE_REMOVED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
      }
   
      /**
  @@ -232,15 +279,45 @@
       */
      public void notifyNodeVisited(Fqn fqn, boolean pre, InvocationContext ctx)
      {
  -      MethodCall call = MethodCallFactory.createWithNullId(nodeVisited, fqn, pre);
  -      ctx.addCacheListenerEvent(call);
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodeVisited.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setPre(pre);
  +         e.setFqn(fqn);
  +         e.setTransaction(tx);
  +         e.setType(NODE_VISITED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
      }
   
      public void notifyNodeMoved(Fqn originalFqn, Fqn newFqn, boolean pre, InvocationContext ctx)
      {
         boolean originLocal = ctx.isOriginLocal();
  -      MethodCall call = MethodCallFactory.createWithNullId(nodeMoved, originalFqn, newFqn, pre, originLocal);
  -      ctx.addCacheListenerEvent(call);
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodeMoved.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(originLocal);
  +         e.setPre(pre);
  +         e.setFqn(originalFqn);
  +         e.setTargetFqn(newFqn);
  +         e.setTransaction(tx);
  +         e.setType(NODE_MOVED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
      }
   
   
  @@ -254,28 +331,22 @@
      public void notifyNodeEvicted(final Fqn fqn, final boolean pre, InvocationContext ctx)
      {
         final boolean originLocal = ctx.isOriginLocal();
  -      // always send eviction calls immediately - they do not go on the queue
  -      if (hasListenersOrEvictionListener)
  -      {
  -//         InvocationContext backup = resetInvocationContext(ctx);
  -         if (evictionPolicyListener != null)
  -         {
  -            evictionPolicyListener.nodeEvicted(fqn, pre, originLocal);
  -         }
  -         if (hasListeners)
  -         {
  -            getExecutor().execute(new Runnable()
  -            {
  -               public void run()
  -               {
  -                  for (CacheListener listener : listeners)
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodeEvicted.class);
  +
  +      if (listeners != null && listeners.size() > 0)
                     {
  -                     listener.nodeEvicted(fqn, pre, originLocal);
  -                  }
  -               }
  -            });
  -         }
  -//         restoreInvocationContext(backup);
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(originLocal);
  +         e.setPre(pre);
  +         e.setFqn(fqn);
  +         e.setTransaction(tx);
  +         e.setType(NODE_EVICTED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
         }
      }
   
  @@ -289,9 +360,26 @@
       */
      public void notifyNodeLoaded(Fqn fqn, boolean pre, Map<K, V> data, InvocationContext ctx)
      {
  -      Map<K, V> dataCopy = copy(data);
  -      MethodCall call = MethodCallFactory.createWithNullId(nodeLoaded, fqn, pre, dataCopy);
  -      ctx.addCacheListenerEvent(call);
  +      boolean originLocal = ctx.isOriginLocal();
  +      Map dataCopy = copy(data);
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodeLoaded.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(originLocal);
  +         e.setPre(pre);
  +         e.setFqn(fqn);
  +         e.setTransaction(tx);
  +         e.setData(dataCopy);
  +         e.setType(NODE_LOADED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
      }
   
      /**
  @@ -304,9 +392,26 @@
       */
      public void notifyNodeActivated(Fqn fqn, boolean pre, Map<K, V> data, InvocationContext ctx)
      {
  -      Map<K, V> dataCopy = copy(data);
  -      MethodCall call = MethodCallFactory.createWithNullId(nodeActivated, fqn, pre, dataCopy);
  -      ctx.addCacheListenerEvent(call);
  +      boolean originLocal = ctx.isOriginLocal();
  +      Map dataCopy = copy(data);
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodeActivated.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(originLocal);
  +         e.setPre(pre);
  +         e.setFqn(fqn);
  +         e.setTransaction(tx);
  +         e.setData(dataCopy);
  +         e.setType(NODE_ACTIVATED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
      }
   
      /**
  @@ -316,13 +421,27 @@
       * @param pre
       * @param data
       * @param ctx             context of invocation
  -    * @param sendImmediately whether notifications are deferred to the NotificationInterceptor (sendImmediately = false)
       */
  -   public void notifyNodePassivated(Fqn fqn, boolean pre, Map<K, V> data, InvocationContext ctx, boolean sendImmediately)
  +   public void notifyNodePassivated(Fqn fqn, boolean pre, Map<K, V> data, InvocationContext ctx)
  +   {
  +      Map dataCopy = copy(data);
  +      Transaction tx = ctx.getTransaction();
  +
  +      List<ListenerInvocation> listeners = listenerInvocations.get(NodePassivated.class);
  +
  +      if (listeners != null && listeners.size() > 0)
      {
  -      Map<K, V> dataCopy = copy(data);
  -      MethodCall call = MethodCallFactory.createWithNullId(nodePassivated, fqn, pre, dataCopy);
  -      ctx.addCacheListenerEvent(call);
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setPre(pre);
  +         e.setFqn(fqn);
  +         e.setTransaction(tx);
  +         e.setData(dataCopy);
  +         e.setType(NODE_PASSIVATED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
      }
   
      /**
  @@ -333,27 +452,16 @@
       */
      public void notifyCacheStarted(final CacheSPI cache, InvocationContext ctx)
      {
  -      if (hasListenersOrEvictionListener)
  -      {
  -//         InvocationContext backup = resetInvocationContext(ctx);
  -         if (evictionPolicyListener != null)
  -         {
  -            evictionPolicyListener.cacheStarted(cache);
  -         }
  -         if (hasListeners)
  -         {
  -            getExecutor().execute(new Runnable()
  -            {
  -               public void run()
  -               {
  -                  for (CacheListener listener : listeners)
  +      List<ListenerInvocation> listeners = listenerInvocations.get(CacheStarted.class);
  +
  +      if (listeners != null && listeners.size() > 0)
                     {
  -                     listener.cacheStarted(cache);
  -                  }
  -               }
  -            });
  -         }
  -//         restoreInvocationContext(backup);
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setType(CACHE_STARTED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
         }
      }
   
  @@ -365,28 +473,16 @@
       */
      public void notifyCacheStopped(final CacheSPI cache, InvocationContext ctx)
      {
  -      if (hasListenersOrEvictionListener)
  -      {
  -//         InvocationContext backup = resetInvocationContext(ctx);
  -         if (evictionPolicyListener != null)
  -         {
  -            evictionPolicyListener.cacheStopped(cache);
  -         }
  -         if (hasListeners)
  -         {
  -            getExecutor().execute(new Runnable()
  -            {
  -               public void run()
  -               {
  -                  for (CacheListener listener : listeners)
  -                  {
  -                     listener.cacheStopped(cache);
  -                  }
  -               }
  -            });
  +      List<ListenerInvocation> listeners = listenerInvocations.get(CacheStopped.class);
   
  -         }
  -//         restoreInvocationContext(backup);
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setType(CACHE_STOPPED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
         }
      }
   
  @@ -399,31 +495,99 @@
       */
      public void notifyViewChange(final View new_view, InvocationContext ctx)
      {
  -      if (hasListenersOrEvictionListener)
  +      List<ListenerInvocation> listeners = listenerInvocations.get(ViewChanged.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setNewView(new_view);
  +         e.setType(VIEW_CHANGED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
  +   }
  +
  +   /**
  +    * Notifies all registered listeners of a transaction completion event.
  +    *
  +    * @param transaction the transaction that has just completed
  +    * @param successful  if true, the transaction committed.  If false, this is a rollback event
  +    */
  +   public void notifyTransactionCompleted(Transaction transaction, boolean successful, InvocationContext ctx)
         {
  -//         InvocationContext backup = resetInvocationContext(ctx);
  -         if (evictionPolicyListener != null)
  +      Transaction tx = ctx.getTransaction();
  +      boolean isOriginLocal = ctx.isOriginLocal();
  +      List<ListenerInvocation> listeners = listenerInvocations.get(TransactionCompleted.class);
  +
  +      if (listeners != null && listeners.size() > 0)
            {
  -            evictionPolicyListener.viewChange(new_view);
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(isOriginLocal);
  +         e.setTransaction(tx);
  +         e.setSuccessful(successful);
  +         e.setType(TRANSACTION_COMPLETED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
            }
  -         if (hasListeners)
  +   }
  +
  +   /**
  +    * Notifies all registered listeners of a transaction registration event.
  +    *
  +    * @param transaction the transaction that has just completed
  +    */
  +   public void notifyTransactionRegistered(Transaction transaction, InvocationContext ctx)
            {
  -            // this happens in realtime in the same thread
  -//            getExecutor().execute(new Runnable()
  -//            {
  -//               public void run()
  -//               {
  -            for (CacheListener listener : listeners)
  +      Transaction tx = ctx.getTransaction();
  +      boolean isOriginLocal = ctx.isOriginLocal();
  +      List<ListenerInvocation> listeners = listenerInvocations.get(TransactionRegistered.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         InvocationContext backup = resetInvocationContext(ctx);
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setOriginLocal(isOriginLocal);
  +         e.setTransaction(tx);
  +         e.setType(TRANSACTION_REGISTERED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
  +         restoreInvocationContext(backup);
  +      }
  +   }
  +
  +   public void notifyCacheBlocked(CacheSPI cache, boolean pre)
               {
  -               listener.viewChange(new_view);
  +      List<ListenerInvocation> listeners = listenerInvocations.get(CacheBlocked.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setPre(pre);
  +         e.setType(CACHE_BLOCKED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
               }
  -//               }
  -//            });
            }
  -//         restoreInvocationContext(backup);
  +
  +   public void notifyCacheUnblocked(CacheSPI cache, boolean pre)
  +   {
  +      List<ListenerInvocation> listeners = listenerInvocations.get(CacheUnblocked.class);
  +
  +      if (listeners != null && listeners.size() > 0)
  +      {
  +         EventImpl e = new EventImpl();
  +         e.setCache(cache);
  +         e.setPre(pre);
  +         e.setType(CACHE_UNBLOCKED);
  +         for (ListenerInvocation listener : listeners) listener.invoke(e);
         }
      }
   
  +
      private Map<K, V> copy(Map<K, V> data)
      {
         if (safe(data)) return data;
  @@ -467,77 +631,32 @@
      }
   
      /**
  -    * Fires off all notifications for a given queue.
  -    *
  -    * @param ctx
  -    * @param queue queue to process.
  +    * Class that encapsulates a valid invocation for a given registered listener - containing a reference to the
  +    * method to be invoked as well as the target object.
       */
  -   public void invokeQueuedNotifications(InvocationContext ctx, final List<MethodCall> queue)
  +   class ListenerInvocation
      {
  -//      InvocationContext backup = resetInvocationContext(ctx);
  +      private Object target;
  +      private Method method;
   
  -      for (MethodCall c : queue)
  -      {
  -         if (log.isTraceEnabled()) log.trace("Invoking queued notification " + c);
  -         if (evictionPolicyListener != null)
  -         {
  -            try
  -            {
  -               c.invoke(evictionPolicyListener);
  -            }
  -            catch (Throwable throwable)
  +      public ListenerInvocation(Object target, Method method)
               {
  -               log.error("Unable to deliver queued notification " + c + " to eviction policy listener", throwable);
  -            }
  -         }
  +         this.target = target;
  +         this.method = method;
         }
  -      if (hasListeners)
  -      {
  -         getExecutor().execute(new Runnable()
  -         {
  -            public void run()
  -            {
   
  -               for (CacheListener listener : listeners)
  -               {
  -                  for (MethodCall c : queue)
  +      public void invoke(Event e)
                     {
                        try
                        {
  -                        c.invoke(listener);
  +            method.invoke(target, e);
                        }
  -                     catch (Throwable throwable)
  +         catch (Exception ite)
                        {
  -                        log.error("Unable to deliver queued notification " + c + " to listener " + listener, throwable);
  -                     }
  +            log.warn("Unable to invoke method " + method + " on Object instance " + target + " - removing this target object from list of listeners!", ite);
  +            removeCacheListener(this.target);
                     }
                  }
               }
  -         });
  -      }
  -
  -//      restoreInvocationContext(backup);
  -   }
  -
  -   /**
  -    * Fires off all notifications that have been registered within the current invocation, with sendImmediately set to false.
  -    *
  -    * @param ctx context of the invocation
  -    */
  -   public void invokeQueuedNotifications(InvocationContext ctx)
  -   {
  -      invokeQueuedNotifications(ctx, ctx.getCacheListenerEvents());
  -   }
   
  -   private Executor getExecutor()
  -   {
  -      if (executor == null)
  -      {
  -         synchronized (this)
  -         {
  -            if (executor == null) executor = Executors.newFixedThreadPool(numExecutors);
  -         }
  -      }
  -      return executor;
  -   }
   }
  
  
  
  1.1      date: 2007/06/28 16:53:36;  author: msurtani;  state: Exp;JBossCache/src/org/jboss/cache/notifications/IncorrectCacheListenerException.java
  
  Index: IncorrectCacheListenerException.java
  ===================================================================
  package org.jboss.cache.notifications;
  
  import org.jboss.cache.CacheException;
  
  /**
   * Signals that an object offered as a cache listener through
   * {@link org.jboss.cache.Cache#addCacheListener(Object)} is not annotated correctly.
   * <p/>
   * The {@link Notifier} raises this exception while registering a listener when the listener's class does not
   * carry the {@link org.jboss.cache.notifications.annotation.CacheListener} annotation, or when one of its
   * annotated callback methods is not a <tt>void</tt> method taking exactly one parameter that the matching
   * event type can be assigned to.
   *
   * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
   * @since 2.0.0
   */
  public class IncorrectCacheListenerException extends CacheException
  {
     /**
      * Creates the exception with a description of what is wrong with the rejected listener.
      *
      * @param s detail message, passed unchanged to the {@link CacheException} constructor
      */
     public IncorrectCacheListenerException(final String s)
     {
        super(s);
     }
  }
  
  
  



More information about the jboss-cvs-commits mailing list