[infinispan-commits] Infinispan SVN: r1217 - in trunk/core/src: main/java/org/infinispan/commands/write and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Nov 24 12:25:18 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-11-24 12:25:18 -0500 (Tue, 24 Nov 2009)
New Revision: 1217
Added:
trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java
Modified:
trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
trunk/core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java
Log:
[ISPN-249] ( L1Type.onRehash should be in use but it isn't)
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactory.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -94,10 +94,11 @@
/**
* Builds an InvalidateFromL1Command
+ * @param forRehash set to true if the invalidation is happening due to a new node taking ownership. False if it is due to a write, changing the state of the entry.
* @param keys keys to invalidate
* @return an InvalidateFromL1Command
*/
- InvalidateCommand buildInvalidateFromL1Command(Object... keys);
+ InvalidateCommand buildInvalidateFromL1Command(boolean forRehash, Object... keys);
/**
* Builds a ReplaceCommand
Modified: trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -127,8 +127,8 @@
return new InvalidateCommand(notifier, keys);
}
- public InvalidateCommand buildInvalidateFromL1Command(Object... keys) {
- return new InvalidateL1Command(distributionManager, notifier, keys);
+ public InvalidateCommand buildInvalidateFromL1Command(boolean forRehash, Object... keys) {
+ return new InvalidateL1Command(forRehash, dataContainer, configuration, distributionManager, notifier, keys);
}
public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, long lifespan, long maxIdleTimeMillis) {
@@ -247,7 +247,7 @@
break;
case InvalidateL1Command.COMMAND_ID:
InvalidateL1Command ilc = (InvalidateL1Command) c;
- ilc.init(distributionManager, notifier);
+ ilc.init(configuration, distributionManager, notifier, dataContainer);
break;
case PrepareCommand.COMMAND_ID:
PrepareCommand pc = (PrepareCommand) c;
Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateCommand.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -44,7 +44,7 @@
public static final int COMMAND_ID = 6;
private static final Log log = LogFactory.getLog(InvalidateCommand.class);
private static final boolean trace = log.isTraceEnabled();
- private Object[] keys;
+ protected Object[] keys;
public InvalidateCommand() {
}
Modified: trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -1,5 +1,8 @@
package org.infinispan.commands.write;
+import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.marshall.Ids;
@@ -17,13 +20,20 @@
public class InvalidateL1Command extends InvalidateCommand {
public static final int COMMAND_ID = 7;
private DistributionManager dm;
+ private DataContainer dataContainer;
+ private Configuration config;
+ private boolean forRehash;
public InvalidateL1Command() {
}
- public InvalidateL1Command(DistributionManager dm, CacheNotifier notifier, Object... keys) {
+ public InvalidateL1Command(boolean forRehash, DataContainer dc, Configuration config, DistributionManager dm,
+ CacheNotifier notifier, Object... keys) {
super(notifier, keys);
this.dm = dm;
+ this.forRehash = forRehash;
+ this.dataContainer = dc;
+ this.config = config;
}
@Override
@@ -31,21 +41,73 @@
return COMMAND_ID;
}
- public void init(DistributionManager dm, CacheNotifier n) {
+ public void init(Configuration config, DistributionManager dm, CacheNotifier n, DataContainer dc) {
super.init(n);
this.dm = dm;
+ this.config = config;
+ this.dataContainer = dc;
}
@Override
public Object perform(InvocationContext ctx) throws Throwable {
- for (Object k : getKeys()) {
- if (!dm.isLocal(k)) invalidate(ctx, k);
+ if (forRehash && config.isL1OnRehash()) {
+ for (Object k : getKeys()) {
+ InternalCacheEntry ice = dataContainer.get(k);
+ if (ice != null)
+ dataContainer.put(k, ice.getValue(), config.getL1Lifespan(), config.getExpirationMaxIdle());
+ }
+ } else {
+ for (Object k : getKeys()) {
+ if (!dm.isLocal(k)) invalidate(ctx, k);
+ }
}
return null;
}
@Override
public boolean equals(Object o) {
- return this == o || o instanceof InvalidateL1Command && super.equals(o);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+
+ InvalidateL1Command that = (InvalidateL1Command) o;
+
+ if (forRehash != that.forRehash) return false;
+
+ return true;
}
+
+ @Override
+ public Object[] getParameters() {
+ if (keys == null || keys.length == 0) {
+ return new Object[]{forRehash};
+ } else if (keys.length == 1) {
+ return new Object[]{forRehash, 1, keys[0]};
+ } else {
+ Object[] retval = new Object[keys.length + 2];
+ retval[0] = forRehash;
+ retval[1] = keys.length;
+ System.arraycopy(keys, 0, retval, 2, keys.length);
+ return retval;
+ }
+ }
+
+ @Override
+ public void setParameters(int commandId, Object[] args) {
+ forRehash = (Boolean) args[0];
+ int size = (Integer) args[1];
+ keys = new Object[size];
+ if (size == 1) {
+ keys[0] = args[2];
+ } else if (size > 0) {
+ System.arraycopy(args, 2, keys, 0, size);
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (forRehash ? 1 : 0);
+ return result;
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -155,6 +155,11 @@
}
}
+ @Override
+ protected Log getLog() {
+ return log;
+ }
+
// TODO unit test this!!!
List<Address> getAddressesWhoMaySendStuff(int replCount) {
List<Address> l = new LinkedList<Address>();
Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -103,6 +103,11 @@
}
}
+ @Override
+ protected Log getLog() {
+ return log;
+ }
+
private void processAndDrainTxLog(ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
if (trace) log.trace("Processing transaction log iteratively");
Modified: trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -8,6 +8,7 @@
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.NotifyingFutureImpl;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
+import org.infinispan.util.logging.Log;
import java.util.Collection;
import java.util.Collections;
@@ -59,11 +60,14 @@
protected abstract void performRehash() throws Exception;
+ protected abstract Log getLog();
+
protected Collection<Address> coordinator() {
return Collections.singleton(rpcManager.getTransport().getCoordinator());
}
protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
+ if (getLog().isDebugEnabled()) getLog().debug("Invalidating entries that have migrated across");
Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
for (Object key : dataContainer.keySet()) {
Collection<Address> invalidHolders = getInvalidHolders(key, chOld, chNew);
@@ -80,7 +84,7 @@
Set<Future> futures = new HashSet<Future>();
for (Map.Entry<Address, Set<Object>> e : invalidations.entrySet()) {
- InvalidateCommand ic = cf.buildInvalidateFromL1Command(e.getValue().toArray());
+ InvalidateCommand ic = cf.buildInvalidateFromL1Command(true, e.getValue().toArray());
NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
rpcManager.invokeRemotelyInFuture(Collections.singletonList(e.getKey()), ic, true, f);
futures.add(f);
Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -37,6 +37,7 @@
protected boolean tx = false;
protected boolean testRetVals = true;
protected boolean l1CacheEnabled = true;
+ protected boolean l1OnRehash = false;
protected boolean performRehashing = false;
protected static final int NUM_OWNERS = 2;
@@ -54,6 +55,7 @@
configuration.setSyncReplTimeout(60, TimeUnit.SECONDS);
configuration.setLockAcquisitionTimeout(45, TimeUnit.SECONDS);
configuration.setL1CacheEnabled(l1CacheEnabled);
+ if (l1CacheEnabled) configuration.setL1OnRehash(l1OnRehash);
caches = createClusteredCaches(INIT_CLUSTER_SIZE, cacheName, configuration);
reorderBasedOnCHPositions();
@@ -198,6 +200,18 @@
}
}
+ protected int locateJoiner(Address joinerAddress) {
+ for (Cache c : caches) {
+ ConsistentHash dch = getNonUnionConsistentHash(c, SECONDS.toMillis(480));
+ int i = 0;
+ for (Address a : dch.getCaches()) {
+ if (a.equals(joinerAddress)) return i;
+ i++;
+ }
+ }
+ throw new RuntimeException("Cannot locate joiner! Joiner is [" + joinerAddress + "]");
+ }
+
protected static final String safeType(Object o) {
if (o == null) return "null";
return o.getClass().getSimpleName();
@@ -206,14 +220,14 @@
protected void assertIsInL1(Cache<?, ?> cache, Object key) {
DataContainer dc = cache.getAdvancedCache().getDataContainer();
InternalCacheEntry ice = dc.get(key);
- assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + addressOf(cache) + "]!";
+ assert ice != null : "Entry for key [" + key + "] should be in L1 on cache at [" + addressOf(cache) + "]!";
assert !(ice instanceof ImmortalCacheEntry) : "Entry for key [" + key + "] should have a lifespan on cache at [" + addressOf(cache) + "]!";
}
protected void assertIsNotInL1(Cache<?, ?> cache, Object key) {
DataContainer dc = cache.getAdvancedCache().getDataContainer();
InternalCacheEntry ice = dc.get(key);
- assert ice == null : "Entry for key [" + key + "] should not be in data container on cache at [" + addressOf(cache) + "]!";
+ assert ice == null : "Entry for key [" + key + "] should not be in data container at all on cache at [" + addressOf(cache) + "]!";
}
protected void assertIsInContainerImmortal(Cache<?, ?> cache, Object key) {
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -0,0 +1,97 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.manager.CacheManager;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * Ensures entries are moved to L1 if they are removed due to a rehash
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "distribution.rehash.L1OnRehashTest")
+public class L1OnRehashTest extends BaseDistFunctionalTest {
+ public L1OnRehashTest() {
+ this.tx = false;
+ this.sync = true;
+ this.l1CacheEnabled = true;
+ this.performRehashing = true;
+ this.l1OnRehash = true;
+ this.INIT_CLUSTER_SIZE = 2;
+ cleanup = CleanupPhase.AFTER_METHOD;
+ }
+
+ CacheManager joinerManager;
+ Cache<Object, String> joiner;
+
+ void performRehashEvent() {
+ joinerManager = addClusterEnabledCacheManager();
+ joinerManager.defineConfiguration(cacheName, configuration);
+ joiner = joinerManager.getCache(cacheName);
+ }
+
+ int waitForRehashCompletion() {
+ // need to block until this join has completed!
+ waitForJoinTasksToComplete(SECONDS.toMillis(480), joiner);
+
+ // where does the joiner sit in relation to the other caches?
+ int joinerPos = locateJoiner(joinerManager.getAddress());
+
+ log.info("***>>> Joiner is in position " + joinerPos);
+
+ caches.add(joinerPos, joiner);
+ return joinerPos;
+ }
+
+ private List<MagicKey> init() {
+ List<MagicKey> keys = new ArrayList<MagicKey>(Arrays.asList(
+ new MagicKey(c1, "k1"), new MagicKey(c2, "k2")
+ ));
+
+ int i = 0;
+ for (Cache<Object, String> c : caches) c.put(keys.get(i++), "v" + i);
+
+ i = 0;
+ for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
+
+ log.info("Initialized with keys {0}", keys);
+ return keys;
+ }
+
+ public void testInvalidationBehaviorOnRehash() {
+ // start with 2 caches...
+ List<MagicKey> keys = init();
+ System.out.println("Old CH positions are " + getConsistentHash(c1));
+ // add 1
+ performRehashEvent();
+ int joinerPos = waitForRehashCompletion();
+
+ // now where is joiner in relation to the other 2?
+ // we can have either
+ // 1. J, C1, C2
+ // 2. C1, J, C2
+ // 3. C1, C2, J
+ // for the purpose of CH, 1 == 3.
+
+ System.out.println("New CH positions are " + getConsistentHash(c1));
+ // invalidations happen asynchronously! :(
+ TestingUtil.sleepThread(2000);
+
+ Cache<Object, String> cacheToCheckForInvalidation = joinerPos + 1 == caches.size() ? caches.get(0) : caches.get(joinerPos + 1);
+ MagicKey rehashedKey = keys.get(joinerPos == 1 ? 0 : 1);
+ if (l1OnRehash)
+ assertIsInL1(cacheToCheckForInvalidation, rehashedKey);
+ else
+ assertIsNotInL1(cacheToCheckForInvalidation, rehashedKey);
+
+ assertIsInContainerImmortal(joiner, rehashedKey);
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/L1OnRehashTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -0,0 +1,17 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+/**
+ * Control for L1OnRehashTest
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "distribution.rehash.NoL1OnRehashTest")
+public class NoL1OnRehashTest extends L1OnRehashTest {
+ public NoL1OnRehashTest() {
+ super();
+ this.l1OnRehash = false;
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/NoL1OnRehashTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -2,8 +2,6 @@
import org.infinispan.Cache;
import org.infinispan.distribution.BaseDistFunctionalTest;
-import org.infinispan.distribution.ConsistentHash;
-import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.Test;
@@ -16,7 +14,6 @@
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
-import static java.util.concurrent.TimeUnit.SECONDS;
/**
* A base test for all rehashing tests
@@ -46,19 +43,6 @@
TestingUtil.sleepThread(1000);
}
- protected int locateJoiner(Address joinerAddress) {
- for (Cache c : Arrays.asList(c1, c2, c3, c4)) {
- ConsistentHash dch = getNonUnionConsistentHash(c, SECONDS.toMillis(480));
- int i = 0;
- for (Address a : dch.getCaches()) {
- if (a.equals(joinerAddress)) return i;
- i++;
- }
- }
- throw new RuntimeException("Cannot locate joiner! Joiner is [" + joinerAddress + "]");
- }
-
-
private List<MagicKey> init() {
List<MagicKey> keys = new ArrayList<MagicKey>(Arrays.asList(
new MagicKey(c1, "k1"), new MagicKey(c2, "k2"),
Modified: trunk/core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java 2009-11-24 17:24:01 UTC (rev 1216)
+++ trunk/core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java 2009-11-24 17:25:18 UTC (rev 1217)
@@ -51,8 +51,6 @@
import org.infinispan.container.entries.TransientMortalCacheEntry;
import org.infinispan.container.entries.TransientMortalCacheValue;
import org.infinispan.loaders.bucket.Bucket;
-import org.infinispan.marshall.MarshalledValue;
-import org.infinispan.marshall.VersionAwareMarshaller;
import org.infinispan.marshall.jboss.JBossMarshallingTest.CustomReadObjectMethod;
import org.infinispan.marshall.jboss.JBossMarshallingTest.ObjectThatContainsACustomReadObjectMethod;
import org.infinispan.remoting.MIMECacheEntry;
@@ -246,7 +244,7 @@
assert rc7.getCommandId() == c7.getCommandId() : "Writen[" + c7.getCommandId() + "] and read[" + rc7.getCommandId() + "] objects should be the same";
assert Arrays.equals(rc7.getParameters(), c7.getParameters()) : "Writen[" + c7.getParameters() + "] and read[" + rc7.getParameters() + "] objects should be the same";
- InvalidateCommand c71 = new InvalidateL1Command(null, null, "key1", "key2");
+ InvalidateCommand c71 = new InvalidateL1Command(false, null, null, null, null, "key1", "key2");
bytes = marshaller.objectToByteBuffer(c71);
InvalidateCommand rc71 = (InvalidateCommand) marshaller.objectFromByteBuffer(bytes);
assert rc71.getCommandId() == c71.getCommandId() : "Writen[" + c71.getCommandId() + "] and read[" + rc71.getCommandId() + "] objects should be the same";
More information about the infinispan-commits
mailing list