[infinispan-commits] Infinispan SVN: r2338 - in trunk/core/src/main/java/org/infinispan: notifications/cachemanagerlistener/annotation and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Sep 7 10:06:49 EDT 2010


Author: manik.surtani at jboss.com
Date: 2010-09-07 10:06:48 -0400 (Tue, 07 Sep 2010)
New Revision: 2338

Added:
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java
Modified:
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/Event.java
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
[ISPN-609] (Report Merge events via the listener API)

Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java	2010-09-07 13:21:58 UTC (rev 2337)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java	2010-09-07 14:06:48 UTC (rev 2338)
@@ -19,9 +19,11 @@
     * Notifies all registered listeners of a viewChange event.  Note that viewChange notifications are ALWAYS sent
     * immediately.
     */
-   void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean b);
+   void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean requiresJoin);
 
    void notifyCacheStarted(String cacheName);
 
    void notifyCacheStopped(String cacheName);
+
+   void notifyMerge(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean requiresRejoin, List<List<Address>> subgroupsMerged);
 }

Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java	2010-09-07 13:21:58 UTC (rev 2337)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java	2010-09-07 14:06:48 UTC (rev 2338)
@@ -43,6 +43,7 @@
    final List<ListenerInvocation> cacheStartedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
    final List<ListenerInvocation> cacheStoppedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
    final List<ListenerInvocation> viewChangedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
+   final List<ListenerInvocation> mergeListeners = new CopyOnWriteArrayList<ListenerInvocation>();
 
    private EmbeddedCacheManager cacheManager;
 
@@ -71,6 +72,21 @@
       }
    }
 
