[infinispan-commits] Infinispan SVN: r207 - in trunk/core/src: main/java/org/infinispan/distribution and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Sat May 2 06:18:28 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-05-02 06:18:28 -0400 (Sat, 02 May 2009)
New Revision: 207

Modified:
   trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
   trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
   trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
Log:
More efficient and scalable consistent hash algorithm - performance is no longer proportional to log(cluster size).  The new algorithm also revealed some shortcomings in the tests, which have been corrected.

Modified: trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java	2009-05-01 21:31:32 UTC (rev 206)
+++ trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java	2009-05-02 10:18:28 UTC (rev 207)
@@ -10,19 +10,19 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * A container that maintains order of entries based on when they were placed in the container.  Iterators obtained
- * from this container maintain this order.
- * <p />
+ * A container that maintains order of entries based on when they were placed in the container.  Iterators obtained from
+ * this container maintain this order.
+ * <p/>
  * This container offers constant-time operation for all public API methods.
- * <p />
- * This is implemented using a set of lockable segments, each of which is a hash table, not unlike the JDK's
- * {@link java.util.concurrent.ConcurrentHashMap} with the exception that each entry is also linked.
- * <p />
- * Links are maintained using techniques inspired by H. Sundell and P. Tsigas' 2008 paper,
- * <a href="http://www.md.chalmers.se/~tsigas/papers/Lock-Free-Deques-Doubly-Lists-JPDC.pdf"><i>Lock Free Deques and Doubly Linked Lists</i></a>,
- * M. Michael's 2002 paper, <a href="http://www.research.ibm.com/people/m/michael/spaa-2002.pdf"><i>High Performance Dynamic Lock-Free Hash Tables and List-Based Sets</i></a>,
- * and Java6's ConcurrentSkipListMap.
- * <p />
+ * <p/>
+ * This is implemented using a set of lockable segments, each of which is a hash table, not unlike the JDK's {@link
+ * java.util.concurrent.ConcurrentHashMap} with the exception that each entry is also linked.
+ * <p/>
+ * Links are maintained using techniques inspired by H. Sundell and P. Tsigas' 2008 paper, <a
+ * href="http://www.md.chalmers.se/~tsigas/papers/Lock-Free-Deques-Doubly-Lists-JPDC.pdf"><i>Lock Free Deques and Doubly
+ * Linked Lists</i></a>, M. Michael's 2002 paper, <a href="http://www.research.ibm.com/people/m/michael/spaa-2002.pdf"><i>High
+ * Performance Dynamic Lock-Free Hash Tables and List-Based Sets</i></a>, and Java6's ConcurrentSkipListMap.
+ * <p/>
  *
  * @author Manik Surtani
  * @since 4.0
@@ -87,18 +87,13 @@
 
    // links and link management
 
-   static final class LinkedEntry {
+   static class LinkedEntry {
       volatile InternalCacheEntry e;
       volatile LinkedEntry n, p;
 
-      private static final AtomicReferenceFieldUpdater<LinkedEntry, InternalCacheEntry> E_UPDATER = AtomicReferenceFieldUpdater.newUpdater(LinkedEntry.class, InternalCacheEntry.class, "e");
       private static final AtomicReferenceFieldUpdater<LinkedEntry, LinkedEntry> N_UPDATER = AtomicReferenceFieldUpdater.newUpdater(LinkedEntry.class, LinkedEntry.class, "n");
       private static final AtomicReferenceFieldUpdater<LinkedEntry, LinkedEntry> P_UPDATER = AtomicReferenceFieldUpdater.newUpdater(LinkedEntry.class, LinkedEntry.class, "p");
 
-      final boolean casValue(InternalCacheEntry expected, InternalCacheEntry newValue) {
-         return E_UPDATER.compareAndSet(this, expected, newValue);
-      }
-
       final boolean casNext(LinkedEntry expected, LinkedEntry newValue) {
          return N_UPDATER.compareAndSet(this, expected, newValue);
       }
@@ -114,8 +109,24 @@
       final boolean isMarked() {
          return e == null; // an impossible value unless deleted
       }
+
+      final LinkedEntry getN() {
+         return n;
+      }
+
+      final LinkedEntry getP() {
+         return p;
+      }
    }
 
+   static final class Marker extends LinkedEntry {
+      Marker(LinkedEntry actual) {
+         e = null;
+         n = actual;
+         p = actual;
+      }
+   }
+
    /**
     * Initializes links to an empty container
     */
@@ -127,8 +138,7 @@
    }
 
    protected final void unlink(LinkedEntry le) {
-      le.p.casNext(le, le.n);
-      le.n.casPrev(le, le.p);
+      if (le.p.casNext(le, le.n)) le.n.casPrev(le, le.p);
    }
 
    protected final void linkAtEnd(LinkedEntry le) {
@@ -440,7 +450,7 @@
             current = n;
             if (n == head || n == tail) throw new IndexOutOfBoundsException("Reached head or tail pointer!");
          }
