[jboss-cvs] JBossCache/src/org/jboss/cache/notifications ...

Manik Surtani msurtani at jboss.com
Wed Jan 3 10:33:08 EST 2007


  User: msurtani
  Date: 07/01/03 10:33:08

  Modified:    src/org/jboss/cache/notifications  Notifier.java
  Log:
  Improved notification mechanism, added a notification interceptor
  
  Revision  Changes    Path
  1.14      +322 -114  JBossCache/src/org/jboss/cache/notifications/Notifier.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: Notifier.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossCache/src/org/jboss/cache/notifications/Notifier.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -b -r1.13 -r1.14
  --- Notifier.java	30 Dec 2006 19:48:45 -0000	1.13
  +++ Notifier.java	3 Jan 2007 15:33:08 -0000	1.14
  @@ -13,10 +13,13 @@
   import org.jboss.cache.CacheSPI;
   import org.jboss.cache.Fqn;
   import org.jboss.cache.InvocationContext;
  +import org.jboss.cache.marshall.MethodCall;
   import org.jgroups.View;
   
  +import java.lang.reflect.Method;
   import java.util.Collections;
   import java.util.HashMap;
  +import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.concurrent.CopyOnWriteArraySet;
  @@ -43,7 +46,34 @@
      private Set<CacheListener> listeners = new CopyOnWriteArraySet();
      private CacheImpl cache;
      private InvocationContext tempCtx;
  -   private Log log = LogFactory.getLog(Notifier.class);
  +   private static final Log log = LogFactory.getLog(Notifier.class);
  +
  +   // --- the java.lang.reflect.Methods of CacheListener
  +   private static Method nodeCreated, nodeModified, nodeRemoved, nodeVisited, nodeEvicted, nodeLoaded, nodePassivated, nodeActivated, nodeMoved, cacheStarted, cacheStopped, viewChange;
  +
  +   static
  +   {
  +      try
  +      {
  +         nodeCreated = CacheListener.class.getMethod("nodeCreated", Fqn.class, boolean.class, boolean.class);
  +         nodeModified = CacheListener.class.getMethod("nodeModified", Fqn.class, boolean.class, boolean.class, Map.class);
  +         nodeRemoved = CacheListener.class.getMethod("nodeRemoved", Fqn.class, boolean.class, boolean.class, Map.class);
  +         nodeVisited = CacheListener.class.getMethod("nodeVisited", Fqn.class, boolean.class);
  +         nodeEvicted = CacheListener.class.getMethod("nodeEvicted", Fqn.class, boolean.class, boolean.class);
  +         nodeLoaded = CacheListener.class.getMethod("nodeLoaded", Fqn.class, boolean.class, Map.class);
  +         nodePassivated = CacheListener.class.getMethod("nodePassivated", Fqn.class, boolean.class);
  +         nodeActivated = CacheListener.class.getMethod("nodeActivated", Fqn.class, boolean.class);
  +         nodeMoved = CacheListener.class.getMethod("nodeMoved", Fqn.class, Fqn.class, boolean.class, boolean.class);
  +         cacheStarted = CacheListener.class.getMethod("cacheStarted", CacheSPI.class);
  +         cacheStopped = CacheListener.class.getMethod("cacheStopped", CacheSPI.class);
  +         viewChange = CacheListener.class.getMethod("viewChange", View.class);
  +      }
  +      catch (Exception e)
  +      {
  +         log.error("Unable to initialise Notifier - unable to get Methods on CacheListener.class", e);
  +      }
  +   }
  +
   
      public Notifier(CacheImpl cache)
      {
  @@ -112,11 +142,14 @@
       *
       * @param fqn
       * @param pre
  +    * @param sendImmediately
       */
  -   public synchronized void notifyNodeCreated(Fqn fqn, boolean pre)
  +   public synchronized void notifyNodeCreated(Fqn fqn, boolean pre, boolean sendImmediately)
      {
   
         boolean originLocal = cache.getInvocationContext().isOriginLocal();
  +      if (sendImmediately)
  +      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
         {
  @@ -131,12 +164,11 @@
         }
         restoreInvocationContext();
      }
  -
  -   private Map copy(Map<Object, Object> data)
  +      else
      {
  -      if (data == null)
  -         return null;
  -      return Collections.unmodifiableMap(new HashMap<Object, Object>(data));
  +         MethodCall call = new MethodCall(nodeCreated, new Object[]{fqn, pre, originLocal});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
      }
   
      /**
  @@ -145,24 +177,34 @@
       * @param fqn
       * @param pre
       * @param data
  +    * @param sendImmediately
       */
  -   public synchronized void notifyNodeModified(Fqn fqn, boolean pre, Map<Object, Object> data)
  +   public synchronized void notifyNodeModified(Fqn fqn, boolean pre, Map<Object, Object> data, boolean sendImmediately)
      {
         boolean originLocal = cache.getInvocationContext().isOriginLocal();
  +      Map dataCopy = copy(data);
  +      if (sendImmediately)
  +      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
         {
  -         evictionPolicyListener.nodeModified(fqn, pre, originLocal, copy(data));
  +            evictionPolicyListener.nodeModified(fqn, pre, originLocal, dataCopy);
         }
         if (hasListeners)
         {
            for (CacheListener listener : listeners)
            {
  -            listener.nodeModified(fqn, pre, originLocal, copy(data));
  +               listener.nodeModified(fqn, pre, originLocal, dataCopy);
            }
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(nodeModified, new Object[]{fqn, pre, originLocal, dataCopy});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
   
      /**
       * Notifies all registered listeners of a nodeRemoved event.
  @@ -170,32 +212,46 @@
       * @param fqn
       * @param pre
       * @param data
  +    * @param sendImmediately
       */
  -   public synchronized void notifyNodeRemoved(Fqn fqn, boolean pre, Map<Object, Object> data)
  +   public synchronized void notifyNodeRemoved(Fqn fqn, boolean pre, Map<Object, Object> data, boolean sendImmediately)
      {
         boolean originLocal = cache.getInvocationContext().isOriginLocal();
  +      Map dataCopy = copy(data);
  +      if (sendImmediately)
  +      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
         {
  -         evictionPolicyListener.nodeRemoved(fqn, pre, originLocal, copy(data));
  +            evictionPolicyListener.nodeRemoved(fqn, pre, originLocal, dataCopy);
         }
         if (hasListeners)
         {
            for (CacheListener listener : listeners)
            {
  -            listener.nodeRemoved(fqn, pre, originLocal, copy(data));
  +               listener.nodeRemoved(fqn, pre, originLocal, dataCopy);
            }
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(nodeRemoved, new Object[]{fqn, pre, originLocal, dataCopy});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +
  +   }
   
      /**
       * Notifies all registered listeners of a nodeVisited event.
       *
       * @param fqn
       * @param pre
  +    * @param sendImmediately
       */
  -   public synchronized void notifyNodeVisited(Fqn fqn, boolean pre)
  +   public synchronized void notifyNodeVisited(Fqn fqn, boolean pre, boolean sendImmediately)
  +   {
  +      if (sendImmediately)
      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
  @@ -211,16 +267,53 @@
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(nodeVisited, new Object[]{fqn, pre});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +
  +   }
  +
  +   public synchronized void notifyNodeMoved(Fqn originalFqn, Fqn newFqn, boolean pre, boolean sendImmediately)
  +   {
  +      boolean originLocal = cache.getInvocationContext().isOriginLocal();
  +      if (sendImmediately)
  +      {
  +         resetInvocationContext();
  +         if (evictionPolicyListener != null)
  +         {
  +            evictionPolicyListener.nodeMoved(originalFqn, newFqn, pre, originLocal);
  +         }
  +         if (hasListeners)
  +         {
  +            for (CacheListener listener : listeners)
  +            {
  +               listener.nodeMoved(originalFqn, newFqn, pre, originLocal);
  +            }
  +         }
  +         restoreInvocationContext();
  +      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(nodeMoved, new Object[]{originalFqn, newFqn, pre, originLocal});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
  +
   
      /**
       * Notifies all registered listeners of a nodeEvicted event.
       *
       * @param fqn
       * @param pre
  +    * @param sendImmediately
       */
  -   public synchronized void notifyNodeEvicted(Fqn fqn, boolean pre)
  +   public synchronized void notifyNodeEvicted(Fqn fqn, boolean pre, boolean sendImmediately)
      {
         boolean originLocal = cache.getInvocationContext().isOriginLocal();
  +      if (sendImmediately)
  +      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
         {
  @@ -235,6 +328,12 @@
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(nodeEvicted, new Object[]{fqn, pre, originLocal});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
   
      /**
       * Notifies all registered listeners of a nodeLoaded event.
  @@ -242,31 +341,44 @@
       * @param fqn
       * @param pre
       * @param data
  +    * @param sendImmediately
       */
  -   public synchronized void notifyNodeLoaded(Fqn fqn, boolean pre, Map<Object, Object> data)
  +   public synchronized void notifyNodeLoaded(Fqn fqn, boolean pre, Map<Object, Object> data, boolean sendImmediately)
  +   {
  +      Map dataCopy = copy(data);
  +      if (sendImmediately)
      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
         {
  -         evictionPolicyListener.nodeLoaded(fqn, pre, data);
  +            evictionPolicyListener.nodeLoaded(fqn, pre, dataCopy);
         }
         if (hasListeners)
         {
            for (CacheListener listener : listeners)
            {
  -            listener.nodeLoaded(fqn, pre, data);
  +               listener.nodeLoaded(fqn, pre, dataCopy);
            }
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(nodeLoaded, new Object[]{fqn, pre, dataCopy});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
   
      /**
       * Notifies all registered listeners of a nodeActivated event.
       *
       * @param fqn
       * @param pre
  +    * @param sendImmediately
       */
  -   public synchronized void notifyNodeActivated(Fqn fqn, boolean pre)
  +   public synchronized void notifyNodeActivated(Fqn fqn, boolean pre, boolean sendImmediately)
  +   {
  +      if (sendImmediately)
      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
  @@ -282,14 +394,23 @@
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(nodeActivated, new Object[]{fqn, pre});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
   
      /**
       * Notifies all registered listeners of a nodePassivated event.
       *
       * @param fqn
       * @param pre
  +    * @param sendImmediately
       */
  -   public synchronized void notifyNodePassivated(Fqn fqn, boolean pre)
  +   public synchronized void notifyNodePassivated(Fqn fqn, boolean pre, boolean sendImmediately)
  +   {
  +      if (sendImmediately)
      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
  @@ -305,13 +426,22 @@
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(nodePassivated, new Object[]{fqn, pre});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
   
      /**
       * Notifies all registered listeners of a cacheStarted event.
       *
       * @param cache
  +    * @param sendImmediately
       */
  -   public synchronized void notifyCacheStarted(CacheSPI cache)
  +   public synchronized void notifyCacheStarted(CacheSPI cache, boolean sendImmediately)
  +   {
  +      if (sendImmediately)
      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
  @@ -327,13 +457,22 @@
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(cacheStarted, new Object[]{cache});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
   
      /**
       * Notifies all registered listeners of a cacheStopped event.
       *
       * @param cache
  +    * @param sendImmediately
       */
  -   public synchronized void notifyCacheStopped(CacheSPI cache)
  +   public synchronized void notifyCacheStopped(CacheSPI cache, boolean sendImmediately)
  +   {
  +      if (sendImmediately)
      {
         resetInvocationContext();
         if (evictionPolicyListener != null)
  @@ -349,6 +488,50 @@
         }
         restoreInvocationContext();
      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(cacheStopped, new Object[]{cache});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
  +
  +   /**
  +    * Notifies all registered listeners of a viewChange event.
  +    *
  +    * @param new_view
  +    * @param sendImmediately
  +    */
  +   public synchronized void notifyViewChange(View new_view, boolean sendImmediately)
  +   {
  +      if (sendImmediately)
  +      {
  +         resetInvocationContext();
  +         if (evictionPolicyListener != null)
  +         {
  +            evictionPolicyListener.viewChange(new_view);
  +         }
  +         if (hasListeners)
  +         {
  +            for (CacheListener listener : listeners)
  +            {
  +               listener.viewChange(new_view);
  +            }
  +         }
  +         restoreInvocationContext();
  +      }
  +      else
  +      {
  +         MethodCall call = new MethodCall(viewChange, new Object[]{new_view});
  +         cache.getInvocationContext().addCacheListenerEvent(call);
  +      }
  +   }
  +
  +   private Map copy(Map<Object, Object> data)
  +   {
  +      if (data == null) return null;
  +      if (data.isEmpty()) return Collections.emptyMap();
  +      return Collections.unmodifiableMap(new HashMap<Object, Object>(data));
  +   }
   
      private void restoreInvocationContext()
      {
  @@ -369,24 +552,49 @@
      }
   
      /**
  -    * Notifies all registered listeners of a viewChange event.
  +    * Fires off all notifications for a given queue.
       *
  -    * @param new_view
  +    * @param queue queue to process.
       */
  -   public synchronized void notifyViewChange(View new_view)
  +   public void invokeQueuedNotifications(List<MethodCall> queue)
      {
         resetInvocationContext();
  +      for (MethodCall c : queue)
  +      {
         if (evictionPolicyListener != null)
         {
  -         evictionPolicyListener.viewChange(new_view);
  +            try
  +            {
  +               c.invoke(evictionPolicyListener);
  +            }
  +            catch (Throwable throwable)
  +            {
  +               log.error("Unable to deliver queued notification " + c + " to eviction policy listener", throwable);
  +            }
         }
         if (hasListeners)
         {
            for (CacheListener listener : listeners)
            {
  -            listener.viewChange(new_view);
  +               try
  +               {
  +                  c.invoke(listener);
  +               }
  +               catch (Throwable throwable)
  +               {
  +                  log.error("Unable to deliver queued notification " + c + " to listener " + listener, throwable);
  +               }
  +            }
            }
         }
         restoreInvocationContext();
      }
  +
  +   /**
  +    * Fires off all notifications that have been registered within the current invocation, with sendImmediately set to false.
  +    */
  +   public void invokeQueuedNotifications()
  +   {
  +      invokeQueuedNotifications(cache.getInvocationContext().getCacheListenerEvents());
  +   }
   }
  
  
  



More information about the jboss-cvs-commits mailing list