[infinispan-commits] Infinispan SVN: r2496 - in branches/4.2.x/core/src: main/java/org/infinispan/distribution and 6 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Oct 8 17:09:38 EDT 2010


Author: vblagojevic at jboss.com
Date: 2010-10-08 17:09:37 -0400 (Fri, 08 Oct 2010)
New Revision: 2496

Modified:
   branches/4.2.x/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   branches/4.2.x/core/src/main/java/org/infinispan/loaders/decorators/SingletonStore.java
   branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
   branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java
   branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
   branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java
Log:
[ISPN-609] - Report Merge events via the listener API (Part 2)
Part 1: see change set 2337 on branch 4.2 and change set 2338 on trunk

Modified: branches/4.2.x/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java	2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java	2010-10-08 21:09:37 UTC (rev 2496)
@@ -2,6 +2,7 @@
 
 import org.infinispan.notifications.Listener;
 import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStopped;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
 import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
@@ -21,6 +22,7 @@
    }
 
    @ViewChanged
+   @Merged
    public void handleViewChange(ViewChangedEvent vce) {
       keyAffinityService.handleViewChange(vce);
    }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-10-08 21:09:37 UTC (rev 2496)
@@ -27,6 +27,7 @@
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.notifications.Listener;
 import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
 import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
@@ -391,7 +392,9 @@
 
    @Listener
    public class ViewChangeListener {
+      
       @ViewChanged
+      @Merged
       public void handleViewChange(ViewChangedEvent e) {
          log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());         
          boolean started;

Modified: branches/4.2.x/core/src/main/java/org/infinispan/loaders/decorators/SingletonStore.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/loaders/decorators/SingletonStore.java	2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/loaders/decorators/SingletonStore.java	2010-10-08 21:09:37 UTC (rev 2496)
@@ -9,6 +9,7 @@
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.notifications.Listener;
 import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStarted;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
 import org.infinispan.notifications.cachemanagerlistener.event.Event;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
@@ -326,6 +327,7 @@
        * became the coordinator. This method will report any issues that could potentially arise from this push.
        */
       @ViewChanged
+      @Merged
       public void viewChange(ViewChangedEvent event) {
          boolean tmp = isCoordinator(event.getNewMembers(), event.getLocalAddress());
 

Modified: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java	2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java	2010-10-08 21:09:37 UTC (rev 2496)
@@ -7,10 +7,12 @@
 import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStarted;
 import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStopped;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
 import org.infinispan.notifications.cachemanagerlistener.event.CacheStartedEvent;
 import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
 import org.infinispan.notifications.cachemanagerlistener.event.Event;
 import org.infinispan.notifications.cachemanagerlistener.event.EventImpl;
+import org.infinispan.notifications.cachemanagerlistener.event.MergeEvent;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.logging.Log;
@@ -38,6 +40,7 @@
       allowedListeners.put(CacheStarted.class, CacheStartedEvent.class);
       allowedListeners.put(CacheStopped.class, CacheStoppedEvent.class);
       allowedListeners.put(ViewChanged.class, ViewChangedEvent.class);
+      allowedListeners.put(Merged.class, MergeEvent.class);
    }
 
    final List<ListenerInvocation> cacheStartedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
@@ -51,6 +54,7 @@
       listenersMap.put(CacheStarted.class, cacheStartedListeners);
       listenersMap.put(CacheStopped.class, cacheStoppedListeners);
       listenersMap.put(ViewChanged.class, viewChangedListeners);
+      listenersMap.put(Merged.class, mergeListeners);
    }
 
    @Inject

Modified: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java	2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java	2010-10-08 21:09:37 UTC (rev 2496)
@@ -10,6 +10,6 @@
  * @author Manik Surtani
  * @version 4.2
  */