+   public void notifyMerge(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean needsToRejoin, List<List<Address>> subgroupsMerged) {
+      if (!viewChangedListeners.isEmpty()) {
+         EventImpl e = new EventImpl();
+         e.setLocalAddress(myAddress);
+         e.setViewId(viewId);
+         e.setNewMembers(members);
+         e.setOldMembers(oldMembers);
+         e.setCacheManager(cacheManager);
+         e.setNeedsToRejoin(needsToRejoin);
+         e.setSubgroupsMerged(subgroupsMerged);
+         e.setType(Event.Type.MERGED);
+         for (ListenerInvocation listener : mergeListeners) listener.invoke(e);
+      }
+   }
+
    public void notifyCacheStarted(String cacheName) {
       if (!cacheStartedListeners.isEmpty()) {
          EventImpl e = new EventImpl();

Copied: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java (from rev 2337, branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/annotation/Merged.java	2010-09-07 14:06:48 UTC (rev 2338)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.notifications.cachemanagerlistener.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * This annotation should be used on methods that need to be notified when the cache is used in a cluster and the
+ * cluster topology experiences a merge event after a cluster split.
+ * <p/>
+ * Methods annotated with this annotation should accept a single parameter, a {@link
+ * org.infinispan.notifications.cachemanagerlistener.event.MergedEvent} otherwise a {@link
+ * org.infinispan.notifications.IncorrectListenerException} will be thrown when registering your listener.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ * @see org.infinispan.notifications.Listener
+ * @since 4.2
+ */
+// ensure this annotation is available at runtime.
+ at Retention(RetentionPolicy.RUNTIME)
+// ensure that this annotation is applied to classes.
+ at Target(ElementType.METHOD)
+public @interface Merged {
+}

Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/Event.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/Event.java	2010-09-07 13:21:58 UTC (rev 2337)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/Event.java	2010-09-07 14:06:48 UTC (rev 2338)
@@ -10,7 +10,7 @@
  */
 public interface Event {
    public enum Type {
-      CACHE_STARTED, CACHE_STOPPED, VIEW_CHANGED
+      CACHE_STARTED, CACHE_STOPPED, VIEW_CHANGED, MERGED
    }
 
    EmbeddedCacheManager getCacheManager();

Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java	2010-09-07 13:21:58 UTC (rev 2337)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java	2010-09-07 14:06:48 UTC (rev 2338)
@@ -2,6 +2,7 @@
 
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.Util;
 
 import java.util.List;
 
@@ -11,7 +12,7 @@
  * @author Manik Surtani
  * @since 4.0
  */
-public class EventImpl implements CacheStartedEvent, CacheStoppedEvent, ViewChangedEvent {
+public class EventImpl implements CacheStartedEvent, CacheStoppedEvent, ViewChangedEvent, MergeEvent {
 
    String cacheName;
    EmbeddedCacheManager cacheManager;
@@ -20,6 +21,7 @@
    Address localAddress;
    int viewId;
    private boolean needsToRejoin;
+   private List<List<Address>> subgroupsMerged;
 
    public EventImpl() {
    }
@@ -103,6 +105,7 @@
       if (localAddress != null ? !localAddress.equals(event.localAddress) : event.localAddress != null) return false;
       if (newMembers != null ? !newMembers.equals(event.newMembers) : event.newMembers != null) return false;
       if (oldMembers != null ? !oldMembers.equals(event.oldMembers) : event.oldMembers != null) return false;
+      if (!Util.safeEquals(subgroupsMerged, event.subgroupsMerged)) return false;
       if (type != event.type) return false;
 
       return true;
@@ -117,6 +120,7 @@
       result = 31 * result + (localAddress != null ? localAddress.hashCode() : 0);
       result = 31 * result + viewId;
       result = 31 * result + (needsToRejoin ? 1 : 0);
+      result = 31 * result + (subgroupsMerged == null ? 0 : subgroupsMerged.hashCode());
       return result;
    }
 
@@ -129,6 +133,7 @@
               ", localAddress=" + localAddress +
               ", viewId=" + viewId +
               ", needsToRejoin=" + needsToRejoin +
+              ", subgroupsMerged=" + subgroupsMerged +
               '}';
    }
 
@@ -139,4 +144,13 @@
    public boolean isNeedsToRejoin() {
       return needsToRejoin;
    }
+
+   public void setSubgroupsMerged(List<List<Address>> subgroupsMerged) {
+      this.subgroupsMerged = subgroupsMerged;
+   }
+
+   public List<List<Address>> getSubgroupsMerged() {
+      return this.subgroupsMerged;
+   }
+
 }

Copied: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java (from rev 2337, branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java	2010-09-07 14:06:48 UTC (rev 2338)
@@ -0,0 +1,15 @@
+package org.infinispan.notifications.cachemanagerlistener.event;
+
+import org.infinispan.remoting.transport.Address;
+
+import java.util.List;
+
+/**
+ * This event is passed in to any method annotated with {@link org.infinispan.notifications.cachemanagerlistener.annotation.Merged}.
+ *
+ * @author Manik Surtani
+ * @version 4.2
+ */
+public interface MergeEvent {
+   List<List<Address>> getSubgroupsMerged();
+}

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-09-07 13:21:58 UTC (rev 2337)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-09-07 14:06:48 UTC (rev 2338)
@@ -72,20 +72,18 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 /**
- * An encapsulation of a JGroups transport.  JGroups transports can be configured using a variety of methods, usually
- * by passing in one of the following properties:
- * <ul>
- * <li><tt>configurationString</tt> - a JGroups configuration String</li>
- * <li><tt>configurationXml</tt> - JGroups configuration XML as a String</li>
- * <li><tt>configurationFile</tt> - String pointing to a JGroups XML configuration file</li>
- * <li><tt>channelLookup</tt> - Fully qualified class name of a {@link org.infinispan.remoting.transport.jgroups.JGroupsChannelLookup} instance</li>
- * </ul>
- * These are normally passed in as Properties in {@link org.infinispan.config.GlobalConfiguration#setTransportProperties(java.util.Properties)}
+ * An encapsulation of a JGroups transport.  JGroups transports can be configured using a variety of methods, usually by
+ * passing in one of the following properties: <ul> <li><tt>configurationString</tt> - a JGroups configuration
+ * String</li> <li><tt>configurationXml</tt> - JGroups configuration XML as a String</li> <li><tt>configurationFile</tt>
+ * - String pointing to a JGroups XML configuration file</li> <li><tt>channelLookup</tt> - Fully qualified class name of
+ * a {@link org.infinispan.remoting.transport.jgroups.JGroupsChannelLookup} instance</li> </ul> These are normally
+ * passed in as Properties in {@link org.infinispan.config.GlobalConfiguration#setTransportProperties(java.util.Properties)}
  * or in the Infinispan XML configuration file.
- * 
+ *
  * @author Manik Surtani
  * @author Galder Zamarreño
  * @since 4.0
@@ -156,7 +154,7 @@
 
       // ensure that the channel has FLUSH enabled.
       // see ISPN-83 for details.
-      if ( channel.getProtocolStack()!= null && channel.getProtocolStack().findProtocol(FLUSH.class) == null)
+      if (channel.getProtocolStack() != null && channel.getProtocolStack().findProtocol(FLUSH.class) == null)
          throw new ConfigurationException("Flush should be enabled. This is related to https://jira.jboss.org/jira/browse/ISPN-83");
    }
 
@@ -169,7 +167,8 @@
          }
       }
       address = new JGroupsAddress(channel.getAddress());
-      if (log.isInfoEnabled()) log.info("Cache local address is {0}, physical addresses are {1}", getAddress(), getPhysicalAddresses());
+      if (log.isInfoEnabled())
+         log.info("Cache local address is {0}, physical addresses are {1}", getAddress(), getPhysicalAddresses());
    }
 
    public int getViewId() {
@@ -220,7 +219,7 @@
    private void initChannelAndRPCDispatcher() throws CacheException {
       initChannel();
       dispatcher = new CommandAwareRpcDispatcher(channel, this,
-                                                 asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
+              asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
       MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
       dispatcher.setRequestMarshaller(adapter);
       dispatcher.setResponseMarshaller(adapter);
@@ -379,7 +378,7 @@
 
    public List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout,
                                         boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay)
-         throws Exception {
+           throws Exception {
 
       if (recipients != null && recipients.isEmpty()) {
          // don't send if dest list is empty
@@ -399,8 +398,8 @@
       if (!usePriorityQueue && ResponseMode.SYNCHRONOUS == mode) usePriorityQueue = true;
       try {
          RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode),
-                                                        timeout, recipients != null, usePriorityQueue,
-                                                        toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling, recipients == null || recipients.size() == members.size());
+                 timeout, recipients != null, usePriorityQueue,
+                 toJGroupsFilter(responseFilter), supportReplay, asyncMarshalling, recipients == null || recipients.size() == members.size());
 
          if (mode.isAsynchronous()) return Collections.emptyList();// async case
 
@@ -475,15 +474,49 @@
    // Implementations of JGroups interfaces
    // ------------------------------------------------------------------------------------------------------------------
 
+   private interface Notify {
+      void emitNotification(List<Address> oldMembers, View newView);
+   }
+
+   private class NotifyViewChange implements Notify {
+      @Override
+      public void emitNotification(List<Address> oldMembers, View newView) {
+         notifier.notifyViewChange(members, oldMembers, getAddress(), (int) newView.getVid().getId(), needsToRejoin(newView));
+      }
+   }
+
+   private class NotifyMerge implements Notify {
+
+      @Override
+      public void emitNotification(List<Address> oldMembers, View newView) {
+         MergeView mv = (MergeView) newView;
+         notifier.notifyMerge(members, oldMembers, getAddress(), (int) newView.getVid().getId(), needsToRejoin(newView), getSubgroups(mv.getSubgroups()));
+      }
+
+      private List<List<Address>> getSubgroups(Vector<View> subviews) {
+         List<List<Address>> l = new ArrayList<List<Address>>(subviews.size());
+         for (View v: subviews) l.add(fromJGroupsAddressList(v.getMembers()));
+         return l;         
+      }
+   }
+
    public void viewAccepted(View newView) {
       Vector<org.jgroups.Address> newMembers = newView.getMembers();
       List<Address> oldMembers = null;
-      if (log.isInfoEnabled()) log.info("Received new cluster view: {0}", newView);
+      Notify n = null;
+      if (newView instanceof MergeView) {
+         if (log.isInfoEnabled()) log.info("Received new, MERGED cluster view: {0}", newView);
+         if (notifier != null) n = new NotifyMerge();
+      } else {
+         if (log.isInfoEnabled()) log.info("Received new cluster view: {0}", newView);
+         if (notifier != null) n = new NotifyViewChange();
+      }
+
       synchronized (membersListLock) {
          boolean needNotification = false;
          if (newMembers != null) {
             oldMembers = members;
-            // we need a defensive copy anyway            
+            // we need a defensive copy anyway
             members = fromJGroupsAddressList(newMembers);
             needNotification = true;
          }
@@ -491,13 +524,14 @@
          coordinator = (members != null && !members.isEmpty() && members.get(0).equals(getAddress()));
 
          // now notify listeners - *after* updating the coordinator. - JBCACHE-662
-         if (needNotification && notifier != null) {
-            notifier.notifyViewChange(members, oldMembers, getAddress(), (int) newView.getVid().getId(), needsToRejoin(newView));
+         if (needNotification && n != null) {
+            n.notify();
          }
 
          // Wake up any threads that are waiting to know about who the coordinator is
          membersListLock.notifyAll();
       }
+
    }
 
    private boolean needsToRejoin(View v) {
@@ -505,7 +539,7 @@
          MergeView mv = (MergeView) v;
          org.jgroups.Address coord = v.getMembers().get(0);
          View winningPartition = null;
-         for (View p: mv.getSubgroups()) {
+         for (View p : mv.getSubgroups()) {
             if (p.getMembers().get(0).equals(coord)) {
                winningPartition = p;
                break;
@@ -622,6 +656,7 @@
    }
 
    // mainly for unit testing
+
    public CommandAwareRpcDispatcher getCommandAwareRpcDispatcher() {
       return dispatcher;
    }



More information about the infinispan-commits mailing list