[infinispan-commits] Infinispan SVN: r716 - in trunk/core/src: main/java/org/infinispan/interceptors and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Aug 21 08:02:02 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-08-21 08:02:02 -0400 (Fri, 21 Aug 2009)
New Revision: 716

Added:
   trunk/core/src/test/java/org/infinispan/distribution/JoinTaskTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
Log:
Fixed bug in JoinTask and added test

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-08-21 02:10:33 UTC (rev 715)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-08-21 12:02:02 UTC (rev 716)
@@ -114,7 +114,7 @@
          Address myAddress = rpcManager.getTransport().getAddress();
          RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
          // TODO I should be able to process state chunks from different nodes simultaneously!!
-         List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff();
+         List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff(configuration.getNumOwners());
          List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
 
          // 7.  Apply state
@@ -148,17 +148,25 @@
    }
 
    // TODO unit test this!!!
-   private List<Address> getAddressesWhoMaySendStuff() {
+   List<Address> getAddressesWhoMaySendStuff(int replCount) {
+      List<Address> l = new LinkedList<Address>();
       List<Address> caches = chNew.getCaches();
       int selfIdx = caches.indexOf(self);
-      int replCount = configuration.getNumOwners();
       if (selfIdx >= replCount - 1) {
-         return caches.subList(selfIdx - replCount + 1, selfIdx);
+         l.addAll(caches.subList(selfIdx - replCount + 1, selfIdx));
       } else {
-         List<Address> l = new LinkedList<Address>(caches.subList(0, selfIdx));
+         l.addAll(caches.subList(0, selfIdx));
          int alreadyCollected = l.size();
          l.addAll(caches.subList(caches.size() - replCount + 1 + alreadyCollected, caches.size()));
-         return l;
       }
+
+      Address plusOne;
+      if (selfIdx == caches.size() - 1)
+         plusOne = caches.get(0);
+      else
+         plusOne = caches.get(selfIdx + 1);
+
+      if (!l.contains(plusOne)) l.add(plusOne);
+      return l;
    }
 }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-08-21 02:10:33 UTC (rev 715)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-08-21 12:02:02 UTC (rev 716)
@@ -265,7 +265,10 @@
       remoteGetBeforeWrite(ctx, command.isConditional(), recipientGenerator.getKeys());
 
       // if this is local mode then skip distributing
-      if (localModeForced) return invokeNextInterceptor(ctx, command);
+      if (localModeForced) {
+         log.trace("LOCAL mode forced.  No RPC needed.");
+         return invokeNextInterceptor(ctx, command);
+      }
 
       // FIRST pass this call up the chain.  Only if it succeeds (no exceptions) locally do we attempt to distribute.
       Object returnValue = invokeNextInterceptor(ctx, command);

Added: trunk/core/src/test/java/org/infinispan/distribution/JoinTaskTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/JoinTaskTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/JoinTaskTest.java	2009-08-21 12:02:02 UTC (rev 716)
@@ -0,0 +1,56 @@
+package org.infinispan.distribution;
+
+import org.easymock.EasyMock;
+import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+@Test(groups = "unit", testName = "distribution.JoinTaskTest")
+public class JoinTaskTest {
+   public void testCalculatingWhosensStateRC2() {
+      doTest(2);
+   }
+
+   public void testCalculatingWhosensStateRC4() {
+      doTest(4);
+   }
+
+   private void doTest(int rc) {
+      Address a1 = new TestAddress(10);
+      Address a2 = new TestAddress(20);
+      Address a3 = new TestAddress(30);
+      Address a4 = new TestAddress(40);
+      Address a5 = new TestAddress(50);
+      Address a6 = new TestAddress(60);
+
+      Address joiner = new TestAddress(33);
+
+      Transport trans = EasyMock.createNiceMock(Transport.class);
+      EasyMock.expect(trans.getAddress()).andReturn(joiner).anyTimes();
+
+      RpcManager rpc = EasyMock.createNiceMock(RpcManager.class);
+      EasyMock.expect(rpc.getTransport()).andReturn(trans).anyTimes();
+
+      EasyMock.replay(trans, rpc);
+
+      JoinTask jt = new JoinTask(rpc, null, null, null, null, null);
+
+      ConsistentHash ch = new DefaultConsistentHash();
+      ch.setCaches(Arrays.asList(a1, a2, a3, a4, a5, a6, joiner));
+
+      jt.chNew = ch;
+
+      List<Address> a = jt.getAddressesWhoMaySendStuff(rc);
+      List<Address> expected;
+      if (rc == 2)
+         expected = Arrays.asList(a3, a4);
+      else
+         expected = Arrays.asList(a1, a2, a3, a4);
+
+      assert a.equals(expected) : "Expected " + expected + " but was " + a;
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/JoinTaskTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the infinispan-commits mailing list