-public interface MergeEvent {
+public interface MergeEvent extends ViewChangedEvent {
    List<List<Address>> getSubgroupsMerged();
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-10-08 21:09:37 UTC (rev 2496)
@@ -475,14 +475,20 @@
 
    public void viewAccepted(View newView) {
       Vector<org.jgroups.Address> newMembers = newView.getMembers();
-      List<Address> oldMembers = null;
+      List<Address> oldMembers = null;      
+      boolean hasNotifier = notifier != null;
       Notify n = null;
-      if (newView instanceof MergeView) {
-         if (log.isInfoEnabled()) log.info("Received new, MERGED cluster view: {0}", newView);
-         if (notifier != null) n = new NotifyMerge();
-      } else {
-         if (log.isInfoEnabled()) log.info("Received new cluster view: {0}", newView);
-         if (notifier != null) n = new NotifyViewChange();
+      
+      if (hasNotifier) {
+         if (newView instanceof MergeView) {
+            if (log.isInfoEnabled())
+               log.info("Received new, MERGED cluster view: {0}", newView);
+            n = new NotifyMerge();
+         } else {
+            if (log.isInfoEnabled())
+               log.info("Received new cluster view: {0}", newView);
+            n = new NotifyViewChange();
+         }
       }
 
       synchronized (membersListLock) {
@@ -497,7 +503,7 @@
          coordinator = (members != null && !members.isEmpty() && members.get(0).equals(getAddress()));
 
          // now notify listeners - *after* updating the coordinator. - JBCACHE-662
-         if (needNotification && n != null) n.emitNotification(oldMembers, newView);
+         if (needNotification && hasNotifier) n.emitNotification(oldMembers, newView);
 
          // Wake up any threads that are waiting to know about who the coordinator is
          membersListLock.notifyAll();

Modified: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java	2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java	2010-10-08 21:09:37 UTC (rev 2496)
@@ -14,6 +14,7 @@
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.notifications.Listener;
 import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
 import org.infinispan.remoting.MembershipArithmetic;
@@ -91,7 +92,9 @@
 
    @Listener
    public class StaleTransactionCleanup {
+      
       @ViewChanged
+      @Merged
       public void onViewChange(ViewChangedEvent vce) {
          final List<Address> leavers = MembershipArithmetic.getMembersLeft(vce.getOldMembers(), vce.getNewMembers());
          if (!leavers.isEmpty()) {

Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java	2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java	2010-10-08 21:09:37 UTC (rev 2496)
@@ -1,32 +1,34 @@
 package org.infinispan.distribution.rehash;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.infinispan.Cache;
-import org.infinispan.config.Configuration;
 import org.infinispan.distribution.BaseDistFunctionalTest;
 import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
 import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.MergeEvent;
 import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
-import org.infinispan.test.MultipleCacheManagersTest;
 import org.infinispan.test.TestingUtil;
 import org.jgroups.protocols.DISCARD;
 import org.testng.annotations.Test;
 
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+@Test(groups = "functional", testName =  "distribution.rehash.RehashAfterPartitionMergeTest", enabled = true, description = "Need to revisit after https://jira.jboss.org/browse/ISPN-493")
+public class RehashAfterPartitionMergeTest extends BaseDistFunctionalTest {
+   
+   DISCARD d1, d2;
 
-@Test(groups = "functional", testName =  "distribution.rehash.RehashAfterPartitionMergeTest", enabled = false, description = "Need to revisit after https://jira.jboss.org/browse/ISPN-493")
-public class RehashAfterPartitionMergeTest extends MultipleCacheManagersTest {
+   
+   public RehashAfterPartitionMergeTest() {
+      super();
+      this.INIT_CLUSTER_SIZE = 2;
+      this.sync = true;
+      cleanup = CleanupPhase.AFTER_METHOD;
+   }
 
-   Cache<Object, Object> c1, c2;
-   List<Cache<Object, Object>> caches;
-   DISCARD d1, d2;
-
    @Override
    protected void createCacheManagers() throws Throwable {
-      caches = createClusteredCaches(2, "test", getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC));
-
-      c1 = caches.get(0);
-      c2 = caches.get(1);
+      super.createCacheManagers();
       d1 = TestingUtil.getDiscardForCache(c1);
       d2 = TestingUtil.getDiscardForCache(c2);
    }
@@ -35,11 +37,12 @@
       c1.put("1", "value");
       c2.put("2", "value");
 
-      for (Cache<Object, Object> c: caches) {
+      for (Cache<Object, String> c: caches) {
          assert "value".equals(c.get("1"));
          assert "value".equals(c.get("2"));
          assert manager(c).getMembers().size() == 2;
       }
+      
       AtomicInteger ai = new AtomicInteger(0);
       manager(c1).addListener(new ViewChangeListener(ai));
       manager(c2).addListener(new ViewChangeListener(ai));
@@ -49,9 +52,11 @@
 
       // Wait till *both* instances have seen the view change.
       while (ai.get() < 2) TestingUtil.sleepThread(500);
-
+      
+      
+      
       // we should see a network partition
-      for (Cache<Object, Object> c: caches) assert manager(c).getMembers().size() == 1;
+      for (Cache<Object, String> c: caches) assert manager(c).getMembers().size() == 1;
 
       c1.put("3", "value");
       c2.put("4", "value");
@@ -71,11 +76,9 @@
       // wait till we see the view change
       while (ai.get() < 2) TestingUtil.sleepThread(500);
 
-      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(c1, c2);
-
       c1.put("5", "value");
       c2.put("6", "value");
-      for (Cache<Object, Object> c: caches) {
+      for (Cache<Object, String> c: caches) {
          assert "value".equals(c.get("5"));
          assert "value".equals(c.get("6"));
          assert manager(c).getMembers().size() == 2;
@@ -94,5 +97,10 @@
       public void handle(ViewChangedEvent e) {
          ai.getAndIncrement();
       }
+      
+      @Merged
+      public void merged(MergeEvent e){
+         ai.getAndIncrement();
+      }
    }
 }



More information about the infinispan-commits mailing list