[infinispan-commits] Infinispan SVN: r1684 - in trunk/core/src: main/java/org/infinispan/notifications/cachemanagerlistener and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Apr 12 10:34:21 EDT 2010


Author: manik.surtani at jboss.com
Date: 2010-04-12 10:34:20 -0400 (Mon, 12 Apr 2010)
New Revision: 1684

Modified:
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java
   trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/ViewChangedEvent.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java
   trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImplTest.java
   trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
Log:
[ISPN-398] (Cluster breaks after a MergeView is issued) Tests and fix

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -13,7 +13,9 @@
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
 import org.infinispan.context.InvocationContextContainer;
+
 import static org.infinispan.distribution.ConsistentHashHelper.createConsistentHash;
+
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.factories.annotations.Stop;
@@ -36,6 +38,7 @@
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.MembershipArithmetic;
 import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.ReclosableLatch;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 import org.rhq.helpers.pluginAnnotations.agent.DataType;
@@ -97,7 +100,7 @@
    volatile boolean rehashInProgress = false;
    volatile Address joiner;
    static final AtomicReferenceFieldUpdater<DistributionManagerImpl, Address> JOINER_CAS =
-         AtomicReferenceFieldUpdater.newUpdater(DistributionManagerImpl.class, Address.class, "joiner");
+           AtomicReferenceFieldUpdater.newUpdater(DistributionManagerImpl.class, Address.class, "joiner");
    private DataContainer dataContainer;
    private InterceptorChain interceptorChain;
    private InvocationContextContainer icc;
@@ -106,7 +109,7 @@
    volatile boolean joinComplete = false;
    final List<Address> leavers = new CopyOnWriteArrayList<Address>();
    volatile Future<Void> leaveTaskFuture;
-   final CountDownLatch startLatch = new CountDownLatch(1);
+   final ReclosableLatch startLatch = new ReclosableLatch(false);
 
    @Inject
    public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf,
@@ -123,20 +126,26 @@
    }
 
    // needs to be AFTER the RpcManager
+
    @Start(priority = 20)
    public void start() throws Exception {
       replCount = configuration.getNumOwners();
+      listener = new ViewChangeListener();
+      notifier.addListener(listener);
+      join();
+   }
+
+   private void join() throws Exception {
+      startLatch.close();
       consistentHash = createConsistentHash(configuration, rpcManager.getTransport().getMembers());
       self = rpcManager.getTransport().getAddress();
-      listener = new ViewChangeListener();
-      notifier.addListener(listener);
       if (rpcManager.getTransport().getMembers().size() > 1) {
          JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
          rehashExecutor.submit(joinTask);
       } else {
          joinComplete = true;
       }
-      startLatch.countDown();
+      startLatch.open();
    }
 
    @Stop(priority = 20)
@@ -214,7 +223,7 @@
       if (consistentHash == null) {
          Map<Object, List<Address>> m = new HashMap<Object, List<Address>>(keys.size());
          List<Address> selfList = Collections.singletonList(self);
-         for (Object k: keys) m.put(k, selfList);
+         for (Object k : keys) m.put(k, selfList);
          return m;
       }
       return consistentHash.locateAll(keys, replCount);
@@ -230,7 +239,7 @@
 
       ResponseFilter filter = new ClusteredGetResponseValidityFilter(locate(key));
       List<Response> responses = rpcManager.invokeRemotely(locate(key), get, ResponseMode.SYNCHRONOUS,
-                                                           configuration.getSyncReplTimeout(), false, filter);
+              configuration.getSyncReplTimeout(), false, filter);
 
       if (!responses.isEmpty()) {
          for (Response r : responses) {
@@ -268,7 +277,8 @@
          if (trace) log.trace("Allowing {0} to join", joiner);
          return new LinkedList<Address>(consistentHash.getCaches());
       } else {
-         if (trace) log.trace("Not allowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
+         if (trace)
+            log.trace("Not allowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
          return null;
       }
    }
@@ -328,12 +338,21 @@
       public void handleViewChange(ViewChangedEvent e) {
          boolean started = false;
          // how long do we wait for a startup?
-         try {
-            started = startLatch.await(2, TimeUnit.MINUTES);
-            if (started) rehash(e.getNewMembers(), e.getOldMembers());
-            else log.warn("DistributionManager not started after waiting up to 2 minutes!  Not rehashing!");
-         } catch (InterruptedException ie) {
-            log.warn("View change interrupted; not rehashing!");
+         if (e.isNeedsToRejoin()) {
+            try {
+               join();
+            } catch (Exception e1) {
+               log.fatal("Unable to recover from a partition merge!", e1);
+            }
+         } else {
+
+            try {
+               started = startLatch.await(2, TimeUnit.MINUTES);
+               if (started) rehash(e.getNewMembers(), e.getOldMembers());
+               else log.warn("DistributionManager not started after waiting up to 2 minutes!  Not rehashing!");
+            } catch (InterruptedException ie) {
+               log.warn("View change interrupted; not rehashing!");
+            }
          }
       }
    }

Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -19,7 +19,7 @@
     * Notifies all registered listeners of a viewChange event.  Note that viewChange notifications are ALWAYS sent
     * immediately.
     */
-   void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId);
+   void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean b);
 
    void notifyCacheStarted(String cacheName);
 

Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -55,7 +55,7 @@
       this.cacheManager = cacheManager;
    }
 
