[jbosscache-commits] JBoss Cache SVN: r5013 - in core/trunk/src: main/java/org/jboss/cache/notifications and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Sun Jan 6 16:26:35 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-01-06 16:26:34 -0500 (Sun, 06 Jan 2008)
New Revision: 5013

Added:
   core/trunk/src/main/java/org/jboss/cache/notifications/annotation/BuddyGroupChanged.java
   core/trunk/src/main/java/org/jboss/cache/notifications/event/BuddyGroupChangedEvent.java
   core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
   core/trunk/src/main/java/org/jboss/cache/notifications/Notifier.java
   core/trunk/src/main/java/org/jboss/cache/notifications/event/Event.java
   core/trunk/src/main/java/org/jboss/cache/notifications/event/EventImpl.java
Log:
JBCACHE-1257 - added notification callback for buddy group changes

Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-01-06 21:17:01 UTC (rev 5012)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-01-06 21:26:34 UTC (rev 5013)
@@ -25,6 +25,7 @@
 import org.jboss.cache.marshall.MethodCall;
 import org.jboss.cache.marshall.MethodCallFactory;
 import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.notifications.annotation.CacheListener;
 import org.jboss.cache.notifications.annotation.ViewChanged;
 import org.jboss.cache.notifications.event.ViewChangedEvent;
@@ -71,6 +72,7 @@
    private CacheSPI<?, ?> cache;
    private Configuration configuration;
    private RegionManager regionManager;