-         
+
          return current.e;
       }
    }

Modified: trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java	2009-05-01 21:31:32 UTC (rev 206)
+++ trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java	2009-05-02 10:18:28 UTC (rev 207)
@@ -1,60 +1,45 @@
 package org.infinispan.distribution;
 
 import org.infinispan.remoting.transport.Address;
-import org.infinispan.util.Immutables;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 
 public class DefaultConsistentHash implements ConsistentHash {
-   private SortedMap<Integer, Address> caches = new TreeMap<Integer, Address>();
-   // must be > max number of nodes in a cluster.  Assume no more than a million nodes in a cluster?  :-)
-   final static int HASH_SPACE = 1000000;
+   // make sure all threads see the current list
+   volatile ArrayList<Address> addresses;
 
-
    public void setCaches(Collection<Address> caches) {
-      this.caches.clear();
-      // evenly distribute the caches across this space.
-      int increaseFactor = HASH_SPACE / caches.size();
-      int nextIndex = increaseFactor;
-      for (Address a : caches) {
-         this.caches.put(nextIndex, a);
-         nextIndex += increaseFactor;
-      }
+      addresses = new ArrayList<Address>(caches);
+
+      // this list won't grow.
+      addresses.trimToSize();
    }
 
    public List<Address> locate(Object key, int replicationCount) {
       int hash = Math.abs(key.hashCode());
-      int index = hash % HASH_SPACE;
+      int clusterSize = addresses.size();
+      int numCopiesToFind = Math.min(replicationCount, clusterSize);
 
-      Set<Address> results = new LinkedHashSet<Address>();
+      List<Address> results = new ArrayList<Address>(numCopiesToFind);
 
-      SortedMap<Integer, Address> tailmap = caches.tailMap(index);
-      int count = 0;
+      int copyNumber = 0;
 
-      for (Map.Entry<Integer, Address> entry : tailmap.entrySet()) {
-         Address val = entry.getValue();
-         results.add(val);
-         if (++count >= replicationCount)
-            break;
-      }
+      while (results.size() < numCopiesToFind) {
+         // we mod the index the 2nd time to make sure the index starts again from the beginning when it reaches the end.
+         // e.g., in a cluster of 10 with 3 copies of data, and a key that maps to node index 9, the next 2 backups should
+         // be at indexes 0 and 1.
 
-      if (count < replicationCount) {
-         for (Map.Entry<Integer, Address> entry : caches.entrySet()) {
-            Address val = entry.getValue();
-            results.add(val);
-            if (++count >= replicationCount)
-               break;
-         }
+         int index = ((hash % clusterSize) + copyNumber) % clusterSize;
+         Address candidate = addresses.get(index);
+         results.add(candidate);
+         copyNumber++;
       }
 
-      return Immutables.immutableListConvert(results);
+      return results;
    }
 
    public Map<Object, List<Address>> locateAll(Collection<Object> keys, int replCount) {

Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-05-01 21:31:32 UTC (rev 206)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2009-05-02 10:18:28 UTC (rev 207)
@@ -51,12 +51,16 @@
       for (Cache<Object, String> c : caches) assert c.isEmpty();
 
       c1.put("k1", "value");
-      asyncWait("k1", PutKeyValueCommand.class, getNonOwners("k1"));
+      asyncWait("k1", PutKeyValueCommand.class, getNonOwnersExcludingSelf("k1", addressOf(c1)));
       for (Cache<Object, String> c : caches)
-         assert "value".equals(c.get("k1")) : "Failed on cache " + c.getCacheManager().getAddress();
+         assert "value".equals(c.get("k1")) : "Failed on cache " + addressOf(c);
       assertOwnershipAndNonOwnership("k1");
    }
 
+   protected static Address addressOf(Cache<?, ?> cache) {
+      return cache.getCacheManager().getAddress();
+   }
+
    protected Cache<Object, String> getFirstNonOwner(String key) {
       return getNonOwners(key)[0];
    }
@@ -70,10 +74,10 @@
          Object realVal = c.get(key);
          if (value == null) {
             assert realVal == null : "Expecting [" + key + "] to equal [" + value + "] on cache ["
-                  + c.getCacheManager().getAddress() + "] but was [" + realVal + "]";
+                  + addressOf(c) + "] but was [" + realVal + "]";
          } else {
             assert value.equals(realVal) : "Expecting [" + key + "] to equal [" + value + "] on cache ["
-                  + c.getCacheManager().getAddress() + "] but was [" + realVal + "]";
+                  + addressOf(c) + "] but was [" + realVal + "]";
          }
       }
       if (value != null) assertOwnershipAndNonOwnership(key);
@@ -84,11 +88,11 @@
          DataContainer dc = c.getAdvancedCache().getDataContainer();
          if (isOwner(c, key)) {
             InternalCacheEntry ice = dc.get(key);
-            assert ice != null : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned null!";
-            assert ice instanceof ImmortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
+            assert ice != null : "Fail on cache " + addressOf(c) + ": dc.get(" + key + ") returned null!";
+            assert ice instanceof ImmortalCacheEntry : "Fail on cache " + addressOf(c) + ": dc.get(" + key + ") returned " + dc.get(key);
          } else {
             if (dc.containsKey(key)) {
-               assert dc.get(key) instanceof MortalCacheEntry : "Fail on cache " + c.getCacheManager().getAddress() + ": dc.get(" + key + ") returned " + dc.get(key);
+               assert dc.get(key) instanceof MortalCacheEntry : "Fail on cache " + addressOf(c) + ": dc.get(" + key + ") returned " + dc.get(key);
                assert dc.get(key).getLifespan() == c1.getConfiguration().getL1Lifespan();
             }
          }
@@ -98,28 +102,28 @@
    protected void assertIsInL1(Cache<?, ?> cache, Object key) {
       DataContainer dc = cache.getAdvancedCache().getDataContainer();
       InternalCacheEntry ice = dc.get(key);
-      assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + cache.getCacheManager().getAddress() + "]!";
-      assert !(ice instanceof ImmortalCacheEntry) : "Entry for key [" + key + "] should have a lifespan on cache at [" + cache.getCacheManager().getAddress() + "]!";
+      assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + addressOf(cache) + "]!";
+      assert !(ice instanceof ImmortalCacheEntry) : "Entry for key [" + key + "] should have a lifespan on cache at [" + addressOf(cache) + "]!";
    }
 
    protected void assertIsNotInL1(Cache<?, ?> cache, Object key) {
       DataContainer dc = cache.getAdvancedCache().getDataContainer();
       InternalCacheEntry ice = dc.get(key);
-      assert ice == null : "Entry for key [" + key + "] should not be in data container on cache at [" + cache.getCacheManager().getAddress() + "]!";
+      assert ice == null : "Entry for key [" + key + "] should not be in data container on cache at [" + addressOf(cache) + "]!";
    }
 
    protected void assertIsInContainerImmortal(Cache<?, ?> cache, Object key) {
       DataContainer dc = cache.getAdvancedCache().getDataContainer();
       InternalCacheEntry ice = dc.get(key);
-      assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + cache.getCacheManager().getAddress() + "]!";
-      assert ice instanceof ImmortalCacheEntry : "Entry for key [" + key + "] on cache at [" + cache.getCacheManager().getAddress() + "] should be immortal but was [" + ice + "]!";
+      assert ice != null : "Entry for key [" + key + "] should be in data container on cache at [" + addressOf(cache) + "]!";
+      assert ice instanceof ImmortalCacheEntry : "Entry for key [" + key + "] on cache at [" + addressOf(cache) + "] should be immortal but was [" + ice + "]!";
    }
 
    protected static boolean isOwner(Cache<?, ?> c, Object key) {
       DistributionManager dm = c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
       List<Address> ownerAddresses = dm.locate(key);
       for (Address a : ownerAddresses) {
-         if (c.getCacheManager().getAddress().equals(a)) return true;
+         if (addressOf(c).equals(a)) return true;
       }
       return false;
    }
@@ -127,7 +131,7 @@
    protected static boolean isFirstOwner(Cache<?, ?> c, Object key) {
       DistributionManager dm = c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
       List<Address> ownerAddresses = dm.locate(key);
-      return c.getCacheManager().getAddress().equals(ownerAddresses.get(0));
+      return addressOf(c).equals(ownerAddresses.get(0));
    }
 
    protected Cache<Object, String>[] getOwners(Object key) {
@@ -139,6 +143,28 @@
       return owners;
    }
 
+   protected Cache<Object, String>[] getNonOwnersExcludingSelf(Object key, Address self) {
+      Cache<Object, String>[] nonOwners = getNonOwners(key);
+      boolean selfInArray = false;
+      for (Cache<?, ?> c : nonOwners) {
+         if (addressOf(c).equals(self)) {
+            selfInArray = true;
+            break;
+         }
+      }
+
+      if (selfInArray) {
+         Cache<Object, String>[] nonOwnersExclSelf = new Cache[nonOwners.length - 1];
+         int i = 0;
+         for (Cache<Object, String> c : nonOwners) {
+            if (!addressOf(c).equals(self)) nonOwnersExclSelf[i++] = c;
+         }
+         return nonOwnersExclSelf;
+      } else {
+         return nonOwners;
+      }
+   }
+
    protected Cache<Object, String>[] getNonOwners(Object key) {
       Cache<Object, String>[] nonOwners = new Cache[2];
       int i = 0;
@@ -182,9 +208,8 @@
       String address;
 
       public MagicKey(Cache<?, ?> toMapTo) {
-         address = toMapTo.getCacheManager().getAddress().toString();
-         // generate a hashcode that will always map it to the specified cache.
-         for (int i = 1; i < DefaultConsistentHash.HASH_SPACE; i += 100) {
+         address = addressOf(toMapTo).toString();
+         for (int i = 0; i < toMapTo.getCacheManager().getMembers().size(); i++) {
             // create a dummy object with this hashcode
             final int hc = i;
             Object dummy = new Object() {

Modified: trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java	2009-05-01 21:31:32 UTC (rev 206)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java	2009-05-02 10:18:28 UTC (rev 207)
@@ -48,7 +48,6 @@
    public void testPutFromNonOwner() {
       initAndTest();
       Cache<Object, String> nonOwner = getFirstNonOwner("k1");
-      System.out.println("Non-owner address is " + nonOwner.getCacheManager().getAddress());
 
       Object retval = nonOwner.put("k1", "value2");
       asyncWait("k1", PutKeyValueCommand.class, getSecondNonOwner("k1"));




More information about the infinispan-commits mailing list