-   public void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId) {
+   public void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean needsToRejoin) {
       if (!viewChangedListeners.isEmpty()) {
          EventImpl e = new EventImpl();
          e.setLocalAddress(myAddress);
@@ -63,6 +63,7 @@
          e.setNewMembers(members);
          e.setOldMembers(oldMembers);
          e.setCacheManager(cacheManager);
+         e.setNeedsToRejoin(needsToRejoin);
          e.setType(Event.Type.VIEW_CHANGED);
          for (ListenerInvocation listener : viewChangedListeners) listener.invoke(e);
       }

Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -19,6 +19,7 @@
    List<Address> newMembers, oldMembers;
    Address localAddress;
    int viewId;
+   private boolean needsToRejoin;
 
    public EventImpl() {
    }
@@ -96,13 +97,12 @@
 
       EventImpl event = (EventImpl) o;
 
+      if (needsToRejoin != event.needsToRejoin) return false;
       if (viewId != event.viewId) return false;
       if (cacheName != null ? !cacheName.equals(event.cacheName) : event.cacheName != null) return false;
       if (localAddress != null ? !localAddress.equals(event.localAddress) : event.localAddress != null) return false;
-      if (newMembers != null ? !newMembers.equals(event.newMembers) : event.newMembers != null)
-         return false;
-      if (oldMembers != null ? !oldMembers.equals(event.oldMembers) : event.oldMembers != null)
-         return false;
+      if (newMembers != null ? !newMembers.equals(event.newMembers) : event.newMembers != null) return false;
+      if (oldMembers != null ? !oldMembers.equals(event.oldMembers) : event.oldMembers != null) return false;
       if (type != event.type) return false;
 
       return true;
@@ -116,18 +116,27 @@
       result = 31 * result + (oldMembers != null ? oldMembers.hashCode() : 0);
       result = 31 * result + (localAddress != null ? localAddress.hashCode() : 0);
       result = 31 * result + viewId;
+      result = 31 * result + (needsToRejoin ? 1 : 0);
       return result;
    }
 
    @Override
    public String toString() {
       return "EventImpl{" +
-            "cacheName='" + cacheName + '\'' +
-            ", type=" + type +
-            ", newMembers=" + newMembers +
-            ", oldMembers=" + oldMembers +
-            ", localAddress=" + localAddress +
-            ", viewId=" + viewId +
-            '}';
+              "type=" + type +
+              ", newMembers=" + newMembers +
+              ", oldMembers=" + oldMembers +
+              ", localAddress=" + localAddress +
+              ", viewId=" + viewId +
+              ", needsToRejoin=" + needsToRejoin +
+              '}';
    }
+
+   public void setNeedsToRejoin(boolean needsToRejoin) {
+      this.needsToRejoin = needsToRejoin;
+   }
+
+   public boolean isNeedsToRejoin() {
+      return needsToRejoin;
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/ViewChangedEvent.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/ViewChangedEvent.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/ViewChangedEvent.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -41,5 +41,7 @@
 
    Address getLocalAddress();
 
+   boolean isNeedsToRejoin();
+
    int getViewId();
 }

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -51,6 +51,7 @@
 import org.jgroups.ExtendedMembershipListener;
 import org.jgroups.ExtendedMessageListener;
 import org.jgroups.JChannel;
+import org.jgroups.MergeView;
 import org.jgroups.Message;
 import org.jgroups.View;
 import org.jgroups.blocks.GroupRequest;
@@ -492,7 +493,7 @@
 
          // now notify listeners - *after* updating the coordinator. - JBCACHE-662
          if (needNotification && notifier != null) {
-            notifier.notifyViewChange(members, oldMembers, getAddress(), (int) newView.getVid().getId());
+            notifier.notifyViewChange(members, oldMembers, getAddress(), (int) newView.getVid().getId(), needsToRejoin(newView));
          }
 
          // Wake up any threads that are waiting to know about who the coordinator is