+   private Notifier notifier;
    private StateTransferManager stateTransferManager;
    private RPCManager rpcManager;
    /**
@@ -169,13 +171,14 @@
    }
 
    @Inject
-   private void injectDependencies(CacheSPI cache, Configuration configuration, RegionManager regionManager, StateTransferManager stateTransferManager, RPCManager rpcManager)
+   private void injectDependencies(CacheSPI cache, Configuration configuration, RegionManager regionManager, StateTransferManager stateTransferManager, RPCManager rpcManager, Notifier notifier)
    {
       this.cache = cache;
       this.configuration = configuration;
       this.regionManager = regionManager;
       this.stateTransferManager = stateTransferManager;
       this.rpcManager = rpcManager;
+      this.notifier = notifier;
 
       setupInternals(configuration.getBuddyReplicationConfig());
    }
@@ -374,20 +377,20 @@
       }
 
       // Update buddy list
-      boolean buddyGroupMutated = false;
+      boolean buddyGroupMutated = !obsoleteBuddies.isEmpty() || !uninitialisedBuddies.isEmpty(); // group changes if any buddy is removed OR added
+//      if (buddyGroupMutated) notifier.notifyBuddyGroupChange(buddyGroup, true);
       if (!obsoleteBuddies.isEmpty())
       {
          removeFromGroup(obsoleteBuddies);
-         buddyGroupMutated = true;
       }
       else
       {
          log.trace("No obsolete buddies found, nothing to announce.");
       }
+
       if (!uninitialisedBuddies.isEmpty())
       {
          addBuddies(newBuddies);
-         buddyGroupMutated = true;
       }
       else
       {
@@ -398,6 +401,7 @@
       {
          if (log.isInfoEnabled()) log.info("Buddy group members have changed. New buddy group: " + buddyGroup);
          configuration.getRuntimeConfig().setBuddyGroup(buddyGroup);
+         notifier.notifyBuddyGroupChange(buddyGroup, false);
       }
       else
          log.debug("Nothing has changed; new buddy list is identical to the old one.");

Modified: core/trunk/src/main/java/org/jboss/cache/notifications/Notifier.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/Notifier.java	2008-01-06 21:17:01 UTC (rev 5012)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/Notifier.java	2008-01-06 21:26:34 UTC (rev 5013)
@@ -13,6 +13,7 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InvocationContext;
+import org.jboss.cache.buddyreplication.BuddyGroup;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.notifications.annotation.*;
 import org.jboss.cache.notifications.event.*;
@@ -48,12 +49,12 @@
    private static final Class[] allowedMethodAnnotations =
          {
                CacheStarted.class, CacheStopped.class, CacheBlocked.class, CacheUnblocked.class, NodeCreated.class, NodeRemoved.class, NodeVisited.class, NodeModified.class, NodeMoved.class,
-               NodeActivated.class, NodePassivated.class, NodeLoaded.class, NodeEvicted.class, TransactionRegistered.class, TransactionCompleted.class, ViewChanged.class
+               NodeActivated.class, NodePassivated.class, NodeLoaded.class, NodeEvicted.class, TransactionRegistered.class, TransactionCompleted.class, ViewChanged.class, BuddyGroupChanged.class
          };
    private static final Class[] parameterTypes =
          {
                CacheStartedEvent.class, CacheStoppedEvent.class, CacheBlockedEvent.class, CacheUnblockedEvent.class, NodeCreatedEvent.class, NodeRemovedEvent.class, NodeVisitedEvent.class, NodeModifiedEvent.class, NodeMovedEvent.class,
-               NodeActivatedEvent.class, NodePassivatedEvent.class, NodeLoadedEvent.class, NodeEvictedEvent.class, TransactionRegisteredEvent.class, TransactionCompletedEvent.class, ViewChangedEvent.class
+               NodeActivatedEvent.class, NodePassivatedEvent.class, NodeLoadedEvent.class, NodeEvictedEvent.class, TransactionRegisteredEvent.class, TransactionCompletedEvent.class, ViewChangedEvent.class, BuddyGroupChangedEvent.class
          };
    final Map<Class, List<ListenerInvocation>> listenerInvocations = new ConcurrentHashMap<Class, List<ListenerInvocation>>();
 
@@ -529,6 +530,30 @@
    }
 
    /**
+    * Notifies all registered listeners of a buddy group change event.  Note that buddy group change notifications are ALWAYS sent
+    * immediately.
+    *
+    * @param buddyGroup buddy group to set
+    * @param pre        if true, this has occurred before the buddy group message is broadcast to the cluster
+    */
+   public void notifyBuddyGroupChange(final BuddyGroup buddyGroup, boolean pre)
+   {
+      List<ListenerInvocation> listeners = listenerInvocations.get(BuddyGroupChanged.class);
+
+      if (listeners != null && !listeners.isEmpty())
+      {
+//         InvocationContext backup = resetInvocationContext(ctx);
+         EventImpl e = new EventImpl();
+         e.setCache(cache);
+         e.setBuddyGroup(buddyGroup);
+         e.setPre(pre);
+         e.setType(BUDDY_GROUP_CHANGED);
+         for (ListenerInvocation listener : listeners) listener.invoke(e);
+//         restoreInvocationContext(backup);
+      }
+   }
+
+   /**
     * Notifies all registered listeners of a transaction completion event.
     *
     * @param transaction the transaction that has just completed

Added: core/trunk/src/main/java/org/jboss/cache/notifications/annotation/BuddyGroupChanged.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/annotation/BuddyGroupChanged.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/annotation/BuddyGroupChanged.java	2008-01-06 21:26:34 UTC (rev 5013)
@@ -0,0 +1,28 @@
+package org.jboss.cache.notifications.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation should be used on methods that need to be notified when a buddy group changes.
+ * <p/>
+ * Methods annotated with this annotation should be public and take in a single parameter, a {@link org.jboss.cache.notifications.event.BuddyGroupChangedEvent}
+ * otherwise an {@link org.jboss.cache.notifications.IncorrectCacheListenerException} will be thrown when registering
+ * your cache listener.
+ * <p/>
+ * This call back only occurs when a buddy group structure is changed.  In a cache setup where buddy replication is not
+ * enabled, this call back would never occur.
+ * <p/>
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ * @see CacheListener
+ * @see org.jboss.cache.notifications.event.BuddyGroupChangedEvent
+ * @since 2.1.0
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface BuddyGroupChanged
+{
+}

Copied: core/trunk/src/main/java/org/jboss/cache/notifications/event/BuddyGroupChangedEvent.java (from rev 4932, core/trunk/src/main/java/org/jboss/cache/notifications/event/CacheBlockedEvent.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/event/BuddyGroupChangedEvent.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/event/BuddyGroupChangedEvent.java	2008-01-06 21:26:34 UTC (rev 5013)
@@ -0,0 +1,18 @@
+package org.jboss.cache.notifications.event;
+
+import org.jboss.cache.buddyreplication.BuddyGroup;
+
+/**
+ * This event is passed in to any method annotated with {@link org.jboss.cache.notifications.annotation.BuddyGroupChanged}.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ * @since 2.1.0
+ */
+public interface BuddyGroupChangedEvent extends Event
+{
+   /**
+    * @return the new buddy group
+    * @since 2.1.0
+    */
+   BuddyGroup getBuddyGroup();
+}
\ No newline at end of file

