[infinispan-commits] Infinispan SVN: r709 - in trunk/core/src: main/java/org/infinispan/distribution and 5 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Aug 20 18:27:14 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-08-20 18:27:13 -0400 (Thu, 20 Aug 2009)
New Revision: 709
Added:
trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java
trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/
trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java
Removed:
trunk/core/src/main/java/org/infinispan/distribution/RecvLeaveTask.java
trunk/core/src/main/java/org/infinispan/distribution/SendLeaveTask.java
trunk/core/src/test/java/org/infinispan/distribution/RehashJoinTest.java
trunk/core/src/test/java/org/infinispan/util/concurrent/
Modified:
trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java
trunk/core/src/main/java/org/infinispan/util/Util.java
trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
Log:
[ISPN-65] (Rehashing) feature complete. Needs more tests now.
Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -105,7 +105,7 @@
if (cacheStore != null) {
for (InternalCacheEntry ice : cacheStore.loadAll()) {
Object k = ice.getKey();
- if (shouldAddToMap(k, oldCH, numCopies, self) && !state.containsKey(k))
+ if (!state.containsKey(k) && shouldAddToMap(k, oldCH, numCopies, self))
state.put(k, ice.toInternalCacheValue());
}
}
@@ -124,7 +124,8 @@
}
public Object pushState() {
- throw new RuntimeException("implement me");
+ distributionManager.applyReceivedState(state);
+ return null;
}
public byte getCommandId() {
Modified: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -21,14 +21,14 @@
*
* @param caches caches in cluster.
*/
- void setCaches(Collection<Address> caches);
+ void setCaches(List<Address> caches);
/**
* Should return a collection of cache addresses in the cluster.
*
* @return collection of cache addresses
*/
- Collection<Address> getCaches();
+ List<Address> getCaches();
/**
* Locates a key, given a replication count (number of copies).
@@ -36,7 +36,7 @@
* @param key key to locate
* @param replCount replication count (number of copies)
* @return a list of addresses where the key resides, where this list is a subset of the addresses set in {@link
- * #setCaches(java.util.Collection)}. Should never be null, and should contain replCount elements or the max
+ * #setCaches(java.util.List)}. Should never be null, and should contain replCount elements or the max
* number of caches available, whichever is smaller.
*/
List<Address> locate(Object key, int replCount);
@@ -53,12 +53,23 @@
Map<Object, List<Address>> locateAll(Collection<Object> keys, int replCount);
/**
- * Tests whether a group of addresses are in the same subspace of the hash space. Addresses are in the same subspace
- * if an arbitrary key mapped to one address could also be mapped to the other.
+ * Calculates the logical distance between two addresses. This distance is based on where the addresses lie in the
+ * hash space.
*
* @param a1 address to test
* @param a2 address to test
- * @return true of the two addresses are in the same subspace, false otherwise.
+ * @return the distance between the 2 nodes. Always a positive number, where the distance between a1 and itself is
+ * 0. The distance between a1 and the next adjacent node is 1 and teh distance between a1 and the previous
+ * adjacent node is caches.size() - 1.
*/
- boolean isInSameSubspace(Address a1, Address a2);
+ int getDistance(Address a1, Address a2);
+
+ /**
+ * Tests whether two addresses are logically next to each other in the hash space.
+ *
+ * @param a1 address to test
+ * @param a2 address to test
+ * @return true if adjacent, false if not
+ */
+ boolean isAdjacent(Address a1, Address a2);
}
Added: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,63 @@
+package org.infinispan.distribution;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.Util;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * // TODO: Manik: Document this
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class ConsistentHashHelper {
+
+ /**
+ * Returns a new consistent hash of the same type with the given address removed.
+ *
+ * @param ch consistent hash to start with
+ * @param toRemove address to remove
+ * @param c configuration
+ * @return a new consistent hash instance of the same type
+ */
+ public static ConsistentHash removeAddress(ConsistentHash ch, Address toRemove, Configuration c) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ if (ch instanceof UnionConsistentHash)
+ return removeAddressFromUnionConsistentHash((UnionConsistentHash) ch, toRemove, c);
+ else {
+ ConsistentHash newCH = (ConsistentHash) Util.getInstance(c.getConsistentHashClass());
+ List<Address> caches = ch.getCaches();
+ caches.remove(toRemove);
+ newCH.setCaches(caches);
+ return newCH;
+ }
+ }
+
+ public static UnionConsistentHash removeAddressFromUnionConsistentHash(UnionConsistentHash uch, Address toRemove, Configuration c) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
+ ConsistentHash newFirstCH = removeAddress(uch.getOldConsistentHash(), toRemove, c);
+ ConsistentHash newSecondCH = removeAddress(uch.getNewConsistentHash(), toRemove, c);
+ return new UnionConsistentHash(newFirstCH, newSecondCH);
+ }
+
+ public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses) throws IllegalAccessException, InstantiationException, ClassNotFoundException {
+ ConsistentHash ch = (ConsistentHash) Util.getInstance(c.getConsistentHashClass());
+ ch.setCaches(addresses);
+ return ch;
+ }
+
+ public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Address... moreAddresses) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
+ List<Address> list = new LinkedList<Address>(addresses);
+ list.addAll(Arrays.asList(moreAddresses));
+ return createConsistentHash(c, list);
+ }
+
+ public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Collection<Address> moreAddresses) throws IllegalAccessException, ClassNotFoundException, InstantiationException {
+ List<Address> list = new LinkedList<Address>(addresses);
+ list.addAll(moreAddresses);
+ return createConsistentHash(c, list);
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHashHelper.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -7,8 +7,8 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import static java.lang.Math.min;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -23,7 +23,7 @@
final static int HASH_SPACE = 10240; // no more than 10k nodes?
- public void setCaches(Collection<Address> caches) {
+ public void setCaches(List<Address> caches) {
addresses = new ArrayList<Address>(caches);
@@ -38,16 +38,19 @@
while (positions.containsKey(positionIndex)) positionIndex = positionIndex + 1 % HASH_SPACE;
positions.put(positionIndex, a);
}
+
+ addresses.clear();
+ for (Address a : positions.values()) addresses.add(a);
}
- public Collection<Address> getCaches() {
+ public List<Address> getCaches() {
return addresses;
}
public List<Address> locate(Object key, int replCount) {
int hash = Math.abs(key.hashCode());
int clusterSize = addresses.size();
- int numCopiesToFind = Math.min(replCount, clusterSize);
+ int numCopiesToFind = min(replCount, clusterSize);
List<Address> owners = new ArrayList<Address>(numCopiesToFind);
@@ -72,29 +75,28 @@
return owners;
}
-// public List<Address> locate(Object key, int replicationCount) {
-// int hash = Math.abs(key.hashCode());
-// int clusterSize = addresses.size();
-// int numCopiesToFind = Math.min(replicationCount, clusterSize);
-//
-// List<Address> results = new ArrayList<Address>(numCopiesToFind);
-//
-// int copyNumber = 0;
-//
-// while (results.size() < numCopiesToFind) {
-// // we mod the index the 2nd time to make sure the index starts again from the beginning when it reaches the end.
-// // e.g., in a cluster of 10 with 3 copies of data, and a key that maps to node index 9, the next 2 backups should
-// // be at indexes 0 and 1.
-//
-// int index = ((hash % clusterSize) + copyNumber) % clusterSize;
-// Address candidate = addresses.get(index);
-// results.add(candidate);
-// copyNumber++;
-// }
-//
-// return results;
-// }
+ public int getDistance(Address a1, Address a2) {
+ if (a1 == null || a2 == null) throw new NullPointerException("Cannot deal with nulls as parameters!");
+ int p1 = addresses.indexOf(a1);
+ if (p1 < 0)
+ throw new IllegalArgumentException("Address " + a1 + " not in the addresses list of this consistent hash impl!");
+
+ int p2 = addresses.indexOf(a2);
+ if (p2 < 0)
+ throw new IllegalArgumentException("Address " + a2 + " not in the addresses list of this consistent hash impl!");
+
+ if (p1 <= p2)
+ return p2 - p1;
+ else
+ return addresses.size() - (p1 - p2);
+ }
+
+ public boolean isAdjacent(Address a1, Address a2) {
+ int distance = getDistance(a1, a2);
+ return distance == 1 || distance == addresses.size() - 1;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -137,8 +139,4 @@
"addresses (in order of hash space position)=" + positions.values() +
'}';
}
-
- public boolean isInSameSubspace(Address a1, Address a2) {
- throw new UnsupportedOperationException("TODO Implement me!");
- }
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -2,6 +2,7 @@
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.loaders.CacheStore;
@@ -101,5 +102,9 @@
* @return a cache store is one is available and configured for use in rehashing, or null otherwise.
*/
CacheStore getCacheStoreForRehashing();
+
+ boolean isRehashInProgress();
+
+ void applyReceivedState(Map<Object, InternalCacheValue> state);
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -3,12 +3,17 @@
import org.infinispan.CacheException;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.remote.ClusteredGetCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
+import static org.infinispan.distribution.ConsistentHashHelper.createConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
@@ -35,7 +40,9 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -53,7 +60,8 @@
private final Log log = LogFactory.getLog(DistributionManagerImpl.class);
private final boolean trace = log.isTraceEnabled();
Configuration configuration;
- ConsistentHash consistentHash;
+ volatile ConsistentHash consistentHash, oldConsistentHash;
+ Address self;
CacheLoaderManager cacheLoaderManager;
RpcManager rpcManager;
CacheManagerNotifier notifier;
@@ -82,8 +90,9 @@
private InvocationContextContainer icc;
private volatile boolean joinTaskSubmitted = false;
volatile boolean joinComplete = false;
+ final List<Address> leavers = new CopyOnWriteArrayList<Address>();
+ volatile Future<Void> leaveTaskFuture;
-
@Inject
public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf,
DataContainer dataContainer, InterceptorChain interceptorChain, InvocationContextContainer icc,
@@ -102,12 +111,12 @@
@Start(priority = 20)
public void start() throws Exception {
replCount = configuration.getNumOwners();
- consistentHash = (ConsistentHash) Util.getInstance(configuration.getConsistentHashClass());
- consistentHash.setCaches(rpcManager.getTransport().getMembers());
+ consistentHash = createConsistentHash(configuration, rpcManager.getTransport().getMembers());
+ self = rpcManager.getTransport().getAddress();
listener = new ViewChangeListener();
notifier.addListener(listener);
if (rpcManager.getTransport().getMembers().size() > 1) {
- JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, interceptorChain, icc, this);
+ JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
rehashExecutor.submit(joinTask);
} else {
joinComplete = true;
@@ -122,14 +131,21 @@
joinComplete = false;
}
- private Address diff(List<Address> newList, List<Address> oldList) {
- List<Address> list = new ArrayList<Address>(newList);
- list.removeAll(oldList);
- // Could easily be > 1 member joined!
- return list.size() > 0 ? list.get(0) : null;
+ final List<Address> diffAll(List<Address> l1, List<Address> l2) {
+ List<Address> largerList = l1.size() > l2.size() ? l1 : l2;
+ List<Address> smallerList = largerList == l1 ? l2 : l1;
+
+ List<Address> list = new ArrayList<Address>(largerList);
+ list.removeAll(smallerList);
+ return list;
}
+ final Address diff(List<Address> l1, List<Address> l2) {
+ List<Address> l = diffAll(l1, l2);
+ return l.isEmpty() ? null : l.get(0);
+ }
+
public void rehash(List<Address> newMembers, List<Address> oldMembers) {
boolean join = oldMembers == null || oldMembers.size() < newMembers.size();
// on view change, we should update our view
@@ -139,18 +155,51 @@
Address joiner = diff(newMembers, oldMembers);
log.info("This is a JOIN event! Wait for notification from new joiner " + joiner);
} else {
- log.info("This is a LEAVE event!");
- // TODO: implement this stuff!
- ConsistentHash newCH = new DefaultConsistentHash();
- newCH.setCaches(newMembers);
+ Address leaver = diff(newMembers, oldMembers);
+ log.info("This is a LEAVE event! Node {0} has just left", leaver);
- consistentHash = newCH;
+ boolean willReceiveLeaverState = willReceiveLeaverState(leaver);
+ boolean willSendLeaverState = willSendLeaverState(leaver);
+ try {
+ oldConsistentHash = consistentHash;
+ consistentHash = ConsistentHashHelper.removeAddress(consistentHash, leaver, configuration);
+ } catch (Exception e) {
+ log.fatal("Unable to process leaver!!", e);
+ throw new CacheException(e);
+ }
+ if (willReceiveLeaverState) {
+ log.info("Starting transaction logging; expecting state from someone!");
+ transactionLogger.enable();
+ }
+
+ if (willSendLeaverState) {
+ if (leaveTaskFuture != null && (!leaveTaskFuture.isCancelled() || !leaveTaskFuture.isDone())) {
+ leaveTaskFuture.cancel(true);
+ }
+
+ leavers.add(leaver);
+ LeaveTask task = new LeaveTask(this, rpcManager, configuration, leavers, transactionLogger, cf, dataContainer);
+ leaveTaskFuture = rehashExecutor.submit(task);
+
+ log.info("Need to rehash");
+ } else {
+ log.info("Not in same subspace, so ignoring leave event");
+ }
}
}
+ boolean willSendLeaverState(Address leaver) {
+ return consistentHash.isAdjacent(leaver, self);
+ }
+
+ boolean willReceiveLeaverState(Address leaver) {
+ int dist = consistentHash.getDistance(leaver, self);
+ return dist <= replCount;
+ }
+
public boolean isLocal(Object key) {
- return consistentHash.locate(key, replCount).contains(rpcManager.getTransport().getAddress());
+ return consistentHash.locate(key, replCount).contains(self);
}
public List<Address> locate(Object key) {
@@ -195,8 +244,8 @@
}
public boolean isAffectedByRehash(Object key) {
- if (rehashInProgress) {
- throw new UnsupportedOperationException("TODO implement me");
+ if (transactionLogger.isEnabled() && oldConsistentHash != null && !oldConsistentHash.locate(key, replCount).contains(self)) {
+ return true;
} else {
return false;
}
@@ -248,6 +297,18 @@
log.trace("New CH is {0}", consistentHash);
}
+ public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
+ for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
+ if (consistentHash.locate(e.getKey(), configuration.getNumOwners()).contains(self)) {
+ InternalCacheValue v = e.getValue();
+ PutKeyValueCommand put = cf.buildPutKeyValueCommand(e.getKey(), v.getValue(), v.getLifespan(), v.getMaxIdle());
+ InvocationContext ctx = icc.createInvocationContext();
+ ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_REMOTE_LOOKUP);
+ interceptorChain.invoke(ctx, put);
+ }
+ }
+ }
+
@Listener
public class ViewChangeListener {
@ViewChanged
@@ -262,4 +323,40 @@
return null;
return cacheLoaderManager.getCacheStore();
}
+
+ public boolean isRehashInProgress() {
+ return !leavers.isEmpty() || rehashInProgress;
+ }
+
+ public void applyReceivedState(Map<Object, InternalCacheValue> state) {
+ applyState(consistentHash, state);
+ boolean unlocked = false;
+ try {
+ drainTransactionLog();
+ unlocked = true;
+ } finally {
+ if (!unlocked) transactionLogger.unlockAndDisable();
+ }
+ }
+
+ void drainTransactionLog() {
+ List<WriteCommand> c;
+ while (transactionLogger.size() > 10) {
+ c = transactionLogger.drain();
+ apply(c);
+ }
+
+ c = transactionLogger.drainAndLock();
+ apply(c);
+
+ transactionLogger.unlockAndDisable();
+ }
+
+ private void apply(List<WriteCommand> c) {
+ for (WriteCommand cmd : c) {
+ InvocationContext ctx = icc.createInvocationContext();
+ ctx.setFlags(Flag.SKIP_REMOTE_LOOKUP);
+ interceptorChain.invoke(ctx, cmd);
+ }
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -4,37 +4,23 @@
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.control.RehashControlCommand;
import static org.infinispan.commands.control.RehashControlCommand.Type.*;
-import org.infinispan.commands.write.InvalidateCommand;
-import org.infinispan.commands.write.PutKeyValueCommand;
-import org.infinispan.commands.write.WriteCommand;
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheValue;
-import org.infinispan.context.Flag;
-import org.infinispan.context.InvocationContext;
-import org.infinispan.context.InvocationContextContainer;
-import org.infinispan.interceptors.InterceptorChain;
+import static org.infinispan.distribution.ConsistentHashHelper.createConsistentHash;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import static org.infinispan.remoting.rpc.ResponseMode.SYNCHRONOUS;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.NotifyingFutureImpl;
-import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Future;
/**
* 5. JoinTask: This is a PULL based rehash. JoinTask is kicked off on the JOINER. 5.1. Obtain OLD_CH from
@@ -54,23 +40,12 @@
private static final Log log = LogFactory.getLog(JoinTask.class);
ConsistentHash chOld;
ConsistentHash chNew;
- // ConsistentHash chTemp;
- CommandsFactory commandsFactory;
- TransactionLogger transactionLogger;
- DataContainer dataContainer;
- InterceptorChain interceptorChain;
- InvocationContextContainer icc;
Address self;
public JoinTask(RpcManager rpcManager, CommandsFactory commandsFactory, Configuration conf,
- TransactionLogger transactionLogger, DataContainer dataContainer, InterceptorChain interceptorChain,
- InvocationContextContainer icc, DistributionManagerImpl dmi) {
- super(dmi, rpcManager, conf);
- this.commandsFactory = commandsFactory;
- this.transactionLogger = transactionLogger;
+ TransactionLogger transactionLogger, DataContainer dataContainer, DistributionManagerImpl dmi) {
+ super(dmi, rpcManager, conf, transactionLogger, commandsFactory, dataContainer);
this.dataContainer = dataContainer;
- this.interceptorChain = interceptorChain;
- this.icc = icc;
self = rpcManager.getTransport().getAddress();
}
@@ -90,7 +65,8 @@
}
protected void performRehash() throws Exception {
- log.trace("Starting rehash on new joiner");
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) log.debug("Commencing");
boolean unlocked = false;
try {
dmi.joinComplete = false;
@@ -98,24 +74,24 @@
// this happens in a loop to ensure we receive the correct CH and not a "union".
// TODO make at least *some* of these configurable!
long minSleepTime = 500, maxSleepTime = 2000; // sleep time between retries
- int maxWaitTime = 600000; // after which we give up!
+ int maxWaitTime = (int) configuration.getRehashRpcTimeout() * 10; // after which we give up!
Random rand = new Random();
long giveupTime = System.currentTimeMillis() + maxWaitTime;
do {
- log.trace("Requesting old consistent hash from coordinator");
+ if (log.isTraceEnabled()) log.trace("Requesting old consistent hash from coordinator");
List<Response> resp = rpcManager.invokeRemotely(coordinator(),
- commandsFactory.buildRehashControlCommand(JOIN_REQ, self),
- SYNCHRONOUS, 100000, true);
+ cf.buildRehashControlCommand(JOIN_REQ, self),
+ SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
List<Address> addresses = parseResponses(resp);
- log.trace("Retrieved old consistent hash address list {0}", addresses);
+ if (log.isDebugEnabled()) log.debug("Retrieved old consistent hash address list {0}", addresses);
if (addresses == null) {
long time = rand.nextInt((int) (maxSleepTime - minSleepTime) / 10);
time = (time * 10) + minSleepTime;
- log.debug("Sleeping for {0}", Util.prettyPrintTime(time));
+ if (log.isTraceEnabled()) log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
Thread.sleep(time); // sleep for a while and retry
} else {
- chOld = createConsistentHash(addresses);
+ chOld = createConsistentHash(configuration, addresses);
}
} while (chOld == null && System.currentTimeMillis() < giveupTime);
@@ -123,109 +99,66 @@
throw new CacheException("Unable to retrieve old consistent hash from coordinator even after several attempts at sleeping and retrying!");
// 2. new CH instance
- chNew = createConsistentHash(chOld.getCaches(), self);
+ chNew = createConsistentHash(configuration, chOld.getCaches(), self);
dmi.setConsistentHash(chNew);
// 3. Enable TX logging
transactionLogger.enable();
// 4. Broadcast new temp CH
- rpcManager.broadcastRpcCommand(commandsFactory.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
+ rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
// 5. txLogger being enabled will cause CLusteredGetCommands to return uncertain responses.
// 6. pull state from everyone.
Address myAddress = rpcManager.getTransport().getAddress();
- RehashControlCommand cmd = commandsFactory.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
+ RehashControlCommand cmd = cf.buildRehashControlCommand(PULL_STATE, myAddress, null, chNew);
// TODO I should be able to process state chunks from different nodes simultaneously!!
- // TODO I should only send this pull state request to nodes which I know will send me state. Not everyone in chOld!!
- List<Response> resps = rpcManager.invokeRemotely(chOld.getCaches(), cmd, SYNCHRONOUS, 10000, true);
+ List<Address> addressesWhoMaySendStuff = getAddressesWhoMaySendStuff();
+ List<Response> resps = rpcManager.invokeRemotely(addressesWhoMaySendStuff, cmd, SYNCHRONOUS, configuration.getRehashRpcTimeout(), true);
// 7. Apply state
for (Response r : resps) {
if (r instanceof SuccessfulResponse) {
Map<Object, InternalCacheValue> state = getStateFromResponse((SuccessfulResponse) r);
- for (Map.Entry<Object, InternalCacheValue> e : state.entrySet()) {
- if (chNew.locate(e.getKey(), configuration.getNumOwners()).contains(myAddress)) {
- InternalCacheValue v = e.getValue();
- PutKeyValueCommand put = commandsFactory.buildPutKeyValueCommand(e.getKey(), v.getValue(), v.getLifespan(), v.getMaxIdle());
- InvocationContext ctx = icc.createInvocationContext();
- ctx.setFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_REMOTE_LOOKUP);
- interceptorChain.invoke(ctx, put);
- }
- }
+ dmi.applyState(chNew, state);
}
}
// 8. Drain logs
-
- List<WriteCommand> c = null;
- while (transactionLogger.size() > 10) {
- c = transactionLogger.drain();
- apply(c);
- }
-
- c = transactionLogger.drainAndLock();
- apply(c);
-
+ dmi.drainTransactionLog();
unlocked = true;
- // 9.
- transactionLogger.unlockAndDisable();
// 10.
- // TODO this phase should also "tell" the coord that the join is complete so that other waiting joiners
- // may proceed. Ideally another command, directed to the coord.
- rpcManager.broadcastRpcCommand(commandsFactory.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
- rpcManager.invokeRemotely(coordinator(), commandsFactory.buildRehashControlCommand(JOIN_COMPLETE, self), SYNCHRONOUS,
- 100000, true);
+ rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_END, self), true, true);
+ rpcManager.invokeRemotely(coordinator(), cf.buildRehashControlCommand(JOIN_COMPLETE, self), SYNCHRONOUS,
+ configuration.getRehashRpcTimeout(), true);
// 11.
- Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
- for (Object key : dataContainer.keySet()) {
- Collection<Address> invalidHolders = getInvalidHolders(key, chOld, chNew);
- for (Address a : invalidHolders) {
- Set<Object> s = invalidations.get(a);
- if (s == null) {
- s = new HashSet<Object>();
- invalidations.put(a, s);
- }
- s.add(key);
- }
- }
+ invalidateInvalidHolders(chOld, chNew);
- Set<Future> futures = new HashSet<Future>();
-
- for (Map.Entry<Address, Set<Object>> e : invalidations.entrySet()) {
- InvalidateCommand ic = commandsFactory.buildInvalidateFromL1Command(e.getValue().toArray());
- NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
- rpcManager.invokeRemotelyInFuture(Collections.singletonList(e.getKey()), ic, true, f);
- futures.add(f);
- }
-
- for (Future f : futures) f.get();
+ if (log.isInfoEnabled())
+ log.info("Completed in {0}!", Util.prettyPrintTime(System.currentTimeMillis() - start));
} catch (Exception e) {
- log.warn("Caught error performing rehash!", e);
+ log.error("Caught exception!", e);
} finally {
if (!unlocked) transactionLogger.unlockAndDisable();
dmi.joinComplete = true;
}
}
- private Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
- List<Address> oldOwners = chOld.locate(key, configuration.getNumOwners());
- List<Address> newOwners = chNew.locate(key, configuration.getNumOwners());
-
- List<Address> toInvalidate = new LinkedList<Address>(oldOwners);
- toInvalidate.removeAll(newOwners);
-
- return toInvalidate;
- }
-
- private void apply(List<WriteCommand> c) {
- for (WriteCommand cmd : c) {
- InvocationContext ctx = icc.createInvocationContext();
- ctx.setFlags(Flag.SKIP_REMOTE_LOOKUP);
- interceptorChain.invoke(ctx, cmd);
+ // TODO unit test this!!!
+ private List<Address> getAddressesWhoMaySendStuff() {
+ List<Address> caches = chNew.getCaches();
+ int selfIdx = caches.indexOf(self);
+ int replCount = configuration.getNumOwners();
+ if (selfIdx >= replCount - 1) {
+ return caches.subList(selfIdx - replCount + 1, selfIdx);
+ } else {
+ List<Address> l = new LinkedList<Address>(caches.subList(0, selfIdx));
+ int alreadyCollected = l.size();
+ l.addAll(caches.subList(caches.size() - replCount + 1 + alreadyCollected, caches.size()));
+ return l;
}
}
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -1,20 +1,165 @@
package org.infinispan.distribution;
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.loaders.CacheStore;
import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+
/**
- * // TODO: Manik: Document this
+ * A task to handle rehashing for when a node leaves the cluster
*
* @author Manik Surtani
* @since 4.0
*/
public class LeaveTask extends RehashTask {
- protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration) {
- super(dmi, rpcManager, configuration);
+ private static final Log log = LogFactory.getLog(LeaveTask.class);
+
+ private final List<Address> leavers;
+ private final Address self;
+ private final List<Address> leaversHandled;
+
+
+ protected LeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration, List<Address> leavers,
+ TransactionLogger transactionLogger, CommandsFactory cf, DataContainer dataContainer) {
+ super(dmi, rpcManager, configuration, transactionLogger, cf, dataContainer);
+ this.leavers = leavers;
+ this.leaversHandled = new LinkedList<Address>(leavers);
+ this.self = rpcManager.getTransport().getAddress();
}
protected void performRehash() throws Exception {
- // TODO: Customise this generated block
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) log.debug("Commencing. Leavers' list is {0}", leavers);
+ boolean completedSuccessfully = false;
+ List<Address> leaversHandled = new LinkedList<Address>(leavers);
+ ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
+ int replCount = configuration.getNumOwners();
+ try {
+ StateMap statemap = new StateMap(leaversHandled, self, oldCH, dmi.getConsistentHash(), replCount);
+ if (log.isTraceEnabled()) log.trace("Examining state in data container");
+ // need to constantly detect whether we are interrupted. If so, abort accordingly.
+ for (InternalCacheEntry ice : dataContainer) {
+ List<Address> oldOwners = oldCH.locate(ice.getKey(), replCount);
+ for (Address a : oldOwners) if (leaversHandled.contains(a)) statemap.addState(ice);
+ }
+
+ CacheStore cs = dmi.getCacheStoreForRehashing();
+ if (cs != null) {
+ if (log.isTraceEnabled()) log.trace("Examining state in cache store");
+ for (InternalCacheEntry ice : cs.loadAll()) if (!statemap.containsKey(ice.getKey())) statemap.addState(ice);
+ }
+
+ // push state.
+ Set<Future<Void>> pushFutures = new HashSet<Future<Void>>();
+ for (Map.Entry<Address, Map<Object, InternalCacheValue>> entry : statemap.getState().entrySet()) {
+ if (log.isDebugEnabled()) log.debug("Pushing {0} entries to {1}", entry.getValue().size(), entry.getKey());
+ RehashControlCommand push = cf.buildRehashControlCommand(RehashControlCommand.Type.PUSH_STATE, self, entry.getValue());
+ NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
+ pushFutures.add(f);
+ rpcManager.invokeRemotelyInFuture(Collections.singleton(entry.getKey()), push, true, f, configuration.getRehashRpcTimeout());
+ }
+
+ for (Future f : pushFutures) f.get();
+
+ completedSuccessfully = true;
+ invalidateInvalidHolders(oldCH, dmi.getConsistentHash());
+ if (log.isInfoEnabled())
+ log.info("Completed in {0}!", Util.prettyPrintTime(System.currentTimeMillis() - start));
+ } catch (InterruptedException ie) {
+ if (log.isInfoEnabled())
+ log.info("Interrupted after {0}! Completed successfully? {1}", Util.prettyPrintTime(System.currentTimeMillis() - start), completedSuccessfully);
+ } catch (Exception e) {
+ log.error("Caught exception! Completed successfully? {0}", e, completedSuccessfully);
+ }
+ finally {
+ if (completedSuccessfully) leavers.removeAll(leaversHandled);
+ }
}
+
+ @Override
+ protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
+ Collection<Address> l = super.getInvalidHolders(key, chOld, chNew);
+ l.removeAll(leaversHandled);
+ return l;
+ }
}
+
+class StateMap {
+ List<Address> leavers;
+ Address self;
+ ConsistentHash oldCH, newCH;
+ int replCount;
+ Set<Object> keysHandled = new HashSet<Object>();
+ Map<Address, Map<Object, InternalCacheValue>> state = new HashMap<Address, Map<Object, InternalCacheValue>>();
+
+ StateMap(List<Address> leavers, Address self, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
+ this.leavers = leavers;
+ this.self = self;
+ this.oldCH = oldCH;
+ this.newCH = newCH;
+ this.replCount = replCount;
+ }
+
+ /**
+ * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
+ * owner list
+ *
+ * @param ice
+ */
+ void addState(InternalCacheEntry ice) {
+ for (Address leaver : leavers) {
+ List<Address> owners = oldCH.locate(ice.getKey(), replCount);
+ int leaverIndex = owners.indexOf(leaver);
+ if (leaverIndex > -1) {
+ int numOwners = owners.size();
+ int selfIndex = owners.indexOf(self);
+ boolean isLeaverLast = leaverIndex == numOwners - 1;
+ if ((isLeaverLast && selfIndex == numOwners - 2) ||
+ (!isLeaverLast && selfIndex == leaverIndex + 1)) {
+ // add to state map!
+ List<Address> newOwners = newCH.locate(ice.getKey(), replCount);
+ newOwners.removeAll(owners);
+ if (!newOwners.isEmpty()) {
+ for (Address no : newOwners) {
+ Map<Object, InternalCacheValue> s = state.get(no);
+ if (s == null) {
+ s = new HashMap<Object, InternalCacheValue>();
+ state.put(no, s);
+ }
+ s.put(ice.getKey(), ice.toInternalCacheValue());
+ }
+ }
+ }
+ }
+ }
+ keysHandled.add(ice.getKey());
+ }
+
+ Map<Address, Map<Object, InternalCacheValue>> getState() {
+ return state;
+ }
+
+ boolean containsKey(Object key) {
+ return keysHandled.contains(key);
+ }
+}
Deleted: trunk/core/src/main/java/org/infinispan/distribution/RecvLeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/RecvLeaveTask.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/RecvLeaveTask.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -1,16 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.remoting.rpc.RpcManager;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class RecvLeaveTask extends LeaveTask {
- protected RecvLeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration) {
- super(dmi, rpcManager, configuration);
- }
-}
Modified: trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/RehashTask.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -1,16 +1,25 @@
package org.infinispan.distribution;
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.config.Configuration;
+import org.infinispan.container.DataContainer;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
/**
* // TODO: Manik: Document this
@@ -23,11 +32,18 @@
DistributionManagerImpl dmi;
RpcManager rpcManager;
Configuration configuration;
+ TransactionLogger transactionLogger;
+ CommandsFactory cf;
+ DataContainer dataContainer;
- protected RehashTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration) {
+ protected RehashTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration,
+ TransactionLogger transactionLogger, CommandsFactory cf, DataContainer dataContainer) {
this.dmi = dmi;
this.rpcManager = rpcManager;
this.configuration = configuration;
+ this.transactionLogger = transactionLogger;
+ this.cf = cf;
+ this.dataContainer = dataContainer;
}
public Void call() throws Exception {
@@ -46,15 +62,39 @@
return Collections.singleton(rpcManager.getTransport().getCoordinator());
}
- protected ConsistentHash createConsistentHash(Collection<Address> addresses) throws Exception {
- ConsistentHash ch = (ConsistentHash) Util.getInstance(configuration.getConsistentHashClass());
- ch.setCaches(addresses);
- return ch;
+ protected void invalidateInvalidHolders(ConsistentHash chOld, ConsistentHash chNew) throws ExecutionException, InterruptedException {
+ Map<Address, Set<Object>> invalidations = new HashMap<Address, Set<Object>>();
+ for (Object key : dataContainer.keySet()) {
+ Collection<Address> invalidHolders = getInvalidHolders(key, chOld, chNew);
+ for (Address a : invalidHolders) {
+ Set<Object> s = invalidations.get(a);
+ if (s == null) {
+ s = new HashSet<Object>();
+ invalidations.put(a, s);
+ }
+ s.add(key);
+ }
+ }
+
+ Set<Future> futures = new HashSet<Future>();
+
+ for (Map.Entry<Address, Set<Object>> e : invalidations.entrySet()) {
+ InvalidateCommand ic = cf.buildInvalidateFromL1Command(e.getValue().toArray());
+ NotifyingNotifiableFuture f = new NotifyingFutureImpl(null);
+ rpcManager.invokeRemotelyInFuture(Collections.singletonList(e.getKey()), ic, true, f);
+ futures.add(f);
+ }
+
+ for (Future f : futures) f.get();
}
- protected ConsistentHash createConsistentHash(Collection<Address> addresses, Address... moreAddresses) throws Exception {
- List<Address> list = new LinkedList<Address>(addresses);
- list.addAll(Arrays.asList(moreAddresses));
- return createConsistentHash(list);
+ protected Collection<Address> getInvalidHolders(Object key, ConsistentHash chOld, ConsistentHash chNew) {
+ List<Address> oldOwners = chOld.locate(key, configuration.getNumOwners());
+ List<Address> newOwners = chNew.locate(key, configuration.getNumOwners());
+
+ List<Address> toInvalidate = new LinkedList<Address>(oldOwners);
+ toInvalidate.removeAll(newOwners);
+
+ return toInvalidate;
}
}
Deleted: trunk/core/src/main/java/org/infinispan/distribution/SendLeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/SendLeaveTask.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/SendLeaveTask.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -1,16 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.remoting.rpc.RpcManager;
-
-/**
- * // TODO: Manik: Document this
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class SendLeaveTask extends LeaveTask {
- protected SendLeaveTask(DistributionManagerImpl dmi, RpcManager rpcManager, Configuration configuration) {
- super(dmi, rpcManager, configuration);
- }
-}
Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLogger.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -56,4 +56,6 @@
boolean logIfNeeded(Collection<WriteCommand> commands);
int size();
+
+ boolean isEnabled();
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/TransactionLoggerImpl.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -72,4 +72,8 @@
public int size() {
return enabled ? 0 : commandQueue.size();
}
+
+ public boolean isEnabled() {
+ return enabled;
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -9,7 +9,6 @@
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
@@ -34,11 +33,11 @@
this.newCH = newCH;
}
- public void setCaches(Collection<Address> caches) {
+ public void setCaches(List<Address> caches) {
// no op
}
- public Collection<Address> getCaches() {
+ public List<Address> getCaches() {
return Collections.emptyList();
}
@@ -49,6 +48,14 @@
return Immutables.immutableListConvert(addresses);
}
+ public int getDistance(Address a1, Address a2) {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
+ public boolean isAdjacent(Address a1, Address a2) {
+ throw new UnsupportedOperationException("Unsupported!");
+ }
+
public ConsistentHash getNewConsistentHash() {
return newCH;
}
@@ -69,8 +76,4 @@
return new UnionConsistentHash((ConsistentHash) input.readObject(), (ConsistentHash) input.readObject());
}
}
-
- public boolean isInSameSubspace(Address a1, Address a2) {
- throw new UnsupportedOperationException("Not supported by this impl");
- }
}
Modified: trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/transaction/tm/DummyTransaction.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -74,7 +74,7 @@
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, SystemException {
try {
if (!notifyBeforeCompletion()) {
- log.trace("Not running 2PC as Synchronization.before not successfull");
+ log.trace("Not running 2PC as Synchronization.before not successful");
return;
}
@@ -310,7 +310,7 @@
try {
res.rollback(xid);
} catch (XAException e) {
- log.warn("Error while rolling back",e);
+ log.warn("Error while rolling back", e);
}
}
}
@@ -322,7 +322,7 @@
try {
res.commit(xid, false);//todo we only support one phase commit for now, change this!!!
} catch (XAException e) {
- log.warn("exception while committing",e);
+ log.warn("exception while committing", e);
throw new HeuristicMixedException(e.getMessage());
}
}
Modified: trunk/core/src/main/java/org/infinispan/util/Util.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/Util.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/main/java/org/infinispan/util/Util.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -55,7 +55,7 @@
}
@SuppressWarnings("unchecked")
- public static <T> T getInstance(Class<T> clazz) throws Exception {
+ public static <T> T getInstance(Class<T> clazz) throws IllegalAccessException, InstantiationException {
// first look for a getInstance() constructor
T instance;
try {
@@ -70,7 +70,7 @@
}
@SuppressWarnings("unchecked")
- public static Object getInstance(String classname) throws Exception {
+ public static Object getInstance(String classname) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
if (classname == null) throw new IllegalArgumentException("Cannot load null class!");
Class clazz = loadClass(classname);
return getInstance(clazz);
Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -290,7 +290,7 @@
* A special type of key that if passed a cache in its constructor, will ensure it will always be assigned to that
* cache (plus however many additional caches in the hash space)
*/
- protected static class MagicKey implements Serializable {
+ public static class MagicKey implements Serializable {
String name = null;
int hashcode;
String address;
Modified: trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -74,6 +74,27 @@
assert locations.get(k).size() == 3;
}
}
+
+ public void testDistances() {
+ Address a1 = new TestAddress(1);
+ Address a2 = new TestAddress(2);
+ Address a3 = new TestAddress(3);
+ Address a4 = new TestAddress(4);
+
+ ConsistentHash ch = new DefaultConsistentHash();
+ ch.setCaches(Arrays.asList(a1, a2, a3, a4));
+
+ assert ch.getDistance(a1, a1) == 0;
+ assert ch.getDistance(a1, a4) == 3;
+ assert ch.getDistance(a1, a3) == 2;
+ assert ch.getDistance(a3, a1) == 2;
+ assert ch.getDistance(a1, a2) == 1;
+ assert ch.getDistance(a2, a1) == 3;
+
+ assert ch.isAdjacent(a1, a2);
+ assert !ch.isAdjacent(a1, a3);
+ assert ch.isAdjacent(a1, a4);
+ }
}
class TestAddress implements Address {
Added: trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,46 @@
+package org.infinispan.distribution;
+
+import org.easymock.EasyMock;
+import org.infinispan.remoting.transport.Address;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+/**
+ * Tests helper functions on the DistManager
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+ at Test(groups = "unit", testName = "distribution.DistributionManagerUnitTest")
+public class DistributionManagerUnitTest {
+ DistributionManagerImpl dmi = new DistributionManagerImpl();
+
+ public void testDeterminingLeaversAndJoiners() {
+ Address a1 = EasyMock.createNiceMock(Address.class);
+ Address a2 = EasyMock.createNiceMock(Address.class);
+ Address a3 = EasyMock.createNiceMock(Address.class);
+ Address a4 = EasyMock.createNiceMock(Address.class);
+ Address a5 = EasyMock.createNiceMock(Address.class);
+
+ Address newAddress = dmi.diff(Arrays.asList(a1, a2, a3, a4),
+ Arrays.asList(a1, a2, a3, a4, a5));
+
+ assert newAddress == a5 : "Expecting " + a5 + " but was " + newAddress;
+
+ newAddress = dmi.diff(Arrays.asList(a1, a2, a3, a4),
+ Arrays.asList(a5, a4, a3, a2, a1));
+
+ assert newAddress == a5 : "Expecting " + a5 + " but was " + newAddress;
+
+ newAddress = dmi.diff(Arrays.asList(a1, a2, a3, a4, a5),
+ Arrays.asList(a1, a2, a3, a4));
+
+ assert newAddress == a5 : "Expecting " + a5 + " but was " + newAddress;
+
+ newAddress = dmi.diff(Arrays.asList(a5, a4, a3, a2, a1),
+ Arrays.asList(a1, a2, a3, a4));
+
+ assert newAddress == a5 : "Expecting " + a5 + " but was " + newAddress;
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/DistributionManagerUnitTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: trunk/core/src/test/java/org/infinispan/distribution/RehashJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/RehashJoinTest.java 2009-08-20 14:11:12 UTC (rev 708)
+++ trunk/core/src/test/java/org/infinispan/distribution/RehashJoinTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -1,180 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.Cache;
-import org.infinispan.manager.CacheManager;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import static java.util.concurrent.TimeUnit.SECONDS;
-
- at Test(testName = "distribution.RehashJoinTest", groups = "functional")
-public class RehashJoinTest extends BaseDistFunctionalTest {
-
- Log log = LogFactory.getLog(RehashJoinTest.class);
- CacheManager joinerManager;
-
- public RehashJoinTest() {
- cleanup = CleanupPhase.AFTER_METHOD;
- }
-
- @Override
- protected void createCacheManagers() throws Throwable {
- super.createCacheManagers();
- joinerManager = addClusterEnabledCacheManager();
- joinerManager.defineConfiguration(cacheName, configuration);
- }
-
- public void testRehashOnJoin() {
- MagicKey k1 = new MagicKey(c1, "k1");
- MagicKey k2 = new MagicKey(c2, "k2");
- MagicKey k3 = new MagicKey(c3, "k3");
- MagicKey k4 = new MagicKey(c4, "k4");
-
- List<MagicKey> keys = new ArrayList<MagicKey>(4);
- keys.add(k1);
- keys.add(k2);
- keys.add(k3);
- keys.add(k4);
-
- int i = 0;
- for (Cache<Object, String> c : caches) c.put(keys.get(i++), "v" + i);
-
- i = 0;
- for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
-
- log.info("***>>> Firing up new joiner!");
-
- // now fire up a new joiner
- Cache<Object, String> joiner = joinerManager.getCache(cacheName);
-
- // need to wait for the joiner to, well, join.
- TestingUtil.blockUntilViewsReceived(SECONDS.toMillis(480), cacheManagers.toArray(new CacheManager[cacheManagers.size()]));
-
- // need to block until this join has completed!
- waitForJoinTasksToComplete(SECONDS.toMillis(480), joiner);
-
- // where does the joiner sit in relation to the other caches?
- int joinerPos = locateJoiner(joinerManager.getAddress());
-
- log.info("***>>> Joiner is in position " + joinerPos);
-
- caches.add(joinerPos, joiner);
- i = 0;
- for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
-
- assertProperConsistentHashOnAllCaches();
- }
-
- public void testMultipleJoiners() throws InterruptedException {
-
- // have all JOIN phases completed?
- for (Cache c : caches) {
- DistributionManagerImpl dmi = (DistributionManagerImpl) getDistributionManager(c);
- assert !dmi.rehashInProgress : "Cache " + addressOf(c) + " still has rehashInProgress=true!";
- assert dmi.rehashQueue.isEmpty() : "Cache " + addressOf(c) + " still has queued RehashTasks!";
- assert !(dmi.getConsistentHash() instanceof UnionConsistentHash) : "Cache " + addressOf(c) + " still using a UnionConsistentHash!";
- }
-
- MagicKey k1 = new MagicKey(c1, "k1");
- MagicKey k2 = new MagicKey(c2, "k2");
- MagicKey k3 = new MagicKey(c3, "k3");
- MagicKey k4 = new MagicKey(c4, "k4");
-
- List<MagicKey> keys = new ArrayList<MagicKey>(4);
- keys.add(k1);
- keys.add(k2);
- keys.add(k3);
- keys.add(k4);
-
- int i = 0;
- for (Cache<Object, String> c : caches) c.put(keys.get(i++), "v" + i);
-
- i = 0;
- for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
-
- int numNewJoiners = 3; // in addition to the 1 joiner defined in createCacheManagers()!
- int numJoiners = numNewJoiners + 1;
- final CacheManager[] joinerManagers = new CacheManager[numJoiners];
- final Cache[] joiners = new Cache[numJoiners];
- joinerManagers[0] = joinerManager;
-
- for (i = 1; i < numJoiners; i++) {
- joinerManagers[i] = addClusterEnabledCacheManager();
- joinerManagers[i].defineConfiguration(cacheName, configuration);
- }
-
- log.info("***>>> Firing up {0} new joiners!", numJoiners);
-
- // now fire up a new joiners, in separate threads.
- final CountDownLatch joinLatch = new CountDownLatch(1);
- Thread[] threads = new Thread[numJoiners];
-
- for (i = 0; i < numJoiners; i++) {
- final int idx = i;
- threads[idx] = new Thread() {
- public void run() {
- try {
- joinLatch.await();
- } catch (InterruptedException e) {
- log.error(e);
- }
- joiners[idx] = joinerManagers[idx].getCache(cacheName);
- }
- };
- threads[idx].start();
- }
-
- joinLatch.countDown();
-
- for (Thread t : threads) t.join();
-
- CacheManager[] cacheManagerArray = cacheManagers.toArray(new CacheManager[cacheManagers.size()]);
- log.info("Number of cache manager views to wait for: {0}", cacheManagerArray.length);
-
- // need to wait for the joiner to, well, join.
- TestingUtil.blockUntilViewsReceived(SECONDS.toMillis(480), cacheManagerArray);
- // where do the joiners sit in relation to the other caches?
-
-
- waitForJoinTasksToComplete(SECONDS.toMillis(480), joiners);
-
- // need to wait a *short while* before we attempt to locate joiners, since post-join invalidation messages are sent async.
- // TODO replace this with some form of command detection on remote nodes.
- // join tasks happen sequentially as well, so this needs some time to finish
- TestingUtil.sleepThread(SECONDS.toMillis(2));
-
- int[] joinersPos = new int[numJoiners];
- for (i = 0; i < numJoiners; i++) joinersPos[i] = locateJoiner(joinerManagers[i].getAddress());
-
- log.info("***>>> Joiners are in positions " + Arrays.toString(joinersPos));
- for (i = 0; i < numJoiners; i++) {
- if (joinersPos[i] > caches.size())
- caches.add(joiners[i]);
- else
- caches.add(joinersPos[i], joiners[i]);
- }
- i = 0;
- for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
-
- assertProperConsistentHashOnAllCaches();
- }
-
- private int locateJoiner(Address joinerAddress) {
- for (Cache c : Arrays.asList(c1, c2, c3, c4)) {
- DefaultConsistentHash dch = getDefaultConsistentHash(c, SECONDS.toMillis(480));
- int i = 0;
- for (Address a : dch.positions.values()) {
- if (a.equals(joinerAddress)) return i;
- i++;
- }
- }
- throw new RuntimeException("Cannot locate joiner! Joiner is [" + joinerAddress + "]");
- }
-}
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "distribution.rehash.ConcurrentJoinAndCoordDeathTest", enabled = false)
+public class ConcurrentJoinAndCoordDeathTest {
+ public void implementMe() {
+ assert false : "Implement me!";
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndCoordDeathTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "distribution.rehash.ConcurrentJoinAndOtherSenderDeathTest", enabled = false)
+public class ConcurrentJoinAndOtherSenderDeathTest {
+ public void implementMe() {
+ assert false : "Implement me!";
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinAndOtherSenderDeathTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,48 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+ at Test(groups = "functional", testName = "distribution.rehash.ConcurrentJoinTest")
+public class ConcurrentJoinTest extends RehashTestBase {
+
+ List<CacheManager> joinerManagers;
+ List<Cache<Object, String>> joiners;
+
+ final int numJoiners = 4;
+
+ void performRehashEvent() {
+ joinerManagers = new ArrayList<CacheManager>(numJoiners);
+ joiners = new ArrayList<Cache<Object, String>>(numJoiners);
+ for (int i = 0; i < numJoiners; i++) {
+ CacheManager joinerManager = addClusterEnabledCacheManager();
+ joinerManager.defineConfiguration(cacheName, configuration);
+ Cache<Object, String> joiner = joinerManager.getCache(cacheName);
+ joinerManagers.add(joinerManager);
+ joiners.add(joiner);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ void waitForRehashCompletion() {
+ waitForJoinTasksToComplete(SECONDS.toMillis(480), joiners.toArray(new Cache[numJoiners]));
+ TestingUtil.sleepThread(SECONDS.toMillis(2));
+ int[] joinersPos = new int[numJoiners];
+ for (int i = 0; i < numJoiners; i++) joinersPos[i] = locateJoiner(joinerManagers.get(i).getAddress());
+
+ log.info("***>>> Joiners are in positions " + Arrays.toString(joinersPos));
+ for (int i = 0; i < numJoiners; i++) {
+ if (joinersPos[i] > caches.size())
+ caches.add(joiners.get(i));
+ else
+ caches.add(joinersPos[i], joiners.get(i));
+ }
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentJoinTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "distribution.rehash.ConcurrentLeaveAndJoinNonOverlappingTest", enabled = false)
+public class ConcurrentLeaveAndJoinNonOverlappingTest {
+ public void implementMe() {
+ assert false : "Implement me!";
+ }
+}
\ No newline at end of file
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinNonOverlappingTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "distribution.rehash.ConcurrentLeaveAndJoinOverlappingTest", enabled = false)
+public class ConcurrentLeaveAndJoinOverlappingTest {
+ public void implementMe() {
+ assert false : "Implement me!";
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentLeaveAndJoinOverlappingTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,26 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+ at Test(groups = "functional", testName = "distribution.rehash.ConcurrentNonOverlappingLeaveTest")
+public class ConcurrentNonOverlappingLeaveTest extends RehashLeaveTestBase {
+ Address l1, l2;
+
+ void performRehashEvent() {
+ l1 = addressOf(c2);
+ l2 = addressOf(c4);
+
+ CacheManager cm2 = c2.getCacheManager();
+ CacheManager cm4 = c4.getCacheManager();
+
+ cacheManagers.removeAll(Arrays.asList(cm2, cm4));
+ caches.removeAll(Arrays.asList(c2, c4));
+
+ TestingUtil.killCacheManagers(cm2, cm4);
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentNonOverlappingLeaveTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,26 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+
+ at Test(groups = "functional", testName = "distribution.rehash.ConcurrentOverlappingLeaveTest")
+public class ConcurrentOverlappingLeaveTest extends RehashLeaveTestBase {
+ Address l1, l2;
+
+ void performRehashEvent() {
+ l1 = addressOf(c3);
+ l2 = addressOf(c4);
+
+ CacheManager cm3 = c3.getCacheManager();
+ CacheManager cm4 = c4.getCacheManager();
+
+ cacheManagers.removeAll(Arrays.asList(cm3, cm4));
+ caches.removeAll(Arrays.asList(c3, c4));
+
+ TestingUtil.killCacheManagers(cm3, cm4);
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/ConcurrentOverlappingLeaveTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "distribution.rehash.JoinAndCoordDeathTest", enabled = false)
+public class JoinAndCoordDeathTest {
+ public void implementMe() {
+ assert false : "Implement me!";
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndCoordDeathTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,10 @@
+package org.infinispan.distribution.rehash;
+
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "distribution.rehash.JoinAndOtherSenderDeathTest", enabled = false)
+public class JoinAndOtherSenderDeathTest {
+ public void implementMe() {
+ assert false : "Implement me!";
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/JoinAndOtherSenderDeathTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,31 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+ at Test(groups = "functional", testName = "distribution.rehash.RehashLeaveTestBase")
+public abstract class RehashLeaveTestBase extends RehashTestBase {
+ void waitForRehashCompletion() {
+ long giveupTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60 * 5);
+
+
+ for (Cache c : caches) {
+ DistributionManager distributionManager = getDistributionManager(c);
+
+ while (distributionManager.isRehashInProgress() && System.currentTimeMillis() < giveupTime) {
+ TestingUtil.sleepThread(250);
+ }
+
+ if (distributionManager.isRehashInProgress())
+ throw new RuntimeException("Timed out waiting for rehash to complete on cache " + addressOf(c));
+ }
+
+ // ensure transaction logging is disabled everywhere
+ for (Cache c : caches)
+ assert !getDistributionManager(c).getTransactionLogger().isEnabled() : "Transaction logging for cache " + addressOf(c) + " is still enabled!!";
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashLeaveTestBase.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,224 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.distribution.DefaultConsistentHash;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * A base test for all rehashing tests
+ */
+ at Test(groups = "functional", testName = "distribution.rehash.RehashTestBase")
+public abstract class RehashTestBase extends BaseDistFunctionalTest {
+
+ protected RehashTestBase() {
+ cleanup = CleanupPhase.AFTER_METHOD;
+ tx = true;
+ }
+
+ // this setup has 4 running caches: {c1, c2, c3, c4}
+
+ /**
+ * This is overridden by subclasses. Could typically be a JOIN or LEAVE event.
+ */
+ abstract void performRehashEvent();
+
+ /**
+ * Blocks until a rehash completes.
+ */
+ abstract void waitForRehashCompletion();
+
+ protected int locateJoiner(Address joinerAddress) {
+ for (Cache c : Arrays.asList(c1, c2, c3, c4)) {
+ DefaultConsistentHash dch = getDefaultConsistentHash(c, SECONDS.toMillis(480));
+ int i = 0;
+ for (Address a : dch.getCaches()) {
+ if (a.equals(joinerAddress)) return i;
+ i++;
+ }
+ }
+ throw new RuntimeException("Cannot locate joiner! Joiner is [" + joinerAddress + "]");
+ }
+
+
+ private List<MagicKey> init() {
+ List<MagicKey> keys = new ArrayList<MagicKey>(Arrays.asList(
+ new MagicKey(c1, "k1"), new MagicKey(c2, "k2"),
+ new MagicKey(c3, "k3"), new MagicKey(c4, "k4")
+ ));
+
+ int i = 0;
+ for (Cache<Object, String> c : caches) c.put(keys.get(i++), "v" + i);
+
+ i = 0;
+ for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
+
+ log.info("Initialized with keys {0}", keys);
+ return keys;
+ }
+
+ /**
+ * Simple test. Put some state, trigger event, test results
+ */
+ public void testNonTransactional() {
+ List<MagicKey> keys = init();
+
+ log.info("Invoking rehash event");
+ performRehashEvent();
+
+ waitForRehashCompletion();
+ log.info("Rehash complete");
+
+ int i = 0;
+ for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + ++i);
+
+ assertProperConsistentHashOnAllCaches();
+ }
+
+
+ /**
+ * More complex - init some state. Start a new transaction, and midway trigger a rehash. Then complete transaction
+ * and test results.
+ */
+ @Test(enabled = false, description = "Enable after releasing Beta1")
+ public void testTransactional() throws Exception {
+ final List<MagicKey> keys = init();
+ final CountDownLatch l = new CountDownLatch(1);
+
+ Thread th = new Thread("Updater") {
+ @Override
+ public void run() {
+ try {
+ // start a transaction on c1.
+ TransactionManager t1 = TestingUtil.getTransactionManager(c1);
+ t1.begin();
+ c1.put(keys.get(0), "transactionally_replaced");
+ Transaction tx = t1.getTransaction();
+ tx.enlistResource(new XAResourceAdapter() {
+ public int prepare(Xid id) {
+ // this would be called *after* the cache prepares.
+ try {
+ l.await();
+ } catch (InterruptedException e) {
+ }
+ return XAResource.XA_OK;
+ }
+ });
+ t1.commit();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ th.start();
+
+ log.info("Invoking rehash event");
+ performRehashEvent();
+ l.countDown();
+ th.join();
+
+ log.info("Rehash complete");
+
+ assertOnAllCachesAndOwnership(keys.get(0), "transactionally_replaced");
+ assertOnAllCachesAndOwnership(keys.get(1), "v" + 2);
+ assertOnAllCachesAndOwnership(keys.get(2), "v" + 3);
+ assertOnAllCachesAndOwnership(keys.get(3), "v" + 4);
+
+ assertProperConsistentHashOnAllCaches();
+ }
+
+ /**
+ * A stress test. One node is constantly modified while a rehash occurs.
+ */
+ @Test(enabled = false, description = "Enable after releasing Beta1")
+ public void testNonTransactionalStress() throws Exception {
+ stressTest(false);
+ }
+
+ /**
+ * A stress test. One node is constantly modified using transactions while a rehash occurs.
+ */
+ @Test(enabled = false, description = "Enable after releasing Beta1")
+ public void testTransactionalStress() throws Exception {
+ stressTest(true);
+ }
+
+ private void stressTest(boolean tx) throws Exception {
+ final List<MagicKey> keys = init();
+ final CountDownLatch latch = new CountDownLatch(1);
+ List<Updater> updaters = new ArrayList<Updater>(keys.size());
+ for (MagicKey k : keys) {
+ Updater u = new Updater(c1, k, latch, tx);
+ u.start();
+ updaters.add(u);
+ }
+
+ latch.countDown();
+
+ log.info("Invoking rehash event");
+ performRehashEvent();
+
+ for (Updater u : updaters) u.complete();
+ for (Updater u : updaters) u.join();
+
+ waitForRehashCompletion();
+
+ log.info("Rehash complete");
+
+ int i = 0;
+ for (MagicKey key : keys) assertOnAllCachesAndOwnership(key, "v" + updaters.get(i++).currentValue);
+
+ assertProperConsistentHashOnAllCaches();
+ }
+}
+
+class Updater extends Thread {
+ static final Random r = new Random();
+ volatile int currentValue = 0;
+ BaseDistFunctionalTest.MagicKey key;
+ Cache cache;
+ CountDownLatch latch;
+ volatile boolean running = true;
+ TransactionManager tm;
+
+ Updater(Cache cache, BaseDistFunctionalTest.MagicKey key, CountDownLatch latch, boolean tx) {
+ super("Updater-" + key);
+ this.key = key;
+ this.cache = cache;
+ this.latch = latch;
+ if (tx) tm = TestingUtil.getTransactionManager(cache);
+ }
+
+ public void complete() {
+ running = false;
+ }
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ currentValue++;
+ if (tm != null) tm.begin();
+ cache.put(key, "v" + currentValue);
+ if (tm != null) tm.commit();
+ TestingUtil.sleepThread(r.nextInt(10) * 10);
+ } catch (Exception e) {
+ // do nothing?
+ }
+ }
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashTestBase.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,31 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.Cache;
+import org.infinispan.manager.CacheManager;
+import org.testng.annotations.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+ at Test(groups = "functional", testName = "distribution.rehash.SingleJoinTest")
+public class SingleJoinTest extends RehashTestBase {
+ CacheManager joinerManager;
+ Cache<Object, String> joiner;
+
+ void performRehashEvent() {
+ joinerManager = addClusterEnabledCacheManager();
+ joinerManager.defineConfiguration(cacheName, configuration);
+ joiner = joinerManager.getCache(cacheName);
+ }
+
+ void waitForRehashCompletion() {
+ // need to block until this join has completed!
+ waitForJoinTasksToComplete(SECONDS.toMillis(480), joiner);
+
+ // where does the joiner sit in relation to the other caches?
+ int joinerPos = locateJoiner(joinerManager.getAddress());
+
+ log.info("***>>> Joiner is in position " + joinerPos);
+
+ caches.add(joinerPos, joiner);
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleJoinTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,20 @@
+package org.infinispan.distribution.rehash;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "distribution.rehash.SingleLeaveTest")
+public class SingleLeaveTest extends RehashLeaveTestBase {
+ Address leaverAddress;
+
+ void performRehashEvent() {
+ // cause a node to LEAVE. Typically this is c4.
+ leaverAddress = addressOf(c4);
+ CacheManager cm4 = c4.getCacheManager();
+ cacheManagers.remove(cm4);
+ caches.remove(c4);
+ TestingUtil.killCacheManagers(cm4);
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/SingleLeaveTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java 2009-08-20 22:27:13 UTC (rev 709)
@@ -0,0 +1,53 @@
+package org.infinispan.distribution.rehash;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+/**
+ * abstract class that needs to be overridden
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public abstract class XAResourceAdapter implements XAResource {
+ public void commit(Xid xid, boolean b) throws XAException {
+ // no-op
+ }
+
+ public void end(Xid xid, int i) throws XAException {
+ // no-op
+ }
+
+ public void forget(Xid xid) throws XAException {
+ // no-op
+ }
+
+ public int getTransactionTimeout() throws XAException {
+ return 0;
+ }
+
+ public boolean isSameRM(XAResource xaResource) throws XAException {
+ return false;
+ }
+
+ public int prepare(Xid xid) throws XAException {
+ return 0;
+ }
+
+ public Xid[] recover(int i) throws XAException {
+ return new Xid[0];
+ }
+
+ public void rollback(Xid xid) throws XAException {
+ // no-op
+ }
+
+ public boolean setTransactionTimeout(int i) throws XAException {
+ return false;
+ }
+
+ public void start(Xid xid, int i) throws XAException {
+ // no-op
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/distribution/rehash/XAResourceAdapter.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list