@@ -500,6 +501,23 @@
       }
    }
 
+   private boolean needsToRejoin(View v) {
+      if (v instanceof MergeView) {
+         MergeView mv = (MergeView) v;
+         org.jgroups.Address coord = v.getMembers().get(0);
+         View winningPartition = null;
+         for (View p: mv.getSubgroups()) {
+            if (p.getMembers().get(0).equals(coord)) {
+               winningPartition = p;
+               break;
+            }
+         }
+
+         if (!winningPartition.containsMember(channel.getAddress())) return true;
+      }
+      return false;
+   }
+
    public void suspect(org.jgroups.Address suspected_mbr) {
       // no-op
    }

Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -16,6 +16,8 @@
 import org.infinispan.test.fwk.TestCacheManagerFactory;
 import org.jgroups.Channel;
 import org.jgroups.protocols.DISCARD;
+import org.jgroups.protocols.TP;
+import org.jgroups.stack.ProtocolStack;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
@@ -36,32 +38,21 @@
 
    @Override
    protected void createCacheManagers() throws Throwable {
-      Configuration c = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
-      c.setLockAcquisitionTimeout(1000);
+      caches = createClusteredCaches(2, "test", getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC));
 
-      GlobalConfiguration gc = GlobalConfiguration.getClusteredDefault();      
-      amendMarshaller(gc);
-      minimizeThreads(gc);
-      Properties newTransportProps = new Properties();
-      String jgc = JGroupsConfigBuilder.getJGroupsConfig();
-      String discardString = "):DISCARD(use_gui=false";
-      String newString = jgc.substring(0, jgc.indexOf("):")) + discardString + jgc.substring(jgc.indexOf("):"));
-      newTransportProps.put(CONFIGURATION_STRING, newString);
-      gc.setTransportProperties(newTransportProps);
-      CacheManager cm1 = TestCacheManagerFactory.createCacheManager(gc, c, true, false, true);
-      CacheManager cm2 = TestCacheManagerFactory.createCacheManager(gc, c, true, false, true);
-      registerCacheManager(cm1, cm2);
-      c1 = cm1.getCache();
-      c2 = cm2.getCache();
-      caches = Arrays.asList(c1, c2);
+      c1 = caches.get(0);
+      c2 = caches.get(1);
       d1 = getDiscardForCache(c1);
       d2 = getDiscardForCache(c2);
    }
 
-   private DISCARD getDiscardForCache(Cache<?, ?> c) {
+   private DISCARD getDiscardForCache(Cache<?, ?> c) throws Exception {
       JGroupsTransport jgt = (JGroupsTransport) TestingUtil.extractComponent(c, Transport.class);
       Channel ch = jgt.getChannel();
-      return (DISCARD) ch.getProtocolStack().findProtocol(DISCARD.class);
+      ProtocolStack ps = ch.getProtocolStack();
+      DISCARD discard = new DISCARD();
+      ps.insertProtocol(discard, ProtocolStack.ABOVE, TP.class);
+      return discard;
    }
 
    public void testCachePartition() {

Modified: trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImplTest.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImplTest.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -29,7 +29,7 @@
    public void testNotifyViewChanged() {
       Address a = EasyMock.createNiceMock(Address.class);
       List<Address> addresses = Collections.emptyList();
-      n.notifyViewChange(addresses, addresses, a, 100);
+      n.notifyViewChange(addresses, addresses, a, 100, false);
 
       assert cl.invocationCount == 1;
       assert ((ViewChangedEvent) cl.getEvent()).getLocalAddress() == a;

Modified: trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierTest.java	2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierTest.java	2010-04-12 14:34:20 UTC (rev 1684)
@@ -9,7 +9,6 @@
 import org.infinispan.test.AbstractInfinispanTest;
 import org.infinispan.test.TestingUtil;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.infinispan.transaction.xa.TransactionTable;
 import org.infinispan.transaction.xa.TransactionTable.StaleTransactionCleanup;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -46,7 +45,7 @@
       CacheManagerNotifier mockNotifier = createMock(CacheManagerNotifier.class);
       CacheManagerNotifier origNotifier = TestingUtil.replaceComponent(cm1, CacheManagerNotifier.class, mockNotifier, true);
       try {
-         mockNotifier.notifyViewChange(isA(List.class), isA(List.class), eq(myAddress), anyInt());
+         mockNotifier.notifyViewChange(isA(List.class), isA(List.class), eq(myAddress), anyInt(), false);
          replay(mockNotifier);
          // start a second cache.
          Cache c2 = cm2.getCache("cache");



More information about the infinispan-commits mailing list