Modified: core/trunk/src/main/java/org/jboss/cache/notifications/event/Event.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/event/Event.java	2008-01-06 21:17:01 UTC (rev 5012)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/event/Event.java	2008-01-06 21:26:34 UTC (rev 5013)
@@ -14,7 +14,7 @@
    {
       CACHE_STARTED, CACHE_STOPPED, CACHE_BLOCKED, CACHE_UNBLOCKED, NODE_ACTIVATED, NODE_PASSIVATED,
       NODE_LOADED, NODE_EVICTED, NODE_CREATED, NODE_REMOVED, NODE_MODIFIED, NODE_MOVED, NODE_VISITED,
-      TRANSACTION_COMPLETED, TRANSACTION_REGISTERED, VIEW_CHANGED
+      TRANSACTION_COMPLETED, TRANSACTION_REGISTERED, VIEW_CHANGED, BUDDY_GROUP_CHANGED
    }
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/notifications/event/EventImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/event/EventImpl.java	2008-01-06 21:17:01 UTC (rev 5012)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/event/EventImpl.java	2008-01-06 21:26:34 UTC (rev 5013)
@@ -2,6 +2,7 @@
 
 import org.jboss.cache.Cache;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.buddyreplication.BuddyGroup;
 import org.jgroups.View;
 
 import javax.transaction.Transaction;
@@ -16,7 +17,7 @@
 public class EventImpl implements CacheBlockedEvent, CacheUnblockedEvent, CacheStartedEvent, CacheStoppedEvent,
       NodeActivatedEvent, NodeCreatedEvent, NodeEvictedEvent, NodeLoadedEvent, NodeModifiedEvent, NodeMovedEvent,
       NodePassivatedEvent, NodeRemovedEvent, NodeVisitedEvent, TransactionCompletedEvent, TransactionRegisteredEvent,
-      ViewChangedEvent
+      ViewChangedEvent, BuddyGroupChangedEvent
 {
    private boolean pre = false; // by default events are after the fact
    private Cache cache;
@@ -29,6 +30,7 @@
    private boolean successful;
    private View newView;
    private Type type;
+   private BuddyGroup buddyGroup;
 
 
    public EventImpl(boolean pre, Cache cache, ModificationType modificationType, Map data, Fqn fqn, Transaction transaction, boolean originLocal, Fqn targetFqn, boolean successful, View newView, Type type)
@@ -162,6 +164,10 @@
       this.type = type;
    }
 
+   public void setBuddyGroup(BuddyGroup buddyGroup)
+   {
+      this.buddyGroup = buddyGroup;
+   }
 
    public boolean equals(Object o)
    {
@@ -180,6 +186,7 @@
       if (targetFqn != null ? !targetFqn.equals(event.targetFqn) : event.targetFqn != null) return false;
       if (transaction != null ? !transaction.equals(event.transaction) : event.transaction != null) return false;
       if (newView != null ? !newView.equals(event.newView) : event.newView != null) return false;
+      if (buddyGroup != null ? !buddyGroup.equals(event.buddyGroup) : event.buddyGroup != null) return false;
       if (type != null ? !type.equals(event.type) : event.type != null) return false;
 
       return true;
@@ -198,6 +205,7 @@
       result = 31 * result + (targetFqn != null ? targetFqn.hashCode() : 0);
       result = 31 * result + (successful ? 1 : 0);
       result = 31 * result + (newView != null ? newView.hashCode() : 0);
+      result = 31 * result + (buddyGroup != null ? buddyGroup.hashCode() : 0);
       result = 31 * result + (type != null ? type.hashCode() : 0);
       return result;
    }
@@ -217,7 +225,12 @@
             ", targetFqn=" + targetFqn +
             ", successful=" + successful +
             ", newView=" + newView +
