[infinispan-commits] Infinispan SVN: r207 - in trunk/core/src: main/java/org/infinispan/distribution and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Sat May 2 06:18:28 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-05-02 06:18:28 -0400 (Sat, 02 May 2009)
New Revision: 207
Modified:
trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
Log:
More efficient and scalable consistent hash algo - performance no longer proportional to log(cluster size). New algo also revealed some shortcomings in the tests that were corrected.
Modified: trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java 2009-05-01 21:31:32 UTC (rev 206)
+++ trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java 2009-05-02 10:18:28 UTC (rev 207)
@@ -10,19 +10,19 @@
import java.util.concurrent.locks.ReentrantLock;
/**
- * A container that maintains order of entries based on when they were placed in the container. Iterators obtained
- * from this container maintain this order.
- * <p />
+ * A container that maintains order of entries based on when they were placed in the container. Iterators obtained from
+ * this container maintain this order.
+ * <p/>
* This container offers constant-time operation for all public API methods.
- * <p />
- * This is implemented using a set of lockable segments, each of which is a hash table, not unlike the JDK's
- * {@link java.util.concurrent.ConcurrentHashMap} with the exception that each entry is also linked.
- * <p />
- * Links are maintained using techniques inspired by H. Sundell and P. Tsigas' 2008 paper,
- * <a href="http://www.md.chalmers.se/~tsigas/papers/Lock-Free-Deques-Doubly-Lists-JPDC.pdf"><i>Lock Free Deques and Doubly Linked Lists</i></a>,
- * M. Michael's 2002 paper, <a href="http://www.research.ibm.com/people/m/michael/spaa-2002.pdf"><i>High Performance Dynamic Lock-Free Hash Tables and List-Based Sets</i></a>,
- * and Java6's ConcurrentSkipListMap.
- * <p />
+ * <p/>
+ * This is implemented using a set of lockable segments, each of which is a hash table, not unlike the JDK's {@link
+ * java.util.concurrent.ConcurrentHashMap} with the exception that each entry is also linked.
+ * <p/>
+ * Links are maintained using techniques inspired by H. Sundell and P. Tsigas' 2008 paper, <a
+ * href="http://www.md.chalmers.se/~tsigas/papers/Lock-Free-Deques-Doubly-Lists-JPDC.pdf"><i>Lock Free Deques and Doubly
+ * Linked Lists</i></a>, M. Michael's 2002 paper, <a href="http://www.research.ibm.com/people/m/michael/spaa-2002.pdf"><i>High
+ * Performance Dynamic Lock-Free Hash Tables and List-Based Sets</i></a>, and Java6's ConcurrentSkipListMap.
+ * <p/>
*
* @author Manik Surtani
* @since 4.0
@@ -87,18 +87,13 @@
// links and link management
- static final class LinkedEntry {
+ static class LinkedEntry {
volatile InternalCacheEntry e;
volatile LinkedEntry n, p;
- private static final AtomicReferenceFieldUpdater<LinkedEntry, InternalCacheEntry> E_UPDATER = AtomicReferenceFieldUpdater.newUpdater(LinkedEntry.class, InternalCacheEntry.class, "e");
private static final AtomicReferenceFieldUpdater<LinkedEntry, LinkedEntry> N_UPDATER = AtomicReferenceFieldUpdater.newUpdater(LinkedEntry.class, LinkedEntry.class, "n");
private static final AtomicReferenceFieldUpdater<LinkedEntry, LinkedEntry> P_UPDATER = AtomicReferenceFieldUpdater.newUpdater(LinkedEntry.class, LinkedEntry.class, "p");
- final boolean casValue(InternalCacheEntry expected, InternalCacheEntry newValue) {
- return E_UPDATER.compareAndSet(this, expected, newValue);
- }
-
final boolean casNext(LinkedEntry expected, LinkedEntry newValue) {
return N_UPDATER.compareAndSet(this, expected, newValue);
}
@@ -114,8 +109,24 @@
final boolean isMarked() {
return e == null; // an impossible value unless deleted
}
+
+ final LinkedEntry getN() {
+ return n;
+ }
+
+ final LinkedEntry getP() {
+ return p;
+ }
}
+ static final class Marker extends LinkedEntry {
+ Marker(LinkedEntry actual) {
+ e = null;
+ n = actual;
+ p = actual;
+ }
+ }
+
/**
* Initializes links to an empty container
*/
@@ -127,8 +138,7 @@
}
protected final void unlink(LinkedEntry le) {
- le.p.casNext(le, le.n);
- le.n.casPrev(le, le.p);
+ if (le.p.casNext(le, le.n)) le.n.casPrev(le, le.p);
}
protected final void linkAtEnd(LinkedEntry le) {
@@ -440,7 +450,7 @@
current = n;
if (n == head || n == tail) throw new IndexOutOfBoundsException("Reached head or tail pointer!");
}
-
+
return current.e;
}
}
Modified: trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java 2009-05-01 21:31:32 UTC (rev 206)
+++ trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java 2009-05-02 10:18:28 UTC (rev 207)
@@ -1,60 +1,45 @@
package org.infinispan.distribution;
import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.Immutables;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
public class DefaultConsistentHash implements ConsistentHash {
- private SortedMap<Integer, Address> caches = new TreeMap<Integer, Address>();
- // must be > max number of nodes in a cluster. Assume no more than a million nodes in a cluster? :-)
- final static int HASH_SPACE = 1000000;
+ // make sure all threads see the current list
+ volatile ArrayList<Address> addresses;
-
public void setCaches(Collection<Address> caches) {
- this.caches.clear();
- // evenly distribute the caches across this space.
- int increaseFactor = HASH_SPACE / caches.size();
- int nextIndex = increaseFactor;
- for (Address a : caches) {
- this.caches.put(nextIndex, a);
- nextIndex += increaseFactor;
- }
+ addresses = new ArrayList<Address>(caches);
+
+ // this list won't grow.
+ addresses.trimToSize();
}
public List<Address> locate(Object key, int replicationCount) {
int hash = Math.abs(key.hashCode());
- int index = hash % HASH_SPACE;
+ int clusterSize = addresses.size();
+ int numCopiesToFind = Math.min(replicationCount, clusterSize);
- Set<Address> results = new LinkedHashSet<Address>();
+ List<Address> results = new ArrayList<Address>(numCopiesToFind);
- SortedMap<Integer, Address> tailmap = caches.tailMap(index);
- int count = 0;
+ int copyNumber = 0;
- for (Map.Entry<Integer, Address> entry : tailmap.entrySet()) {
- Address val = entry.getValue();
- results.add(val);
- if (++count >= replicationCount)
- break;
- }
+ while (results.size() < numCopiesToFind) {
+ // we mod the index the 2nd time to make sure the index starts again from the beginning when it reaches the end.
+ // e.g., in a cluster of 10 with 3 copies of data, and a key that maps to node index 9, the next 2 backups should
+ // be at indexes 0 and 1.
- if (count < replicationCount) {
- for (Map.Entry<Integer, Address> entry : caches.entrySet()) {
- Address val = entry.getValue();
- results.add(val);
- if (++count >= replicationCount)
- break;
- }
+ int index = ((hash % clusterSize) + copyNumber) % clusterSize;
+ Address candidate = addresses.get(index);
+ results.add(candidate);
+ copyNumber++;
}
- return Immutables.immutableListConvert(results);
+ return results;
}
public Map<Object, List<Address>> locateAll(Collection<Object> keys, int replCount) {
Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-05-01 21:31:32 UTC (rev 206)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java 2009-05-02 10:18:28 UTC (rev 207)
@@ -51,12 +51,16 @@
for (Cache<Object, String> c : caches) assert c.isEmpty();
c1.put("k1", "value");
- asyncWait("k1", PutKeyValueCommand.class, getNonOwners("k1"));
+ asyncWait("k1", PutKeyValueCommand.class, getNonOwnersExcludingSelf("k1", addressOf(c1)));
for (Cache<Object, String> c : caches)
- assert "value".equals(c.get("k1")) : "Failed on cache " + c.getCacheManager().getAddress();
+ assert "value".equals(c.get("k1")) : "Failed on cache " + addressOf(c);
assertOwnershipAndNonOwnership("k1");
}
+ protected static Address addressOf(Cache<?, ?> cache) {
+ return cache.getCacheManager().getAddress();
+ }
+
protected Cache<Object, String> getFirstNonOwner(String key) {
return getNonOwners(key)[0];
}
@@ -70,10 +74,10 @@
Object realVal = c.get(key);
if (value == null) {
assert realVal == null : "Expecting [" + key + "] to equal [" + value + "] on cache ["
- + c.getCacheManager().getAddress() + "] but was [" + realVal + "]";
+ + addressOf(c) + "] but was [" + realVal + "]";
} else {
assert value.equals(realVal) : "Expecting [" + key + "] to equal [" + value + "] on cache ["
- + c.getCacheManager().getAddress() + "] but was [" + realVal + "]";
+ + addressOf(c) + "] but was [" + realVal + "]";
}
}
if (value != null) assertOwnershipAndNonOwnership(key);
@@ -84,11 +88,11 @@
DataContainer dc = c.getAdvancedCache().getDataContainer();
if (isOwner(c, key)) {
InternalCacheEntry ice = dc.get(key);
- assert ice != null : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned null!";
- assert ice instanceof ImmortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
+ assert ice != null : "Fail on cache " + addressOf(c) + ": dc.get(" + key + ") returned null!";
+ assert ice instanceof ImmortalCacheEntry : "Fail on cache " + addressOf(c) + ": dc.get(" + key + ") returned " + dc.get(key);
} else {
if (dc.containsKey(key)) {
- assert dc.get(key) instanceof MortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
+ assert dc.get(key) instanceof MortalCacheEntry : "Fail on cache " + addressOf(c) + ": dc.get(" + key + ") returned " + dc.get(key);
assert dc.get(key).getLifespan() == c1.getConfiguration().getL1Lifespan();
}
}
@@ -98,28 +102,28 @@
protected void assertIsInL1(Cache<?, ?> cache, Object key) {
DataContainer dc = cache.getAdvancedCache().getDataContainer();
InternalCacheEntry ice = dc.get(key);
- assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + cache.getCacheManager().getAddress() + "]!";
- assert !(ice instanceof ImmortalCacheEntry) : "Entry for key [" + key + "] should have a lifespan on cache at [" + cache.getCacheManager().getAddress() + "]!";
+ assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + addressOf(cache) + "]!";
+ assert !(ice instanceof ImmortalCacheEntry) : "Entry for key [" + key + "] should have a lifespan on cache at [" + addressOf(cache) + "]!";
}
protected void assertIsNotInL1(Cache<?, ?> cache, Object key) {
DataContainer dc = cache.getAdvancedCache().getDataContainer();
InternalCacheEntry ice = dc.get(key);
- assert ice == null : "Entry for key [" + key + "] should not be in data container on cache at [" + cache.getCacheManager().getAddress() + "]!";
+ assert ice == null : "Entry for key [" + key + "] should not be in data container on cache at [" + addressOf(cache) + "]!";
}
protected void assertIsInContainerImmortal(Cache<?, ?> cache, Object key) {
DataContainer dc = cache.getAdvancedCache().getDataContainer();
InternalCacheEntry ice = dc.get(key);
- assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + cache.getCacheManager().getAddress() + "]!";
- assert ice instanceof ImmortalCacheEntry : "Entry for key [" + key + "] on cache at [" + cache.getCacheManager().getAddress() + "] should be immortal but was [" + ice + "]!";
+ assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + addressOf(cache) + "]!";
+ assert ice instanceof ImmortalCacheEntry : "Entry for key [" + key + "] on cache at [" + addressOf(cache) + "] should be immortal but was [" + ice + "]!";
}
protected static boolean isOwner(Cache<?, ?> c, Object key) {
DistributionManager dm = c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
List<Address> ownerAddresses = dm.locate(key);
for (Address a : ownerAddresses) {
- if (c.getCacheManager().getAddress().equals(a)) return true;
+ if (addressOf(c).equals(a)) return true;
}
return false;
}
@@ -127,7 +131,7 @@
protected static boolean isFirstOwner(Cache<?, ?> c, Object key) {
DistributionManager dm = c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
List<Address> ownerAddresses = dm.locate(key);
- return c.getCacheManager().getAddress().equals(ownerAddresses.get(0));
+ return addressOf(c).equals(ownerAddresses.get(0));
}
protected Cache<Object, String>[] getOwners(Object key) {
@@ -139,6 +143,28 @@
return owners;
}
+ protected Cache<Object, String>[] getNonOwnersExcludingSelf(Object key, Address self) {
+ Cache<Object, String>[] nonOwners = getNonOwners(key);
+ boolean selfInArray = false;
+ for (Cache<?, ?> c : nonOwners) {
+ if (addressOf(c).equals(self)) {
+ selfInArray = true;
+ break;
+ }
+ }
+
+ if (selfInArray) {
+ Cache<Object, String>[] nonOwnersExclSelf = new Cache[nonOwners.length - 1];
+ int i = 0;
+ for (Cache<Object, String> c : nonOwners) {
+ if (!addressOf(c).equals(self)) nonOwnersExclSelf[i++] = c;
+ }
+ return nonOwnersExclSelf;
+ } else {
+ return nonOwners;
+ }
+ }
+
protected Cache<Object, String>[] getNonOwners(Object key) {
Cache<Object, String>[] nonOwners = new Cache[2];
int i = 0;
@@ -182,9 +208,8 @@
String address;
public MagicKey(Cache<?, ?> toMapTo) {
- address = toMapTo.getCacheManager().getAddress().toString();
- // generate a hashcode that will always map it to the specified cache.
- for (int i = 1; i < DefaultConsistentHash.HASH_SPACE; i += 100) {
+ address = addressOf(toMapTo).toString();
+ for (int i = 0; i < toMapTo.getCacheManager().getMembers().size(); i++) {
// create a dummy object with this hashcode
final int hc = i;
Object dummy = new Object() {
Modified: trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java 2009-05-01 21:31:32 UTC (rev 206)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java 2009-05-02 10:18:28 UTC (rev 207)
@@ -48,7 +48,6 @@
public void testPutFromNonOwner() {
initAndTest();
Cache<Object, String> nonOwner = getFirstNonOwner("k1");
- System.out.println("Non-owner address is " + nonOwner.getCacheManager().getAddress());
Object retval = nonOwner.put("k1", "value2");
asyncWait("k1", PutKeyValueCommand.class, getSecondNonOwner("k1"));
More information about the infinispan-commits
mailing list