[infinispan-commits] Infinispan SVN: r2337 - in branches/4.2.x/core/src/main/java/org/infinispan: notifications/cachemanagerlistener/annotation and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Sep 7 09:21:59 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-09-07 09:21:58 -0400 (Tue, 07 Sep 2010)
New Revision: 2337
Added:
branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java
branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java
branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/Event.java
branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
[ISPN-609] (Report Merge events via the listener API)
Modified: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java 2010-09-07 12:37:26 UTC (rev 2336)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java 2010-09-07 13:21:58 UTC (rev 2337)
@@ -19,9 +19,11 @@
* Notifies all registered listeners of a viewChange event. Note that viewChange notifications are ALWAYS sent
* immediately.
*/
- void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean b);
+ void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean requiresJoin);
void notifyCacheStarted(String cacheName);
void notifyCacheStopped(String cacheName);
+
+ void notifyMerge(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean requiresRejoin, List<List<Address>> subgroupsMerged);
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java 2010-09-07 12:37:26 UTC (rev 2336)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java 2010-09-07 13:21:58 UTC (rev 2337)
@@ -43,6 +43,7 @@
final List<ListenerInvocation> cacheStartedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
final List<ListenerInvocation> cacheStoppedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
final List<ListenerInvocation> viewChangedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
+ final List<ListenerInvocation> mergeListeners = new CopyOnWriteArrayList<ListenerInvocation>();
private EmbeddedCacheManager cacheManager;
@@ -71,6 +72,21 @@
}
}
+ /**
+ * Notifies all registered merge listeners of a merge event.
+ * <p/>
+ * The event is only built when at least one merge listener is registered. It carries the local address, the view
+ * id, the new and old member lists, the cache manager, the rejoin flag and the merged subgroups, and is typed
+ * {@link Event.Type#MERGED}.
+ */
+ public void notifyMerge(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean needsToRejoin, List<List<Address>> subgroupsMerged) {
+ // Guard on mergeListeners (the list iterated below) rather than viewChangedListeners; otherwise merge
+ // listeners are silently skipped whenever no view-change listener happens to be registered.
+ if (!mergeListeners.isEmpty()) {
+ EventImpl e = new EventImpl();
+ e.setLocalAddress(myAddress);
+ e.setViewId(viewId);
+ e.setNewMembers(members);
+ e.setOldMembers(oldMembers);
+ e.setCacheManager(cacheManager);
+ e.setNeedsToRejoin(needsToRejoin);
+ e.setSubgroupsMerged(subgroupsMerged);
+ e.setType(Event.Type.MERGED);
+ for (ListenerInvocation listener : mergeListeners) listener.invoke(e);
+ }
+ }
+
public void notifyCacheStarted(String cacheName) {
if (!cacheStartedListeners.isEmpty()) {
EventImpl e = new EventImpl();
Copied: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java (from rev 2332, branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/ViewChanged.java)
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java (rev 0)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java 2010-09-07 13:21:58 UTC (rev 2337)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.notifications.cachemanagerlistener.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation should be used on methods that need to be notified when the cache is used in a cluster and the
+ * cluster topology experiences a merge event after a cluster split.
+ * <p/>
+ * Methods annotated with this annotation should accept a single parameter, a {@link
+ * org.infinispan.notifications.cachemanagerlistener.event.MergeEvent} otherwise a {@link
+ * org.infinispan.notifications.IncorrectListenerException} will be thrown when registering your listener.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ * @see org.infinispan.notifications.Listener
+ * @since 4.2
+ */
+// ensure this annotation is available at runtime.
+@Retention(RetentionPolicy.RUNTIME)
+// ensure that this annotation is applied to methods.
+@Target(ElementType.METHOD)
+public @interface Merged {
+}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/Event.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/Event.java 2010-09-07 12:37:26 UTC (rev 2336)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/Event.java 2010-09-07 13:21:58 UTC (rev 2337)
@@ -10,7 +10,7 @@
*/
public interface Event {
public enum Type {
- CACHE_STARTED, CACHE_STOPPED, VIEW_CHANGED
+ CACHE_STARTED, CACHE_STOPPED, VIEW_CHANGED, MERGED
}
EmbeddedCacheManager getCacheManager();
Modified: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java 2010-09-07 12:37:26 UTC (rev 2336)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java 2010-09-07 13:21:58 UTC (rev 2337)
@@ -2,6 +2,7 @@
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.Util;
import java.util.List;
@@ -11,7 +12,7 @@
* @author Manik Surtani
* @since 4.0
*/
-public class EventImpl implements CacheStartedEvent, CacheStoppedEvent, ViewChangedEvent {
+public class EventImpl implements CacheStartedEvent, CacheStoppedEvent, ViewChangedEvent, MergeEvent {
String cacheName;
EmbeddedCacheManager cacheManager;
@@ -20,6 +21,7 @@
Address localAddress;
int viewId;
private boolean needsToRejoin;
+ private List<List<Address>> subgroupsMerged;
public EventImpl() {
}
@@ -103,6 +105,7 @@
if (localAddress != null ? !localAddress.equals(event.localAddress) : event.localAddress != null) return false;
if (newMembers != null ? !newMembers.equals(event.newMembers) : event.newMembers != null) return false;
if (oldMembers != null ? !oldMembers.equals(event.oldMembers) : event.oldMembers != null) return false;
+ if (!Util.safeEquals(subgroupsMerged, event.subgroupsMerged)) return false;
if (type != event.type) return false;
return true;
@@ -117,6 +120,7 @@
result = 31 * result + (localAddress != null ? localAddress.hashCode() : 0);
result = 31 * result + viewId;
result = 31 * result + (needsToRejoin ? 1 : 0);
+ result = 31 * result + (subgroupsMerged == null ? 0 : subgroupsMerged.hashCode());
return result;
}
@@ -129,6 +133,7 @@
", localAddress=" + localAddress +
", viewId=" + viewId +
", needsToRejoin=" + needsToRejoin +
+ ", subgroupsMerged=" + subgroupsMerged +
'}';
}
@@ -139,4 +144,13 @@
public boolean isNeedsToRejoin() {
return needsToRejoin;
}
+
+ /**
+ * Sets the subgroups reported by this event. The given list is stored by reference; no copy is made.
+ *
+ * @param subgroupsMerged member lists of the merged subgroups, one inner list per subgroup
+ */
+ public void setSubgroupsMerged(List<List<Address>> subgroupsMerged) {
+ this.subgroupsMerged = subgroupsMerged;
+ }
+
+ /**
+ * @return the subgroups previously set via {@link #setSubgroupsMerged(List)}, or null if none were set
+ */
+ public List<List<Address>> getSubgroupsMerged() {
+ return this.subgroupsMerged;
+ }
+
}
Added: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java (rev 0)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java 2010-09-07 13:21:58 UTC (rev 2337)
@@ -0,0 +1,15 @@
+package org.infinispan.notifications.cachemanagerlistener.event;
+
+import org.infinispan.remoting.transport.Address;
+
+import java.util.List;
+
+/**
+ * This event is passed in to any method annotated with {@link org.infinispan.notifications.cachemanagerlistener.annotation.Merged}.
+ *
+ * @author Manik Surtani
+ * @since 4.2
+ */
+public interface MergeEvent {
+ /**
+ * @return the member lists of the subgroups that were merged, one inner list per subgroup
+ */
+ List<List<Address>> getSubgroupsMerged();
+}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-09-07 12:37:26 UTC (rev 2336)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-09-07 13:21:58 UTC (rev 2337)
@@ -72,20 +72,18 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
- * An encapsulation of a JGroups transport. JGroups transports can be configured using a variety of methods, usually
- * by passing in one of the following properties:
- * <ul>
- * <li><tt>configurationString</tt> - a JGroups configuration String</li>
- * <li><tt>configurationXml</tt> - JGroups configuration XML as a String</li>
- * <li><tt>configurationFile</tt> - String pointing to a JGroups XML configuration file</li>
- * <li><tt>channelLookup</tt> - Fully qualified class name of a {@link org.infinispan.remoting.transport.jgroups.JGroupsChannelLookup} instance</li>
- * </ul>
- * These are normally passed in as Properties in {@link org.infinispan.config.GlobalConfiguration#setTransportProperties(java.util.Properties)}
+ * An encapsulation of a JGroups transport. JGroups transports can be configured using a variety of methods, usually by
+ * passing in one of the following properties: <ul> <li><tt>configurationString</tt> - a JGroups configuration
+ * String</li> <li><tt>configurationXml</tt> - JGroups configuration XML as a String</li> <li><tt>configurationFile</tt>
+ * - String pointing to a JGroups XML configuration file</li> <li><tt>channelLookup</tt> - Fully qualified class name of
+ * a {@link org.infinispan.remoting.transport.jgroups.JGroupsChannelLookup} instance</li> </ul> These are normally
+ * passed in as Properties in {@link org.infinispan.config.GlobalConfiguration#setTransportProperties(java.util.Properties)}
* or in the Infinispan XML configuration file.
- *
+ *
* @author Manik Surtani
* @author Galder Zamarreño
* @since 4.0
@@ -156,7 +154,7 @@
// ensure that the channel has FLUSH enabled.
// see ISPN-83 for details.
- if ( channel.getProtocolStack()!= null && channel.getProtocolStack().findProtocol(FLUSH.class) == null)
+ if (channel.getProtocolStack() != null && channel.getProtocolStack().findProtocol(FLUSH.class) == null)
throw new ConfigurationException("Flush should be enabled. This is related to https://jira.jboss.org/jira/browse/ISPN-83");
}
@@ -169,7 +167,8 @@
}
}
address = new JGroupsAddress(channel.getAddress());
- if (log.isInfoEnabled()) log.info("Cache local address is {0}, physical addresses are {1}", getAddress(), getPhysicalAddresses());
+ if (log.isInfoEnabled())
+ log.info("Cache local address is {0}, physical addresses are {1}", getAddress(), getPhysicalAddresses());
}
public int getViewId() {
@@ -220,7 +219,7 @@
private void initChannelAndRPCDispatcher() throws CacheException {
initChannel();
dispatcher = new CommandAwareRpcDispatcher(channel, this,
- asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
+ asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
dispatcher.setRequestMarshaller(adapter);
dispatcher.setResponseMarshaller(adapter);
@@ -379,7 +378,7 @@
public List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout,
boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay)
- throws Exception {
+ throws Exception {
if (recipients != null && recipients.isEmpty()) {
// don't send if dest list is empty
@@ -399,8 +398,8 @@
if (!usePriorityQueue && ResponseMode.SYNCHRONOUS == mode) usePriorityQueue = true;
try {
RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode),
- timeout, recipients != null, usePriorityQueue,
- toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling, recipients == null || recipients.size() == members.size());
+ timeout, recipients != null, usePriorityQueue,
+ toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling, recipients == null || recipients.size() == members.size());
if (mode.isAsynchronous()) return Collections.emptyList();// async case
@@ -475,15 +474,49 @@
// Implementations of JGroups interfaces
// ------------------------------------------------------------------------------------------------------------------
+ /**
+ * Callback chosen in {@link #viewAccepted(View)} according to the kind of view received; it emits the matching
+ * listener notification.
+ */
+ private interface Notify {
+ /**
+ * Emits the notification for the given view.
+ *
+ * @param oldMembers member list that was in effect before the new view was applied
+ * @param newView the view that has just been accepted
+ */
+ void emitNotification(List<Address> oldMembers, View newView);
+ }
+
+ /**
+ * Emits a plain view-change notification through the notifier.
+ */
+ private class NotifyViewChange implements Notify {
+ @Override
+ public void emitNotification(List<Address> oldMembers, View newView) {
+ // Locals are computed in the same order in which the arguments were originally evaluated.
+ List<Address> currentMembers = members;
+ Address self = getAddress();
+ int viewId = (int) newView.getVid().getId();
+ boolean rejoin = needsToRejoin(newView);
+ notifier.notifyViewChange(currentMembers, oldMembers, self, viewId, rejoin);
+ }
+ }
+
+ /**
+ * Emits a merge notification through the notifier, including the member lists of the merged subgroups.
+ */
+ private class NotifyMerge implements Notify {
+
+ @Override
+ public void emitNotification(List<Address> oldMembers, View newView) {
+ MergeView mergeView = (MergeView) newView;
+ // Locals are computed in the same order in which the arguments were originally evaluated.
+ List<Address> currentMembers = members;
+ Address self = getAddress();
+ int viewId = (int) newView.getVid().getId();
+ boolean rejoin = needsToRejoin(newView);
+ List<List<Address>> merged = getSubgroups(mergeView.getSubgroups());
+ notifier.notifyMerge(currentMembers, oldMembers, self, viewId, rejoin, merged);
+ }
+
+ /**
+ * Converts each subview into a list of its members' addresses.
+ */
+ private List<List<Address>> getSubgroups(Vector<View> subviews) {
+ List<List<Address>> memberLists = new ArrayList<List<Address>>(subviews.size());
+ for (View subview : subviews) {
+ memberLists.add(fromJGroupsAddressList(subview.getMembers()));
+ }
+ return memberLists;
+ }
+ }
+
+ /**
+ * Handles a newly received cluster view. Logs the view, then, while holding membersListLock, replaces the member
+ * list, recomputes the coordinator flag, emits a merge or view-change notification (depending on whether the
+ * view is a MergeView) and wakes up any threads waiting on the lock.
+ */
public void viewAccepted(View newView) {
Vector<org.jgroups.Address> newMembers = newView.getMembers();
List<Address> oldMembers = null;
- if (log.isInfoEnabled()) log.info("Received new cluster view: {0}", newView);
+ Notify n = null;
+ if (newView instanceof MergeView) {
+ if (log.isInfoEnabled()) log.info("Received new, MERGED cluster view: {0}", newView);
+ if (notifier != null) n = new NotifyMerge();
+ } else {
+ if (log.isInfoEnabled()) log.info("Received new cluster view: {0}", newView);
+ if (notifier != null) n = new NotifyViewChange();
+ }
+
synchronized (membersListLock) {
boolean needNotification = false;
if (newMembers != null) {
oldMembers = members;
- // we need a defensive copy anyway
+ // we need a defensive copy anyway
members = fromJGroupsAddressList(newMembers);
needNotification = true;
}
@@ -491,13 +524,14 @@
coordinator = (members != null && !members.isEmpty() && members.get(0).equals(getAddress()));
// now notify listeners - *after* updating the coordinator. - JBCACHE-662
- if (needNotification && notifier != null) {
- notifier.notifyViewChange(members, oldMembers, getAddress(), (int) newView.getVid().getId(), needsToRejoin(newView));
+ if (needNotification && n != null) {
+ n.emitNotification(oldMembers, newView); // not n.notify(): that is Object.notify() and never reaches the listeners
}
// Wake up any threads that are waiting to know about who the coordinator is
membersListLock.notifyAll();
}
+
}
private boolean needsToRejoin(View v) {
@@ -505,7 +539,7 @@
MergeView mv = (MergeView) v;
org.jgroups.Address coord = v.getMembers().get(0);
View winningPartition = null;
- for (View p: mv.getSubgroups()) {
+ for (View p : mv.getSubgroups()) {
if (p.getMembers().get(0).equals(coord)) {
winningPartition = p;
break;
@@ -622,6 +656,7 @@
}
// mainly for unit testing
+
public CommandAwareRpcDispatcher getCommandAwareRpcDispatcher() {
return dispatcher;
}
More information about the infinispan-commits
mailing list