+            ", buddyGroup=" + buddyGroup +
             '}';
    }
 
+   public BuddyGroup getBuddyGroup()
+   {
+      return buddyGroup;
+   }
 }

Added: core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java	2008-01-06 21:26:34 UTC (rev 5013)
@@ -0,0 +1,134 @@
+package org.jboss.cache.notifications;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheFactory;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.buddyreplication.BuddyGroup;
+import org.jboss.cache.buddyreplication.BuddyReplicationTestsBase;
+import org.jboss.cache.config.BuddyReplicationConfig;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.misc.TestingUtil;
+import org.jboss.cache.notifications.annotation.BuddyGroupChanged;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.ViewChanged;
+import org.jboss.cache.notifications.event.BuddyGroupChangedEvent;
+import org.jboss.cache.notifications.event.ViewChangedEvent;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+@Test(groups = "functional")
+public class BuddyGroupChangeNotificationTest extends BuddyReplicationTestsBase
+{
+   Cache c1, c2, c3;
+   Listener listener;
+   static CountDownLatch latch1 = new CountDownLatch(1);
+   static CountDownLatch latch2 = new CountDownLatch(1);
+   static boolean stage2 = false;
+   static boolean notificationsReceived = true;
+
+   @BeforeMethod
+   public void setUp() throws CloneNotSupportedException
+   {
+      CacheFactory cf = new DefaultCacheFactory();
+      c1 = cf.createCache(false);
+
+      c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      BuddyReplicationConfig brc = new BuddyReplicationConfig();
+      brc.setEnabled(true);
+      c1.getConfiguration().setBuddyReplicationConfig(brc);
+
+      c2 = cf.createCache(c1.getConfiguration().clone(), false);
+      c3 = cf.createCache(c1.getConfiguration().clone(), false);
+
+      c1.start();
+      c2.start();
+      c3.start();
+
+      // make sure views are received and groups are formed first
+      TestingUtil.blockUntilViewsReceived(60000, c1, c2, c3);
+
+      Cache[] caches = new Cache[]{c1, c2, c3};
+
+      listener = new Listener(caches);
+
+      c2.addCacheListener(listener);
+   }
+
+   @AfterMethod
+   public void tearDown()
+   {
+      TestingUtil.killCaches(c1, c2, c3);
+   }
+
+   @Test(timeOut = 60000)
+   public void testChangingGroups() throws InterruptedException
+   {
+      // initial state
+      assertIsBuddy(c1, c2, true);
+      assertIsBuddy(c2, c3, true);
+      assertIsBuddy(c3, c1, true);
+
+      // kill c3
+      c3.stop();
+      latch1.await();
+
+      assertIsBuddy(c1, c2, true);
+      assertIsBuddy(c2, c1, true);
+
+      stage2 = true;
+      c3.start();
+      latch2.await();
+
+      assertIsBuddy(c1, c2, true);
+      assertIsBuddy(c2, c3, true);
+      assertIsBuddy(c3, c1, true);
+
+      assert notificationsReceived;
+   }
+
+   @CacheListener
+   public static class Listener
+   {
+      Cache[] caches;
+      int numActiveCaches;
+
+      public Listener(Cache[] caches)
+      {
+         this.caches = caches;
+      }
+
+      @ViewChanged
+      public void viewChanged(ViewChangedEvent e)
+      {
+         numActiveCaches = e.getNewView().getMembers().size();
+      }
+
+      @BuddyGroupChanged
+      public void buddyChanged(BuddyGroupChangedEvent e)
+      {
+         System.out.println("Received event " + e);
+         if (!e.isPre())
+         {
+            BuddyGroup bg = e.getBuddyGroup();
+
+            boolean passed = bg.getDataOwner().equals(caches[1].getLocalAddress()) &&
+                  bg.getBuddies().size() == 1 &&
+                  bg.getBuddies().contains(caches[(numActiveCaches == 3) ? 2 : 0].getLocalAddress());
+
+            notificationsReceived = notificationsReceived && passed;
+
+            if (stage2)
+               latch2.countDown();
+            else
+               latch1.countDown();
+         }
+      }
+   }
+}




More information about the jbosscache-commits mailing list