Author: manik.surtani(a)jboss.com
Date: 2008-01-06 16:26:34 -0500 (Sun, 06 Jan 2008)
New Revision: 5013
Added:
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/BuddyGroupChanged.java
core/trunk/src/main/java/org/jboss/cache/notifications/event/BuddyGroupChangedEvent.java
core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/notifications/Notifier.java
core/trunk/src/main/java/org/jboss/cache/notifications/event/Event.java
core/trunk/src/main/java/org/jboss/cache/notifications/event/EventImpl.java
Log:
JBCACHE-1257 - added notification callback for buddy group changes
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-01-06
21:17:01 UTC (rev 5012)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-01-06
21:26:34 UTC (rev 5013)
@@ -25,6 +25,7 @@
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.ViewChanged;
import org.jboss.cache.notifications.event.ViewChangedEvent;
@@ -71,6 +72,7 @@
private CacheSPI<?, ?> cache;
private Configuration configuration;
private RegionManager regionManager;
+ private Notifier notifier;
private StateTransferManager stateTransferManager;
private RPCManager rpcManager;
/**
@@ -169,13 +171,14 @@
}
@Inject
- private void injectDependencies(CacheSPI cache, Configuration configuration,
RegionManager regionManager, StateTransferManager stateTransferManager, RPCManager
rpcManager)
+ private void injectDependencies(CacheSPI cache, Configuration configuration,
RegionManager regionManager, StateTransferManager stateTransferManager, RPCManager
rpcManager, Notifier notifier)
{
this.cache = cache;
this.configuration = configuration;
this.regionManager = regionManager;
this.stateTransferManager = stateTransferManager;
this.rpcManager = rpcManager;
+ this.notifier = notifier;
setupInternals(configuration.getBuddyReplicationConfig());
}
@@ -374,20 +377,20 @@
}
// Update buddy list
- boolean buddyGroupMutated = false;
+ boolean buddyGroupMutated = !obsoleteBuddies.isEmpty() ||
!uninitialisedBuddies.isEmpty(); // either removals or additions mutate the group
+// if (buddyGroupMutated) notifier.notifyBuddyGroupChange(buddyGroup, true);
if (!obsoleteBuddies.isEmpty())
{
removeFromGroup(obsoleteBuddies);
- buddyGroupMutated = true;
}
else
{
log.trace("No obsolete buddies found, nothing to announce.");
}
+
if (!uninitialisedBuddies.isEmpty())
{
addBuddies(newBuddies);
- buddyGroupMutated = true;
}
else
{
@@ -398,6 +401,7 @@
{
if (log.isInfoEnabled()) log.info("Buddy group members have changed. New
buddy group: " + buddyGroup);
configuration.getRuntimeConfig().setBuddyGroup(buddyGroup);
+ notifier.notifyBuddyGroupChange(buddyGroup, false);
}
else
log.debug("Nothing has changed; new buddy list is identical to the old
one.");
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/Notifier.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/Notifier.java 2008-01-06
21:17:01 UTC (rev 5012)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/Notifier.java 2008-01-06
21:26:34 UTC (rev 5013)
@@ -13,6 +13,7 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.buddyreplication.BuddyGroup;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.notifications.annotation.*;
import org.jboss.cache.notifications.event.*;
@@ -48,12 +49,12 @@
private static final Class[] allowedMethodAnnotations =
{
CacheStarted.class, CacheStopped.class, CacheBlocked.class,
CacheUnblocked.class, NodeCreated.class, NodeRemoved.class, NodeVisited.class,
NodeModified.class, NodeMoved.class,
- NodeActivated.class, NodePassivated.class, NodeLoaded.class,
NodeEvicted.class, TransactionRegistered.class, TransactionCompleted.class,
ViewChanged.class
+ NodeActivated.class, NodePassivated.class, NodeLoaded.class,
NodeEvicted.class, TransactionRegistered.class, TransactionCompleted.class,
ViewChanged.class, BuddyGroupChanged.class
};
private static final Class[] parameterTypes =
{
CacheStartedEvent.class, CacheStoppedEvent.class, CacheBlockedEvent.class,
CacheUnblockedEvent.class, NodeCreatedEvent.class, NodeRemovedEvent.class,
NodeVisitedEvent.class, NodeModifiedEvent.class, NodeMovedEvent.class,
- NodeActivatedEvent.class, NodePassivatedEvent.class,
NodeLoadedEvent.class, NodeEvictedEvent.class, TransactionRegisteredEvent.class,
TransactionCompletedEvent.class, ViewChangedEvent.class
+ NodeActivatedEvent.class, NodePassivatedEvent.class,
NodeLoadedEvent.class, NodeEvictedEvent.class, TransactionRegisteredEvent.class,
TransactionCompletedEvent.class, ViewChangedEvent.class, BuddyGroupChangedEvent.class
};
final Map<Class, List<ListenerInvocation>> listenerInvocations = new
ConcurrentHashMap<Class, List<ListenerInvocation>>();
@@ -529,6 +530,30 @@
}
/**
+ * Notifies all registered listeners of a buddy group change event. Note that buddy
group change notifications are ALWAYS sent
+ * immediately.
+ *
+ * @param buddyGroup buddy group to set
+ * @param pre if true, this has occurred before the buddy group message is
broadcast to the cluster
+ */
+ public void notifyBuddyGroupChange(final BuddyGroup buddyGroup, boolean pre)
+ {
+ List<ListenerInvocation> listeners =
listenerInvocations.get(BuddyGroupChanged.class);
+
+ if (listeners != null && !listeners.isEmpty())
+ {
+// InvocationContext backup = resetInvocationContext(ctx);
+ EventImpl e = new EventImpl();
+ e.setCache(cache);
+ e.setBuddyGroup(buddyGroup);
+ e.setPre(pre);
+ e.setType(BUDDY_GROUP_CHANGED);
+ for (ListenerInvocation listener : listeners) listener.invoke(e);
+// restoreInvocationContext(backup);
+ }
+ }
+
+ /**
* Notifies all registered listeners of a transaction completion event.
*
* @param transaction the transaction that has just completed
Added:
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/BuddyGroupChanged.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/BuddyGroupChanged.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/BuddyGroupChanged.java 2008-01-06
21:26:34 UTC (rev 5013)
@@ -0,0 +1,28 @@
+package org.jboss.cache.notifications.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation should be used on methods that need to be notified when a buddy group
changes.
+ * <p/>
+ * Methods annotated with this annotation should be public and take in a single
parameter, a {@link org.jboss.cache.notifications.event.BuddyGroupChangedEvent},
+ * otherwise an {@link org.jboss.cache.notifications.IncorrectCacheListenerException}
will be thrown when registering
+ * your cache listener.
+ * <p/>
+ * This call back only occurs when a buddy group structure is changed. In a cache setup
where buddy replication is not
+ * enabled, this call back would never occur.
+ * <p/>
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
+ * @see CacheListener
+ * @see org.jboss.cache.notifications.event.BuddyGroupChangedEvent
+ * @since 2.1.0
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface BuddyGroupChanged
+{
+}
Copied:
core/trunk/src/main/java/org/jboss/cache/notifications/event/BuddyGroupChangedEvent.java
(from rev 4932,
core/trunk/src/main/java/org/jboss/cache/notifications/event/CacheBlockedEvent.java)
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/notifications/event/BuddyGroupChangedEvent.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/notifications/event/BuddyGroupChangedEvent.java 2008-01-06
21:26:34 UTC (rev 5013)
@@ -0,0 +1,18 @@
+package org.jboss.cache.notifications.event;
+
+import org.jboss.cache.buddyreplication.BuddyGroup;
+
+/**
+ * This event is passed in to any method annotated with {@link
org.jboss.cache.notifications.annotation.BuddyGroupChanged}.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
+ * @since 2.1.0
+ */
+public interface BuddyGroupChangedEvent extends Event
+{
+ /**
+ * @return the new buddy group
+ * @since 2.1.0
+ */
+ BuddyGroup getBuddyGroup();
+}
\ No newline at end of file
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/event/Event.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/event/Event.java 2008-01-06
21:17:01 UTC (rev 5012)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/event/Event.java 2008-01-06
21:26:34 UTC (rev 5013)
@@ -14,7 +14,7 @@
{
CACHE_STARTED, CACHE_STOPPED, CACHE_BLOCKED, CACHE_UNBLOCKED, NODE_ACTIVATED,
NODE_PASSIVATED,
NODE_LOADED, NODE_EVICTED, NODE_CREATED, NODE_REMOVED, NODE_MODIFIED, NODE_MOVED,
NODE_VISITED,
- TRANSACTION_COMPLETED, TRANSACTION_REGISTERED, VIEW_CHANGED
+ TRANSACTION_COMPLETED, TRANSACTION_REGISTERED, VIEW_CHANGED, BUDDY_GROUP_CHANGED
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/event/EventImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/event/EventImpl.java 2008-01-06
21:17:01 UTC (rev 5012)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/event/EventImpl.java 2008-01-06
21:26:34 UTC (rev 5013)
@@ -2,6 +2,7 @@
import org.jboss.cache.Cache;
import org.jboss.cache.Fqn;
+import org.jboss.cache.buddyreplication.BuddyGroup;
import org.jgroups.View;
import javax.transaction.Transaction;
@@ -16,7 +17,7 @@
public class EventImpl implements CacheBlockedEvent, CacheUnblockedEvent,
CacheStartedEvent, CacheStoppedEvent,
NodeActivatedEvent, NodeCreatedEvent, NodeEvictedEvent, NodeLoadedEvent,
NodeModifiedEvent, NodeMovedEvent,
NodePassivatedEvent, NodeRemovedEvent, NodeVisitedEvent, TransactionCompletedEvent,
TransactionRegisteredEvent,
- ViewChangedEvent
+ ViewChangedEvent, BuddyGroupChangedEvent
{
private boolean pre = false; // by default events are after the fact
private Cache cache;
@@ -29,6 +30,7 @@
private boolean successful;
private View newView;
private Type type;
+ private BuddyGroup buddyGroup;
public EventImpl(boolean pre, Cache cache, ModificationType modificationType, Map
data, Fqn fqn, Transaction transaction, boolean originLocal, Fqn targetFqn, boolean
successful, View newView, Type type)
@@ -162,6 +164,10 @@
this.type = type;
}
+ public void setBuddyGroup(BuddyGroup buddyGroup)
+ {
+ this.buddyGroup = buddyGroup;
+ }
public boolean equals(Object o)
{
@@ -180,6 +186,7 @@
if (targetFqn != null ? !targetFqn.equals(event.targetFqn) : event.targetFqn !=
null) return false;
if (transaction != null ? !transaction.equals(event.transaction) :
event.transaction != null) return false;
if (newView != null ? !newView.equals(event.newView) : event.newView != null)
return false;
+ if (buddyGroup != null ? !buddyGroup.equals(event.buddyGroup) : event.buddyGroup !=
null) return false;
if (type != null ? !type.equals(event.type) : event.type != null) return false;
return true;
@@ -198,6 +205,7 @@
result = 31 * result + (targetFqn != null ? targetFqn.hashCode() : 0);
result = 31 * result + (successful ? 1 : 0);
result = 31 * result + (newView != null ? newView.hashCode() : 0);
+ result = 31 * result + (buddyGroup != null ? buddyGroup.hashCode() : 0);
result = 31 * result + (type != null ? type.hashCode() : 0);
return result;
}
@@ -217,7 +225,12 @@
", targetFqn=" + targetFqn +
", successful=" + successful +
", newView=" + newView +
+ ", buddyGroup=" + buddyGroup +
'}';
}
+ public BuddyGroup getBuddyGroup()
+ {
+ return buddyGroup;
+ }
}
Added:
core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java
(rev 0)
+++
core/trunk/src/test/java/org/jboss/cache/notifications/BuddyGroupChangeNotificationTest.java 2008-01-06
21:26:34 UTC (rev 5013)
@@ -0,0 +1,134 @@
+package org.jboss.cache.notifications;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheFactory;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.buddyreplication.BuddyGroup;
+import org.jboss.cache.buddyreplication.BuddyReplicationTestsBase;
+import org.jboss.cache.config.BuddyReplicationConfig;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.misc.TestingUtil;
+import org.jboss.cache.notifications.annotation.BuddyGroupChanged;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.ViewChanged;
+import org.jboss.cache.notifications.event.BuddyGroupChangedEvent;
+import org.jboss.cache.notifications.event.ViewChangedEvent;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.0
+ */
+@Test(groups = "functional")
+public class BuddyGroupChangeNotificationTest extends BuddyReplicationTestsBase
+{
+ Cache c1, c2, c3;
+ Listener listener;
+ static CountDownLatch latch1 = new CountDownLatch(1);
+ static CountDownLatch latch2 = new CountDownLatch(1);
+ static boolean stage2 = false;
+ static boolean notificationsReceived = true;
+
+ @BeforeMethod
+ public void setUp() throws CloneNotSupportedException
+ {
+ CacheFactory cf = new DefaultCacheFactory();
+ c1 = cf.createCache(false);
+
+ c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ BuddyReplicationConfig brc = new BuddyReplicationConfig();
+ brc.setEnabled(true);
+ c1.getConfiguration().setBuddyReplicationConfig(brc);
+
+ c2 = cf.createCache(c1.getConfiguration().clone(), false);
+ c3 = cf.createCache(c1.getConfiguration().clone(), false);
+
+ c1.start();
+ c2.start();
+ c3.start();
+
+ // make sure views are received and groups are formed first
+ TestingUtil.blockUntilViewsReceived(60000, c1, c2, c3);
+
+ Cache[] caches = new Cache[]{c1, c2, c3};
+
+ listener = new Listener(caches);
+
+ c2.addCacheListener(listener);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ {
+ TestingUtil.killCaches(c1, c2, c3);
+ }
+
+ @Test(timeOut = 60000)
+ public void testChangingGroups() throws InterruptedException
+ {
+ // initial state
+ assertIsBuddy(c1, c2, true);
+ assertIsBuddy(c2, c3, true);
+ assertIsBuddy(c3, c1, true);
+
+ // kill c3
+ c3.stop();
+ latch1.await();
+
+ assertIsBuddy(c1, c2, true);
+ assertIsBuddy(c2, c1, true);
+
+ stage2 = true;
+ c3.start();
+ latch2.await();
+
+ assertIsBuddy(c1, c2, true);
+ assertIsBuddy(c2, c3, true);
+ assertIsBuddy(c3, c1, true);
+
+ assert notificationsReceived;
+ }
+
+ @CacheListener
+ public static class Listener
+ {
+ Cache[] caches;
+ int numActiveCaches;
+
+ public Listener(Cache[] caches)
+ {
+ this.caches = caches;
+ }
+
+ @ViewChanged
+ public void viewChanged(ViewChangedEvent e)
+ {
+ numActiveCaches = e.getNewView().getMembers().size();
+ }
+
+ @BuddyGroupChanged
+ public void buddyChanged(BuddyGroupChangedEvent e)
+ {
+ System.out.println("Received event " + e);
+ if (!e.isPre())
+ {
+ BuddyGroup bg = e.getBuddyGroup();
+
+ boolean passed = bg.getDataOwner().equals(caches[1].getLocalAddress())
&&
+ bg.getBuddies().size() == 1 &&
+ bg.getBuddies().contains(caches[(numActiveCaches == 3) ? 2 :
0].getLocalAddress());
+
+ notificationsReceived = notificationsReceived && passed;
+
+ if (stage2)
+ latch2.countDown();
+ else
+ latch1.countDown();
+ }
+ }
+ }
+}