[infinispan-commits] Infinispan SVN: r1217 - in trunk/core/src: main/java/org/infinispan/commands/write and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Nov 24 12:25:18 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-11-24 12:25:18 -0500 (Tue, 24 Nov 2009)
New Revision: 1217

Added:
   trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
   trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
   trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
   trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
   trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
   trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
   trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
   trunk/core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java
Log:
[ISPN-249] ( L1Type.onRehash should be in use but it isn't)

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -94,10 +94,11 @@
 
    /**
     * Builds an InvalidateFromL1Command
+    * @param forRehash set to true if the invalidation is happening due to a new node taking ownership.  False if it is due to a write, changing the state of the entry.
     * @param keys keys to invalidate
     * @return an InvalidateFromL1Command
     */
-   InvalidateCommand buildInvalidateFromL1Command(Object... keys);
+   InvalidateCommand buildInvalidateFromL1Command(boolean forRehash, Object... keys);
 
    /**
     * Builds a ReplaceCommand

Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -127,8 +127,8 @@
       return new InvalidateCommand(notifier, keys);
    }
 
-   public InvalidateCommand buildInvalidateFromL1Command(Object... keys) {
-      return new InvalidateL1Command(distributionManager, notifier, keys);
+   public InvalidateCommand buildInvalidateFromL1Command(boolean forRehash, Object... keys) {
+      return new InvalidateL1Command(forRehash, dataContainer, configuration, distributionManager, notifier, keys);
    }
 
    public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, long lifespan, long maxIdleTimeMillis) {
@@ -247,7 +247,7 @@
             break;
          case InvalidateL1Command.COMMAND_ID:
             InvalidateL1Command ilc = (InvalidateL1Command) c;
-            ilc.init(distributionManager, notifier);
+            ilc.init(configuration, distributionManager, notifier, dataContainer);
             break;
          case PrepareCommand.COMMAND_ID:
             PrepareCommand pc = (PrepareCommand) c;

Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -44,7 +44,7 @@
    public static final int COMMAND_ID = 6;
    private static final Log log = LogFactory.getLog(InvalidateCommand.class);
    private static final boolean trace = log.isTraceEnabled();
-   private Object[] keys;
+   protected Object[] keys;
 
    public InvalidateCommand() {
    }

Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -1,5 +1,8 @@
 package org.infinispan.commands.write;
 
+import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.context.InvocationContext;
 import org.infinispan.distribution.DistributionManager;
 import org.infinispan.marshall.Ids;
@@ -17,13 +20,20 @@
 public class InvalidateL1Command extends InvalidateCommand {
    public static final int COMMAND_ID = 7;
    private DistributionManager dm;
+   private DataContainer dataContainer;
+   private Configuration config;
+   private boolean forRehash;
 
    public InvalidateL1Command() {
    }
 
-   public InvalidateL1Command(DistributionManager dm, CacheNotifier notifier, Object... keys) {
+   public InvalidateL1Command(boolean forRehash, DataContainer dc, Configuration config, DistributionManager dm,
+                              CacheNotifier notifier, Object... keys) {
       super(notifier, keys);
       this.dm = dm;
+      this.forRehash = forRehash;
+      this.dataContainer = dc;
+      this.config = config;
    }
 
    @Override
@@ -31,21 +41,73 @@
       return COMMAND_ID;
    }
 
-   public void init(DistributionManager dm, CacheNotifier n) {
+   public void init(Configuration config, DistributionManager dm, CacheNotifier n, DataContainer dc) {
       super.init(n);
       this.dm = dm;
+      this.config = config;
+      this.dataContainer = dc;
    }
 
    @Override
    public Object perform(InvocationContext ctx) throws Throwable {
-      for (Object k : getKeys()) {
-         if (!dm.isLocal(k)) invalidate(ctx, k);
+      if (forRehash && config.isL1OnRehash()) {
+         for (Object k : getKeys()) {
+            InternalCacheEntry ice = dataContainer.get(k);
+            if (ice != null)
+               dataContainer.put(k, ice.getValue(), config.getL1Lifespan(), config.getExpirationMaxIdle());
+         }
+      } else {
+         for (Object k : getKeys()) {
+            if (!dm.isLocal(k)) invalidate(ctx, k);
+         }
       }
       return null;
    }
 
    @Override
    public boolean equals(Object o) {
-      return this == o || o instanceof InvalidateL1Command && super.equals(o);
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      if (!super.equals(o)) return false;
+
+      InvalidateL1Command that = (InvalidateL1Command) o;
+
+      if (forRehash != that.forRehash) return false;
+
+      return true;
    }
+
+   @Override
+   public Object[] getParameters() {
+      if (keys == null || keys.length == 0) {
+         return new Object[]{forRehash};
+      } else if (keys.length == 1) {
+         return new Object[]{forRehash, 1, keys[0]};
+      } else {
+         Object[] retval = new Object[keys.length + 2];
+         retval[0] = forRehash;
+         retval[1] = keys.length;
+         System.arraycopy(keys, 0, retval, 2, keys.length);
+         return retval;
+      }
+   }
+
+   @Override
+   public void setParameters(int commandId, Object[] args) {
+      forRehash = (Boolean) args[0];
+      int size = (Integer) args[1];
+      keys = new Object[size];
+      if (size == 1) {
+         keys[0] = args[2];
+      } else if (size > 0) {
+         System.arraycopy(args, 2, keys, 0, size);
+      }
+   }
+
+   @Override
+   public int hashCode() {
+      int result = super.hashCode();
+      result = 31 * result + (forRehash ? 1 : 0);
+      return result;
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -155,6 +155,11 @@
       }
    }
 
+   @Override
+   protected Log getLog() {
+      return log;
+   }
+
    // TODO unit test this!!!
    List<Address> getAddressesWhoMaySendStuff(int replCount) {
       List<Address> l = new LinkedList<Address>();

Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -103,6 +103,11 @@
       }
    }
 
+   @Override
+   protected Log getLog() {
+      return log;
+   }
+
    private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
       if (trace) log.trace("Processing transaction log iteratively");
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -8,6 +8,7 @@
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.concurrent.NotifyingFutureImpl;
 import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
+import org.infinispan.util.logging.Log;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -59,11 +60,14 @@
 
    protected abstract void performRehash() throws Exception;
 
+   protected abstract Log getLog();
+
    protected Collection<Address> coordinator() {
       return Collections.singleton(rpcManager.getTransport().getCoordinator());
    }
 
    protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
+      if (getLog().isDebugEnabled()) getLog().debug("Invalidating entries that have migrated across");
       Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
       for (Object key : dataContainer.keySet()) {
          Collection<Address> invalidHolders = getInvalidHolders(key, chOld, chNew);
@@ -80,7 +84,7 @@
       Set<Future> futures = new HashSet<Future>();
 
       for (Map.Entry<Address, Set<Object>> e : invalidations.entrySet()) {
-         InvalidateCommand ic = cf.buildInvalidateFromL1Command(e.getValue().toArray());
+         InvalidateCommand ic = cf.buildInvalidateFromL1Command(true, e.getValue().toArray());
          NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
          rpcManager.invokeRemotelyInFuture(Collections.singletonList(e.getKey()), ic, true, f);
          futures.add(f);

Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -37,6 +37,7 @@
    protected boolean tx = false;
    protected boolean testRetVals = true;
    protected boolean l1CacheEnabled = true;
+   protected boolean l1OnRehash = false;
    protected boolean performRehashing = false;
    protected static final int NUM_OWNERS = 2;
 
@@ -54,6 +55,7 @@
       configuration.setSyncReplTimeout(60, TimeUnit.SECONDS);
       configuration.setLockAcquisitionTimeout(45, TimeUnit.SECONDS);
       configuration.setL1CacheEnabled(l1CacheEnabled);
+      if (l1CacheEnabled) configuration.setL1OnRehash(l1OnRehash);
       caches = createClusteredCaches(INIT_CLUSTER_SIZE, cacheName, configuration);
 
       reorderBasedOnCHPositions();
@@ -198,6 +200,18 @@
       }
    }
 
+   protected int locateJoiner(Address joinerAddress) {
+      for (Cache c : caches) {
+         ConsistentHash dch = getNonUnionConsistentHash(c, SECONDS.toMillis(480));
+         int i = 0;
+         for (Address a : dch.getCaches()) {
+            if (a.equals(joinerAddress)) return i;
+            i++;
+         }
+      }
+      throw new RuntimeException("Cannot locate joiner! Joiner is [" + joinerAddress + "]");
+   }
+
    protected static final String safeType(Object o) {
       if (o == null) return "null";
       return o.getClass().getSimpleName();
@@ -206,14 +220,14 @@
    protected void assertIsInL1(Cache<?, ?> cache, Object key) {
       DataContainer dc = cache.getAdvancedCache().getDataContainer();
       InternalCacheEntry ice = dc.get(key);
-      assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + addressOf(cache) + "]!";
+      assert ice != null : "Entry for key [" + key + "] should be in L1 on cache at [" + addressOf(cache) + "]!";
       assert !(ice instanceof ImmortalCacheEntry) : "Entry for key [" + key + "] should have a lifespan on cache at [" + addressOf(cache) + "]!";
    }
 
    protected void assertIsNotInL1(Cache<?, ?> cache, Object key) {
       DataContainer dc = cache.getAdvancedCache().getDataContainer();
       InternalCacheEntry ice = dc.get(key);
-      assert ice == null : "Entry for key [" + key + "] should not be in data container on cache at [" + addressOf(cache) + "]!";
+      assert ice == null : "Entry for key [" + key + "] should not be in data container at all on cache at [" + addressOf(cache) + "]!";
    }
 
    protected void assertIsInContainerImmortal(Cache<?, ?> cache, Object key) {

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -0,0 +1,97 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.manager.CacheManager;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Ensures entries are moved to L1 if they are removed due to a rehash
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "distribution.rehash.L1OnRehashTest")
+public class L1OnRehashTest extends BaseDistFunctionalTest {
+   public L1OnRehashTest() {
+      this.tx = false;
+      this.sync = true;
+      this.l1CacheEnabled = true;
+      this.performRehashing = true;
+      this.l1OnRehash = true;
+      this.INIT_CLUSTER_SIZE = 2;
+      cleanup = CleanupPhase.AFTER_METHOD;
+   }
+
+   CacheManager joinerManager;
+   Cache<Object, String> joiner;
+
+   void performRehashEvent() {
+      joinerManager = addClusterEnabledCacheManager();
+      joinerManager.defineConfiguration(cacheName, configuration);
+      joiner = joinerManager.getCache(cacheName);
+   }
+
+   int waitForRehashCompletion() {
+      // need to block until this join has completed!
+      waitForJoinTasksToComplete(SECONDS.toMillis(480), joiner);
+
+      // where does the joiner sit in relation to the other caches?
+      int joinerPos = locateJoiner(joinerManager.getAddress());
+
+      log.info("***>>> Joiner is in position " + joinerPos);
+
+      caches.add(joinerPos, joiner);
+      return joinerPos;
+   }
+
+   private List<MagicKey> init() {
+      List<MagicKey> keys = new ArrayList<MagicKey>(Arrays.asList(
+            new MagicKey(c1, "k1"), new MagicKey(c2, "k2")
+      ));
+
+      int i = 0;
+      for (Cache<Object, String> c : caches) c.put(keys.get(i++), "v" + i);
+
+      i = 0;
+      for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
+
+      log.info("Initialized with keys {0}", keys);
+      return keys;
+   }
+
+   public void testInvalidationBehaviorOnRehash() {
+      // start with 2 caches...
+      List<MagicKey> keys = init();
+      System.out.println("Old CH positions are " + getConsistentHash(c1));
+      // add 1
+      performRehashEvent();
+      int joinerPos = waitForRehashCompletion();
+
+      // now where is joiner in relation to the other 2?
+      // we can have either
+      // 1. J, C1, C2
+      // 2. C1, J, C2
+      // 3. C1, C2, J
+      // for the purpose of CH, 1 == 3.
+
+      System.out.println("New CH positions are " + getConsistentHash(c1));
+      // invalidations happen asynchronously!  :(
+      TestingUtil.sleepThread(2000);
+
+      Cache<Object, String> cacheToCheckForInvalidation = joinerPos + 1 == caches.size() ? caches.get(0) : caches.get(joinerPos + 1);
+      MagicKey rehashedKey = keys.get(joinerPos == 1 ? 0 : 1);
+      if (l1OnRehash)
+         assertIsInL1(cacheToCheckForInvalidation, rehashedKey);
+      else
+         assertIsNotInL1(cacheToCheckForInvalidation, rehashedKey);
+
+      assertIsInContainerImmortal(joiner, rehashedKey);
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -0,0 +1,17 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+/**
+ * Control for L1OnRehashTest
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "distribution.rehash.NoL1OnRehashTest")
+public class NoL1OnRehashTest extends L1OnRehashTest {
+   public NoL1OnRehashTest() {
+      super();
+      this.l1OnRehash = false;            
+   }
+}


Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -2,8 +2,6 @@
 
 import org.infinispan.Cache;
 import org.infinispan.distribution.BaseDistFunctionalTest;
-import org.infinispan.distribution.ConsistentHash;
-import org.infinispan.remoting.transport.Address;
 import org.infinispan.test.TestingUtil;
 import org.testng.annotations.Test;
 
@@ -16,7 +14,6 @@
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
-import static java.util.concurrent.TimeUnit.SECONDS;
 
 /**
  * A base test for all rehashing tests
@@ -46,19 +43,6 @@
       TestingUtil.sleepThread(1000);
    }
 
-   protected int locateJoiner(Address joinerAddress) {
-      for (Cache c : Arrays.asList(c1, c2, c3, c4)) {
-         ConsistentHash dch = getNonUnionConsistentHash(c, SECONDS.toMillis(480));
-         int i = 0;
-         for (Address a : dch.getCaches()) {
-            if (a.equals(joinerAddress)) return i;
-            i++;
-         }
-      }
-      throw new RuntimeException("Cannot locate joiner! Joiner is [" + joinerAddress + "]");
-   }
-
-
    private List<MagicKey> init() {
       List<MagicKey> keys = new ArrayList<MagicKey>(Arrays.asList(
             new MagicKey(c1, "k1"), new MagicKey(c2, "k2"),

Modified: trunk/core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java	2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java	2009-11-24 17:25:18 UTC (rev 1217)
@@ -51,8 +51,6 @@
 import org.infinispan.container.entries.TransientMortalCacheEntry;
 import org.infinispan.container.entries.TransientMortalCacheValue;
 import org.infinispan.loaders.bucket.Bucket;
-import org.infinispan.marshall.MarshalledValue;
-import org.infinispan.marshall.VersionAwareMarshaller;
 import org.infinispan.marshall.jboss.JBossMarshallingTest.CustomReadObjectMethod;
 import org.infinispan.marshall.jboss.JBossMarshallingTest.ObjectThatContainsACustomReadObjectMethod;
 import org.infinispan.remoting.MIMECacheEntry;
@@ -246,7 +244,7 @@
       assert rc7.getCommandId() == c7.getCommandId() : "Writen[" + c7.getCommandId() + "] and read[" + rc7.getCommandId() + "] objects should be the same";
       assert Arrays.equals(rc7.getParameters(), c7.getParameters()) : "Writen[" + c7.getParameters() + "] and read[" + rc7.getParameters() + "] objects should be the same";
 
-      InvalidateCommand c71 = new InvalidateL1Command(null, null, "key1", "key2");
+      InvalidateCommand c71 = new InvalidateL1Command(false, null, null, null, null, "key1", "key2");
       bytes = marshaller.objectToByteBuffer(c71);
       InvalidateCommand rc71 = (InvalidateCommand) marshaller.objectFromByteBuffer(bytes);
       assert rc71.getCommandId() == c71.getCommandId() : "Writen[" + c71.getCommandId() + "] and read[" + rc71.getCommandId() + "] objects should be the same";



More information about the infinispan-commits mailing list