[infinispan-commits] Infinispan SVN: r1225 - in trunk/core/src: test/java/org/infinispan/stress and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Nov 27 07:56:46 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-11-27 07:56:45 -0500 (Fri, 27 Nov 2009)
New Revision: 1225
Added:
trunk/core/src/main/java/org/infinispan/container/FIFOAMRDataContainer.java
trunk/core/src/main/java/org/infinispan/container/LRUAMRDataContainer.java
trunk/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java
Modified:
trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java
Log:
[ISPN-277] (LRU data container endlessly looping or exhibiting heavy contention)
Added: trunk/core/src/main/java/org/infinispan/container/FIFOAMRDataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/FIFOAMRDataContainer.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/container/FIFOAMRDataContainer.java 2009-11-27 12:56:45 UTC (rev 1225)
@@ -0,0 +1,874 @@
+package org.infinispan.container;
+
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalEntryFactory;
+import org.infinispan.util.Immutables;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.AbstractCollection;
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicMarkableReference;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantLock;
+
+import net.jcip.annotations.ThreadSafe;
+
+/**
+ * A container that maintains order of entries based on when they were placed in the container. Iterators obtained from
+ * this container maintain this order.
+ * <p/>
+ * This container offers constant-time operation for all public API methods.
+ * <p/>
+ * This is implemented using a set of lockable segments, each of which is a hash table, not unlike the JDK's {@link
+ * java.util.concurrent.ConcurrentHashMap} with the exception that each entry is also linked.
+ * <p/>
+ * Links are maintained using techniques inspired by H. Sundell and P. Tsigas' 2008 paper, <a
+ * href="http://www.md.chalmers.se/~tsigas/papers/Lock-Free-Deques-Doubly-Lists-JPDC.pdf"><i>Lock Free Deques and Doubly
+ * Linked Lists</i></a>, M. Michael's 2002 paper, <a href="http://www.research.ibm.com/people/m/michael/spaa-2002.pdf"><i>High
+ * Performance Dynamic Lock-Free Hash Tables and List-Based Sets</i></a>.
+ * <p />
+ * This implementation uses JDK {@link java.util.concurrent.atomic.AtomicMarkableReference}
+ * to implement reference deletion markers.
+ * <p/>
+ *
+ * @author Manik Surtani
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+@ThreadSafe
+public class FIFOAMRDataContainer implements DataContainer {
+
+ /*
+ This implementation closely follows the pseudocode in Sundell and Tsigas' paper (Referred to as STP) for managing
+ the lock-free, threadsafe doubly linked list. AtomicMarkableReferences are used to implement the pointers referred
+ to in the paper.
+ */
+
+ /**
+ * The maximum capacity, used if a higher value is implicitly specified by either of the constructors with arguments.
+ * MUST be a power of two <= 1<<30 to ensure that entries are indexable using ints.
+ */
+ static final int MAXIMUM_CAPACITY = 1 << 30;
+
+ // -- these fields are all very similar to JDK's ConcurrentHashMap
+
+ /**
+ * Mask value for indexing into segments. The upper bits of a key's hash code are used to choose the segment.
+ */
+ final int segmentMask;
+
+ /**
+ * Shift value for indexing within segments.
+ */
+ final int segmentShift;
+
+ /**
+ * The segments, each of which is a specialized hash table
+ */
+ final Segment[] segments;
+
+ Set<Object> keySet;
+
+ final LinkedEntry head = new LinkedEntry(null), tail = new LinkedEntry(null);
+
+ public FIFOAMRDataContainer(int concurrencyLevel) {
+ float loadFactor = 0.75f;
+ int initialCapacity = 256;
+
+ // Find power-of-two sizes best matching arguments
+ int sshift = 0;
+ int ssize = 1;
+ while (ssize < concurrencyLevel) {
+ ++sshift;
+ ssize <<= 1;
+ }
+ segmentShift = 32 - sshift;
+ segmentMask = ssize - 1;
+ this.segments = Segment.newArray(ssize);
+
+ if (initialCapacity > MAXIMUM_CAPACITY)
+ initialCapacity = MAXIMUM_CAPACITY;
+ int c = initialCapacity / ssize;
+ if (c * ssize < initialCapacity)
+ ++c;
+ int cap = 1;
+ while (cap < c)
+ cap <<= 1;
+
+ for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment(cap, loadFactor);
+ initLinks();
+ }
+
+ // links and link management
+
+ /**
+ * Back off
+ *
+ * @param nanos nanos to back off for. If -1, starts at a default
+ * @return next time, back off for these nanos
+ */
+ Random r = new Random();
+ private static final long backoffStart = 10000;
+
+ private long backoff(long nanos) {
+// long actualNanos = nanos < 0 ? backoffStart : nanos;
+// LockSupport.parkNanos(actualNanos);
+// long newNanos = actualNanos << 1;
+// return newNanos > 10000000 ? backoffStart : newNanos;
+ int millis = (1 + r.nextInt(9)) * 10;
+ LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(millis));
+ return -1;
+ }
+
+ /**
+ * Tests whether a given linked entry is marked for deletion. In this implementation, being "marked" means that it
+ * is of type Marker rather than LinkedEntry, but given the relative cost of an "instanceof" check, we prefer to test
+ * the state of the InternalCacheEntry referenced by the LinkedEntry. An InternalCacheEntry *always* exists so if it
+ * is null, then this is a marker (or possibly the head or tail dummy entry).
+ *
+ * @param e entry to test
+ * @return true if the entry is marked for removal. False if it is not, or if the entry is the head or tail dummy
+ * entry.
+ */
+// protected final boolean isMarkedForRemoval(LinkedEntry e) {
+// return e != head && e != tail && e.e == null;
+// }
+
+ /**
+ * Places a removal marker on the 'previous' reference of the given entry. Note that marking a reference does not mean
+ * that the reference pointed to is marked for removal, rather it means the LinkedEntry doing the referencing is the
+ * entry to be removed.
+ *
+ * @param e entry
+ * @return true if the marking was successful, false otherwise. Could return false if the reference is already
+ * marked, or if the CAS failed.
+ */
+// protected final boolean markPrevReference(LinkedEntry e) {
+// return !e.p.isMarked() && e.p.attemptMark(e.p.getReference(), true);
+// }
+
+ /**
+ * Places a removal marker on the 'next' reference of the given entry. Note that marking a reference does not mean that
+ * the reference pointed to is marked for removal, rather it means the LinkedEntry doing the referencing is the entry
+ * to be removed.
+ *
+ * @param e entry
+ * @return true if the marking was successful, false otherwise. Could return false if the reference is already
+ * marked, or if the CAS failed.
+ */
+// protected final boolean markNextReference(LinkedEntry e) {
+// return !e.n.isMarked() && e.n.attemptMark(e.n.getReference(), true);
+// }
+
+ /**
+ * The LinkedEntry class. This entry is stored in the lockable Segments, and is also capable of being doubly
+ * linked.
+ */
+ static class LinkedEntry {
+ volatile InternalCacheEntry e;
+ /**
+ * Links to next and previous entries. Needs to be volatile.
+ */
+// volatile LinkedEntry n, p;
+ AtomicMarkableReference<LinkedEntry> n = new AtomicMarkableReference<LinkedEntry>(null, false),
+ p = new AtomicMarkableReference<LinkedEntry>(null, false);
+
+ /**
+ * CAS updaters for prev and next references
+ */
+// private static final AtomicReferenceFieldUpdater<LinkedEntry, LinkedEntry> N_UPDATER = AtomicReferenceFieldUpdater.newUpdater(LinkedEntry.class, LinkedEntry.class, "n");
+// private static final AtomicReferenceFieldUpdater<LinkedEntry, LinkedEntry> P_UPDATER = AtomicReferenceFieldUpdater.newUpdater(LinkedEntry.class, LinkedEntry.class, "p");
+
+ /**
+ * LinkedEntries must always have a valid InternalCacheEntry.
+ *
+ * @param e internal cache entry
+ */
+ LinkedEntry(InternalCacheEntry e) {
+ this.e = e;
+ }
+
+// final boolean casNext(LinkedEntry expected, LinkedEntry newValue) {
+// return n.compareAndSet(expected, newValue, false, false);
+// }
+//
+// final boolean casPrev(LinkedEntry expected, LinkedEntry newValue) {
+// return p.compareAndSet(expected, newValue, false, false);
+// }
+//
+// @Override
+// public String toString() {
+// return "E" + Integer.toHexString(System.identityHashCode(this));
+// }
+ }
+
+ /**
+ * A marker. If a reference in LinkedEntry (either to its previous or next entry) needs to be marked, it should be
+ * CAS'd with an instance of Marker that points to the actual entry. Typically this is done by calling {@link
+ * FIFOAMRDataContainer#markNextReference(FIFOAMRDataContainer.LinkedEntry)} or {@link
+ * FIFOAMRDataContainer#markPrevReference(FIFOAMRDataContainer.LinkedEntry)}
+ */
+// static final class Marker extends LinkedEntry {
+// Marker(LinkedEntry actual) {
+// super(null);
+// n = actual;
+// p = actual;
+// }
+//
+// @Override
+// public String toString() {
+// return "M" + Integer.toHexString(System.identityHashCode(this));
+// }
+// }
+
+ /**
+ * Initializes links to an empty container
+ */
+ protected final void initLinks() {
+ head.n.set(tail, false);
+ head.p.set(tail, false);
+ tail.n.set(head, false);
+ tail.p.set(head, false);
+ }
+
+ /**
+ * Un-links an entry from the doubly linked list in a threadsafe, lock-free manner. The entry is typically retrieved
+ * using Segment#locklessRemove() after locking the Segment.
+ *
+ * @param node entry to unlink
+ */
+ // This corresponds to the Delete() function in STP
+ protected final void unlink(LinkedEntry node) {
+ if (node == head || node == tail) return;
+ while (true) {
+ AtomicMarkableReference<LinkedEntry> next = node.n;
+ if (next.isMarked()) return;
+ if (node.n.compareAndSet(next.getReference(), next.getReference(), false, true)) {
+ AtomicMarkableReference<LinkedEntry> prev;
+ while (true) {
+ prev = node.p;
+ if (prev.isMarked() || node.p.compareAndSet(prev.getReference(), prev.getReference(), false, true)) {
+ break;
+ }
+ }
+ correctPrev(prev.getReference().p.getReference(), next.getReference());
+ }
+ }
+ }
+
+ /**
+ * Links a new entry at the end of the linked list. Typically done when a put() creates a new entry, or if ordering
+ * needs to be updated based on access. If this entry already exists in the linked list, it should first be {@link
+ * #unlink(FIFOAMRDataContainer.LinkedEntry)}ed.
+ *
+ * @param node entry to link at end
+ */
+ // Corresponds to PushRight() in STP
+ protected final void linkAtEnd(LinkedEntry node) {
+ LinkedEntry next = tail;
+ LinkedEntry prev = next.p.getReference();
+ long backoffTime = -1;
+ while (true) {
+ node.p.set(prev, false);
+ node.n.set(next, false);
+ if (prev.n.compareAndSet(next, node, false, false)) break;
+ prev = correctPrev(prev, next);
+ backoffTime = backoff(backoffTime);
+ }
+
+ // PushEnd()
+ backoffTime = -1;
+ while (true) {
+ AtomicMarkableReference<LinkedEntry> l1 = next.p;
+ if (l1.isMarked() || (node.n.isMarked() || node.n.getReference() != next)) break;
+ if (next.p.compareAndSet(l1.getReference(), node, false, false)) {
+ if (node.p.isMarked()) correctPrev(node, next);
+ break;
+ }
+ backoffTime = backoff(backoffTime);
+ }
+ }
+
+ /**
+ * Retrieves the next entry after a given entry, skipping marked entries accordingly.
+ *
+ * @param current current entry to inspect
+ * @return the next valid entry, or null if we have reached the end of the list.
+ */
+ // Corresponds to the Next() function in STP
+ protected final LinkedEntry getNext(LinkedEntry current) {
+ while (true) {
+ if (current == tail) return null;
+ AtomicMarkableReference<LinkedEntry> next = current.n;
+ boolean d = next.getReference().n.isMarked();
+ if (d && (!current.n.isMarked() || current.n.getReference() != next.getReference())) {
+ // set mark next.p
+ next.getReference().p.attemptMark(next.getReference().p.getReference(), true);
+ current.n.compareAndSet(next.getReference(), next.getReference().n.getReference(), false, false);
+ continue;
+ }
+
+ current = next.getReference();
+ if (!d && next.getReference() != tail) return current;
+ }
+ }
+
+ /**
+ * Correct 'previous' links. This 'helper' function is used if unable to properly set previous pointers (due to a
+ * concurrent update) and is used when traversing the list in reverse.
+ *
+ * @param prev suggested previous entry
+ * @param node current entry
+ * @return the actual valid, previous entry. Links are also corrected in the process.
+ */
+ // Corresponds to CorrectPrev() in STP
+ protected final LinkedEntry correctPrev(LinkedEntry prev, LinkedEntry node) {
+ LinkedEntry lastLink = null;
+ AtomicMarkableReference<LinkedEntry> link1, prev2;
+ long backoffTime = -1;
+
+ // holders to atomically retrieve ref + mark
+ boolean[] markHolder = new boolean[1];
+ LinkedEntry referenceHolder;
+
+ while (true) {
+ link1 = node.p;
+ if (link1.isMarked()) break;
+
+ prev2 = prev.n;
+ if (prev2.isMarked()) {
+ if (lastLink != null) {
+ AtomicMarkableReference<LinkedEntry> prevP = prev.p;
+ while (!prevP.attemptMark(prevP.getReference(), true)) {}
+ lastLink.n.compareAndSet(prev, prev2.getReference(), lastLink.n.isMarked(), false);
+ prev = lastLink;
+ lastLink = null;
+ continue;
+ }
+ prev2 = prev.p;
+ prev = prev2.getReference();
+ continue;
+ }
+
+ if (prev2.getReference() != node) {
+ lastLink = prev;
+ prev = prev2.getReference();
+ continue;
+ }
+
+ referenceHolder = link1.get(markHolder);
+ if (node.p.compareAndSet(referenceHolder, prev, markHolder[0], false)) {
+ if (prev.p.isMarked()) continue;
+ break;
+ }
+ backoffTime = backoff(backoffTime);
+ }
+ return prev;
+ }
+
+// private LinkedEntry unmarkPrevIfNeeded(LinkedEntry e) {
+// if (isMarkedForRemoval(e)) return e.p;
+// else return e;
+// }
+
+
+ /**
+ * Similar to ConcurrentHashMap's hash() function: applies a supplemental hash function to a given hashCode, which
+ * defends against poor quality hash functions. This is critical because ConcurrentHashMap uses power-of-two length
+ * hash tables, that otherwise encounter collisions for hashCodes that do not differ in lower or upper bits.
+ */
+ final int hashOld(int h) {
+ // Spread bits to regularize both segment and index locations,
+ // using variant of single-word Wang/Jenkins hash.
+ h += (h << 15) ^ 0xffffcd7d;
+ h ^= (h >>> 10);
+ h += (h << 3);
+ h ^= (h >>> 6);
+ h += (h << 2) + (h << 14);
+ return h ^ (h >>> 16);
+ }
+
+ /**
+ * Use the object's built-in hash to obtain an initial value, then use a second four byte hash to obtain a more
+ * uniform distribution of hash values. This uses a <a href = "http://burtleburtle.net/bob/hash/integer.html">4-byte
+ * (integer) hash</a>, which produces well distributed values even when the original hash produces tightly clustered
+ * values.
+ * <p/>
+ * Contributed by akluge <a href="http://www.vizitsolutions.com/ConsistentHashingCaching.html">http://www.vizitsolutions.com/ConsistentHashingCaching.html</a>
+ */
+ final int hash(int hash) {
+ hash = (hash + 0x7ED55D16) + (hash << 12);
+ hash = (hash ^ 0xc761c23c) ^ (hash >> 19);
+ hash = (hash + 0x165667b1) + (hash << 5);
+ hash = (hash + 0xd3a2646c) ^ (hash << 9);
+ hash = (hash + 0xfd7046c5) + (hash << 3);
+ hash = (hash ^ 0xb55a4f09) ^ (hash >> 16);
+
+ return hash;
+ }
+
+ /**
+ * Returns the segment that should be used for key with given hash
+ *
+ * @param hash the hash code for the key
+ * @return the segment
+ */
+ final Segment segmentFor(int hash) {
+ return segments[(hash >>> segmentShift) & segmentMask];
+ }
+
+ /**
+ * ConcurrentHashMap list entry. Note that this is never exported out as a user-visible Map.Entry.
+ * <p/>
+ * Because the value field is volatile, not final, it is legal wrt the Java Memory Model for an unsynchronized reader
+ * to see null instead of initial value when read via a data race. Although a reordering leading to this is not
+ * likely to ever actually occur, the Segment.readValueUnderLock method is used as a backup in case a null
+ * (pre-initialized) value is ever seen in an unsynchronized access method.
+ */
+ static final class HashEntry {
+ final Object key;
+ final int hash;
+ volatile LinkedEntry value;
+ final HashEntry next;
+
+ HashEntry(Object key, int hash, HashEntry next, LinkedEntry value) {
+ this.key = key;
+ this.hash = hash;
+ this.next = next;
+ this.value = value;
+ }
+ }
+
+ /**
+ * Very similar to a Segment in a ConcurrentHashMap
+ */
+ static final class Segment extends ReentrantLock {
+ /**
+ * The number of elements in this segment's region.
+ */
+ transient volatile int count;
+
+ /**
+ * The table is rehashed when its size exceeds this threshold. (The value of this field is always
+ * <tt>(int)(capacity * loadFactor)</tt>.)
+ */
+ transient int threshold;
+
+ /**
+ * The per-segment table.
+ */
+ transient volatile HashEntry[] table;
+
+ /**
+ * The load factor for the hash table. Even though this value is same for all segments, it is replicated to avoid
+ * needing links to outer object.
+ *
+ * @serial
+ */
+ final float loadFactor;
+
+ Segment(int initialCapacity, float lf) {
+ loadFactor = lf;
+ setTable(new HashEntry[initialCapacity]);
+ }
+
+ static final Segment[] newArray(int i) {
+ return new Segment[i];
+ }
+
+ /**
+ * Sets table to new HashEntry array. Call only while holding lock or in constructor.
+ */
+ final void setTable(HashEntry[] newTable) {
+ threshold = (int) (newTable.length * loadFactor);
+ table = newTable;
+ }
+
+ /**
+ * Returns properly casted first entry of bin for given hash.
+ */
+ final HashEntry getFirst(int hash) {
+ HashEntry[] tab = table;
+ return tab[hash & (tab.length - 1)];
+ }
+
+ /**
+ * Reads value field of an entry under lock. Called if value field ever appears to be null. This is possible only
+ * if a compiler happens to reorder a HashEntry initialization with its table assignment, which is legal under
+ * memory model but is not known to ever occur.
+ */
+ final LinkedEntry readValueUnderLock(HashEntry e) {
+ lock();
+ try {
+ return e.value;
+ } finally {
+ unlock();
+ }
+ }
+
+ /* Specialized implementations of map methods */
+
+ final LinkedEntry get(Object key, int hash) {
+ if (count != 0) { // read-volatile
+ HashEntry e = getFirst(hash);
+ while (e != null) {
+ if (e.hash == hash && key.equals(e.key)) {
+ LinkedEntry v = e.value;
+ if (v != null)
+ return v;
+ return readValueUnderLock(e); // recheck
+ }
+ e = e.next;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * This put is lockless. Make sure you call segment.lock() first.
+ */
+ final LinkedEntry locklessPut(Object key, int hash, LinkedEntry value) {
+ int c = count;
+ if (c++ > threshold) // ensure capacity
+ rehash();
+ HashEntry[] tab = table;
+ int index = hash & (tab.length - 1);
+ HashEntry first = tab[index];
+ HashEntry e = first;
+ while (e != null && (e.hash != hash || !key.equals(e.key)))
+ e = e.next;
+
+ LinkedEntry oldValue;
+ if (e != null) {
+ oldValue = e.value;
+ e.value = value;
+ } else {
+ oldValue = null;
+ tab[index] = new HashEntry(key, hash, first, value);
+ count = c; // write-volatile
+ }
+ return oldValue;
+ }
+
+ final void rehash() {
+ HashEntry[] oldTable = table;
+ int oldCapacity = oldTable.length;
+ if (oldCapacity >= MAXIMUM_CAPACITY)
+ return;
+
+ /*
+ * Reclassify nodes in each list to new Map. Because we are
+ * using power-of-two expansion, the elements from each bin
+ * must either stay at same index, or move with a power of two
+ * offset. We eliminate unnecessary node creation by catching
+ * cases where old nodes can be reused because their next
+ * fields won't change. Statistically, at the default
+ * threshold, only about one-sixth of them need cloning when
+ * a table doubles. The nodes they replace will be garbage
+ * collectable as soon as they are no longer referenced by any
+ * reader thread that may be in the midst of traversing table
+ * right now.
+ */
+
+ HashEntry[] newTable = new HashEntry[oldCapacity << 1];
+ threshold = (int) (newTable.length * loadFactor);
+ int sizeMask = newTable.length - 1;
+ for (int i = 0; i < oldCapacity; i++) {
+ // We need to guarantee that any existing reads of old Map can
+ // proceed. So we cannot yet null out each bin.
+ HashEntry e = oldTable[i];
+
+ if (e != null) {
+ HashEntry next = e.next;
+ int idx = e.hash & sizeMask;
+
+ // Single node on list
+ if (next == null)
+ newTable[idx] = e;
+
+ else {
+ // Reuse trailing consecutive sequence at same slot
+ HashEntry lastRun = e;
+ int lastIdx = idx;
+ for (HashEntry last = next;
+ last != null;
+ last = last.next) {
+ int k = last.hash & sizeMask;
+ if (k != lastIdx) {
+ lastIdx = k;
+ lastRun = last;
+ }
+ }
+ newTable[lastIdx] = lastRun;
+
+ // Clone all remaining nodes
+ for (HashEntry p = e; p != lastRun; p = p.next) {
+ int k = p.hash & sizeMask;
+ HashEntry n = newTable[k];
+ newTable[k] = new HashEntry(p.key, p.hash, n, p.value);
+ }
+ }
+ }
+ }
+ table = newTable;
+ }
+
+ /**
+ * This is a lockless remove. Make sure you acquire locks using segment.lock() first.
+ */
+ final LinkedEntry locklessRemove(Object key, int hash) {
+ int c = count - 1;
+ HashEntry[] tab = table;
+ int index = hash & (tab.length - 1);
+ HashEntry first = tab[index];
+ HashEntry e = first;
+ while (e != null && (e.hash != hash || !key.equals(e.key)))
+ e = e.next;
+
+ LinkedEntry oldValue = null;
+ if (e != null) {
+ oldValue = e.value;
+ // All entries following removed node can stay
+ // in list, but all preceding ones need to be
+ // cloned.
+ HashEntry newFirst = e.next;
+ for (HashEntry p = first; p != e; p = p.next)
+ newFirst = new HashEntry(p.key, p.hash,
+ newFirst, p.value);
+ tab[index] = newFirst;
+ count = c; // write-volatile
+
+ }
+ return oldValue;
+ }
+
+ /**
+ * This is a lockless clear. Ensure you acquire locks on the segment first using segment.lock().
+ */
+ final void locklessClear() {
+ if (count != 0) {
+ HashEntry[] tab = table;
+ for (int i = 0; i < tab.length; i++)
+ tab[i] = null;
+ count = 0; // write-volatile
+ }
+ }
+ }
+
+
+ protected final class KeySet extends AbstractSet<Object> {
+ public Iterator<Object> iterator() {
+ return new KeyIterator();
+ }
+
+ public int size() {
+ return FIFOAMRDataContainer.this.size();
+ }
+ }
+
+ protected final class Values extends AbstractCollection<Object> {
+ public Iterator<Object> iterator() {
+ return new ValueIterator();
+ }
+
+ public int size() {
+ return FIFOAMRDataContainer.this.size();
+ }
+ }
+
+ protected final class EntrySet extends AbstractSet<InternalCacheEntry> {
+ public Iterator<InternalCacheEntry> iterator() {
+ return new ImmutableEntryIterator();
+ }
+
+ public int size() {
+ return FIFOAMRDataContainer.this.size();
+ }
+ }
+
+ protected abstract class LinkedIterator {
+ LinkedEntry current = head;
+
+ public boolean hasNext() {
+ if (current == tail) return false;
+ current = getNext(current);
+ return current != null;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ protected final class EntryIterator extends LinkedIterator implements Iterator<InternalCacheEntry> {
+ public InternalCacheEntry next() {
+ return current.e;
+ }
+ }
+
+ protected final class ImmutableEntryIterator extends LinkedIterator implements Iterator<InternalCacheEntry> {
+ public InternalCacheEntry next() {
+ return Immutables.immutableInternalCacheEntry(current.e);
+ }
+ }
+
+ protected final class KeyIterator extends LinkedIterator implements Iterator<Object> {
+ public Object next() {
+ return current.e.getKey();
+ }
+ }
+
+ protected final class ValueIterator extends LinkedIterator implements Iterator<Object> {
+ public Object next() {
+ return current.e.getValue();
+ }
+ }
+
+
+ // ----------- PUBLIC API ---------------
+
+ public InternalCacheEntry get(Object k) {
+ int h = hash(k.hashCode());
+ Segment s = segmentFor(h);
+ LinkedEntry le = s.get(k, h);
+ InternalCacheEntry ice = null;
+ if (le != null) ice = le.e;
+ if (ice != null) {
+ if (ice.isExpired()) {
+ remove(k);
+ ice = null;
+ } else {
+ ice.touch();
+ }
+ }
+ return ice;
+ }
+
+ public InternalCacheEntry peek(Object k) {
+ int h = hash(k.hashCode());
+ Segment s = segmentFor(h);
+ LinkedEntry le = s.get(k, h);
+ InternalCacheEntry ice = null;
+ if (le != null) ice = le.e;
+ return ice;
+ }
+
+ public void put(Object k, Object v, long lifespan, long maxIdle) {
+ // do a normal put first.
+ int h = hash(k.hashCode());
+ Segment s = segmentFor(h);
+ s.lock();
+ LinkedEntry le;
+ boolean newEntry = false;
+ try {
+ le = s.get(k, h);
+ InternalCacheEntry ice = le == null ? null : le.e;
+ if (ice == null) {
+ newEntry = true;
+ ice = InternalEntryFactory.create(k, v, lifespan, maxIdle);
+ // only update linking if this is a new entry
+ le = new LinkedEntry(ice);
+ } else {
+ ice.setValue(v);
+ ice = ice.setLifespan(lifespan).setMaxIdle(maxIdle);
+ // need to do this anyway since the ICE impl may have changed
+ le.e = ice;
+ }
+
+ s.locklessPut(k, h, le);
+
+ if (newEntry) {
+ linkAtEnd(le);
+ }
+ } finally {
+ s.unlock();
+ }
+ }
+
+ public boolean containsKey(Object k) {
+ int h = hash(k.hashCode());
+ Segment s = segmentFor(h);
+ LinkedEntry le = s.get(k, h);
+ InternalCacheEntry ice = null;
+ if (le != null) ice = le.e;
+ if (ice != null) {
+ if (ice.isExpired()) {
+ remove(k);
+ ice = null;
+ }
+ }
+
+ return ice != null;
+ }
+
+ public InternalCacheEntry remove(Object k) {
+ int h = hash(k.hashCode());
+ Segment s = segmentFor(h);
+ s.lock();
+ InternalCacheEntry ice = null;
+ LinkedEntry le;
+ try {
+ le = s.locklessRemove(k, h);
+ if (le != null) {
+ ice = le.e;
+ unlink(le);
+ }
+ } finally {
+ s.unlock();
+ }
+
+ if (ice == null || ice.isExpired())
+ return null;
+ else
+ return ice;
+ }
+
+ public int size() {
+ // approximate sizing is good enough
+ int sz = 0;
+ final Segment[] segs = segments;
+ for (Segment s : segs) sz += s.count;
+ return sz;
+ }
+
+ public void clear() {
+ // This is expensive...
+ // lock all segments
+ for (Segment s : segments) s.lock();
+ try {
+ for (Segment s : segments) s.locklessClear();
+ initLinks();
+ } finally {
+ for (Segment s : segments) s.unlock();
+ }
+ }
+
+ public Set<Object> keySet() {
+ if (keySet == null) keySet = new KeySet();
+ return keySet;
+ }
+
+ public Collection<Object> values() {
+ return new Values();
+ }
+
+ public Set<InternalCacheEntry> entrySet() {
+ return new EntrySet();
+ }
+
+ public void purgeExpired() {
+ for (InternalCacheEntry ice : this) {
+ if (ice.isExpired()) remove(ice.getKey());
+ }
+ }
+
+ public Iterator<InternalCacheEntry> iterator() {
+ return new EntryIterator();
+ }
+}
\ No newline at end of file
Property changes on: trunk/core/src/main/java/org/infinispan/container/FIFOAMRDataContainer.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java 2009-11-27 11:03:30 UTC (rev 1224)
+++ trunk/core/src/main/java/org/infinispan/container/FIFODataContainer.java 2009-11-27 12:56:45 UTC (rev 1225)
@@ -1,5 +1,6 @@
package org.infinispan.container;
+import net.jcip.annotations.ThreadSafe;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalEntryFactory;
import org.infinispan.util.Immutables;
@@ -25,13 +26,18 @@
* Links are maintained using techniques inspired by H. Sundell and P. Tsigas' 2008 paper, <a
* href="http://www.md.chalmers.se/~tsigas/papers/Lock-Free-Deques-Doubly-Lists-JPDC.pdf"><i>Lock Free Deques and Doubly
* Linked Lists</i></a>, M. Michael's 2002 paper, <a href="http://www.research.ibm.com/people/m/michael/spaa-2002.pdf"><i>High
- * Performance Dynamic Lock-Free Hash Tables and List-Based Sets</i></a>, and Java6's ConcurrentSkipListMap.
+ * Performance Dynamic Lock-Free Hash Tables and List-Based Sets</i></a>
+ * <p />
+ * This implementation uses a technique of delegating marker nodes similar to the technique used in Sun's
+ * {@link java.util.concurrent.ConcurrentSkipListMap}, which is deemed more memory efficient and better performing than
+ * {@link java.util.concurrent.atomic.AtomicMarkableReference}s.
* <p/>
*
* @author Manik Surtani
* @author Galder Zamarreño
* @since 4.0
*/
+@ThreadSafe
public class FIFODataContainer implements DataContainer {
/**
@@ -104,6 +110,9 @@
LockSupport.parkNanos(actualNanos);
long newNanos = actualNanos << 1;
return newNanos > 10000000 ? backoffStart : newNanos;
+// int millis = (1+ r.nextInt(9)) * 10;
+// LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(millis));
+// return -1;
}
/**
@@ -183,6 +192,11 @@
final boolean casPrev(LinkedEntry expected, LinkedEntry newValue) {
return P_UPDATER.compareAndSet(this, expected, newValue);
}
+
+ @Override
+ public String toString() {
+ return "E" + Integer.toHexString(System.identityHashCode(this));
+ }
}
/**
@@ -197,6 +211,11 @@
n = actual;
p = actual;
}
+
+ @Override
+ public String toString() {
+ return "M" + Integer.toHexString(System.identityHashCode(this));
+ }
}
/**
@@ -242,11 +261,12 @@
*
* @param entry entry to link at end
*/
+ // Corresponds to PushRight() in the Sundell/Tsigas paper
protected final void linkAtEnd(LinkedEntry entry) {
LinkedEntry prev = tail.p;
long backoffTime = -1;
for (; ;) {
- entry.p = prev;
+ entry.p = unmarkPrevIfNeeded(prev);
entry.n = tail;
if (prev.casNext(tail, entry)) break;
prev = correctPrev(prev, tail);
@@ -288,27 +308,6 @@
}
/**
- * Retrieves the previous entry befora a given entry, skipping marked entries accordingly.
- *
- * @param current current entry to inspect
- * @return the previous valid entry, or null if we have reached the start of the list.
- */
- protected final LinkedEntry getPrev(LinkedEntry current) {
- for (; ;) {
- if (current == head) return null;
- LinkedEntry prev = current.p;
- if (prev.n == current && !isMarkedForRemoval(current) && !isMarkedForRemoval(current.n)) {
- current = prev;
- if (current != head) return current;
- } else if (isMarkedForRemoval(current.n)) {
- current = getNext(current);
- } else {
- prev = correctPrev(prev, current);
- }
- }
- }
-
- /**
* Correct 'previous' links. This 'helper' function is used if unable to properly set previous pointers (due to a
* concurrent update) and is used when traversing the list in reverse.
*
@@ -317,41 +316,52 @@
* @return the actual valid, previous entry. Links are also corrected in the process.
*/
protected final LinkedEntry correctPrev(LinkedEntry suggestedPreviousEntry, LinkedEntry currentEntry) {
+// verifyLL();
LinkedEntry lastLink = null, link1, prev2;
+ LinkedEntry prev = suggestedPreviousEntry, node = currentEntry;
long backoffTime = -1;
while (true) {
- link1 = currentEntry.p;
+ link1 = node.p;
if (isMarkedForRemoval(link1)) break;
- prev2 = suggestedPreviousEntry.n;
+ prev2 = prev.n;
if (isMarkedForRemoval(prev2)) {
if (lastLink != null) {
- markPrevReference(suggestedPreviousEntry);
- lastLink.casNext(suggestedPreviousEntry, prev2.p);
- suggestedPreviousEntry = lastLink;
+ markPrevReference(prev);
+ LinkedEntry unmarkedPrev2P = unmarkPrevIfNeeded(prev2.p);
+ lastLink.casNext(prev, unmarkedPrev2P);
+ prev = lastLink;
lastLink = null;
continue;
}
- prev2 = suggestedPreviousEntry.p;
- suggestedPreviousEntry = prev2;
+ prev2 = prev.p;
+ prev = prev2;
continue;
}
- if (prev2 != currentEntry) {
- lastLink = suggestedPreviousEntry;
- suggestedPreviousEntry = prev2;
+ if (prev2 != node) {
+ lastLink = prev;
+ prev = prev2;
continue;
}
- if (currentEntry.casPrev(link1, suggestedPreviousEntry)) {
- if (isMarkedForRemoval(suggestedPreviousEntry.p)) continue;
+ LinkedEntry unmarked = unmarkPrevIfNeeded(prev);
+ if (node.casPrev(link1, unmarked)) {
+ if (isMarkedForRemoval(prev.p)) {
+ continue;
+ }
break;
}
backoffTime = backoff(backoffTime);
}
- return suggestedPreviousEntry;
+ return prev;
}
+ private LinkedEntry unmarkPrevIfNeeded(LinkedEntry e) {
+ if (isMarkedForRemoval(e)) return e.p;
+ else return e;
+ }
+
/**
* Similar to ConcurrentHashMap's hash() function: applies a supplemental hash function to a given hashCode, which
* defends against poor quality hash functions. This is critical because ConcurrentHashMap uses power-of-two length
@@ -712,10 +722,7 @@
Segment s = segmentFor(h);
LinkedEntry le = s.get(k, h);
InternalCacheEntry ice = null;
- if (le != null) {
- ice = le.e;
- if (isMarkedForRemoval(le)) unlink(le);
- }
+ if (le != null) ice = le.e;
if (ice != null) {
if (ice.isExpired()) {
remove(k);
@@ -732,10 +739,7 @@
Segment s = segmentFor(h);
LinkedEntry le = s.get(k, h);
InternalCacheEntry ice = null;
- if (le != null) {
- ice = le.e;
- if (isMarkedForRemoval(le)) unlink(le);
- }
+ if (le != null) ice = le.e;
return ice;
}
@@ -776,10 +780,7 @@
Segment s = segmentFor(h);
LinkedEntry le = s.get(k, h);
InternalCacheEntry ice = null;
- if (le != null) {
- ice = le.e;
- if (isMarkedForRemoval(le)) unlink(le);
- }
+ if (le != null) ice = le.e;
if (ice != null) {
if (ice.isExpired()) {
remove(k);
Added: trunk/core/src/main/java/org/infinispan/container/LRUAMRDataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/LRUAMRDataContainer.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/container/LRUAMRDataContainer.java 2009-11-27 12:56:45 UTC (rev 1225)
@@ -0,0 +1,94 @@
+package org.infinispan.container;
+
+import net.jcip.annotations.ThreadSafe;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalEntryFactory;
+
+/**
+ * Based on the same techniques outlined in the {@link FIFODataContainer}, this implementation
+ * additionally unlinks and re-links entries at the tail whenever entries are visited (using a get()) or are updated (a
+ * put() on an existing key).
+ * <p/>
+ * Again, these are constant-time operations.
+ * <p/>
+ * Note though that this implementation does have a far lesser degree of concurrency when compared with its FIFO variant
+ * due to the segment locking necessary even when doing a get() (since gets reorder links). This has a knock-on effect
+ * not just on get() but even on other write() operations since they all compete for the same segment lock (when working
+ * on keys mapped to the same segment, of course).
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@ThreadSafe
+public class LRUAMRDataContainer extends FIFOAMRDataContainer {
+
+ public LRUAMRDataContainer(int concurrencyLevel) {
+ super(concurrencyLevel);
+ }
+
+ @Override
+ public InternalCacheEntry get(Object k) {
+ int h = hash(k.hashCode());
+ Segment s = segmentFor(h);
+ LinkedEntry le = s.get(k, h);
+ InternalCacheEntry ice = null;
+ if (le != null) ice = le.e;
+ if (ice != null) {
+ if (ice.isExpired()) {
+ remove(k);
+ ice = null;
+ } else {
+ ice.touch();
+ boolean needToUnlock = false;
+ try {
+ s.lock(); // do not update links unless segment is locked!
+ needToUnlock = true;
+ updateLinks(le);
+ } finally {
+ if (needToUnlock) s.unlock();
+ }
+ }
+ }
+ return ice;
+ }
+
+ @Override
+ public void put(Object k, Object v, long lifespan, long maxIdle) {
+ // do a normal put first.
+ int h = hash(k.hashCode());
+ Segment s = segmentFor(h);
+ s.lock();
+ LinkedEntry le;
+ boolean newEntry = false;
+ try {
+ le = s.get(k, h);
+ InternalCacheEntry ice = le == null ? null : le.e;
+ if (ice == null) {
+ newEntry = true;
+ ice = InternalEntryFactory.create(k, v, lifespan, maxIdle);
+ le = new LinkedEntry(ice);
+ } else {
+ ice.setValue(v);
+ ice = ice.setLifespan(lifespan).setMaxIdle(maxIdle);
+ // need to do this anyway since the ICE impl may have changed
+ le.e = ice;
+ }
+
+ s.locklessPut(k, h, le);
+
+ if (newEntry) {
+ linkAtEnd(le);
+ } else {
+ updateLinks(le);
+ }
+
+ } finally {
+ s.unlock();
+ }
+ }
+
+ protected final void updateLinks(LinkedEntry le) {
+ unlink(le);
+ linkAtEnd(le);
+ }
+}
\ No newline at end of file
Property changes on: trunk/core/src/main/java/org/infinispan/container/LRUAMRDataContainer.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java 2009-11-27 11:03:30 UTC (rev 1224)
+++ trunk/core/src/main/java/org/infinispan/container/LRUDataContainer.java 2009-11-27 12:56:45 UTC (rev 1225)
@@ -1,5 +1,6 @@
package org.infinispan.container;
+import net.jcip.annotations.ThreadSafe;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalEntryFactory;
@@ -10,10 +11,15 @@
* <p/>
* Again, these are constant-time operations.
* <p/>
+ * Note though that this implementation does have a far lesser degree of concurrency when compared with its FIFO variant
+ * due to the segment locking necessary even when doing a get() (since gets reorder links). This has a knock-on effect
+ * not just on get() but even on other write() operations since they all compete for the same segment lock (when working
+ * on keys mapped to the same segment, of course).
*
* @author Manik Surtani
* @since 4.0
*/
+@ThreadSafe
public class LRUDataContainer extends FIFODataContainer {
public LRUDataContainer(int concurrencyLevel) {
@@ -24,19 +30,24 @@
public InternalCacheEntry get(Object k) {
int h = hash(k.hashCode());
Segment s = segmentFor(h);
+
LinkedEntry le = s.get(k, h);
InternalCacheEntry ice = null;
- if (le != null) {
- ice = le.e;
- if (isMarkedForRemoval(le)) unlink(le);
- }
+ if (le != null) ice = le.e;
if (ice != null) {
if (ice.isExpired()) {
remove(k);
ice = null;
} else {
ice.touch();
- updateLinks(le);
+ boolean needToUnlockSegment = false;
+ try {
+ s.lock(); // we need to lock this segment to safely update links
+ needToUnlockSegment = true;
+ updateLinks(le);
+ } finally {
+ if (needToUnlockSegment) s.unlock();
+ }
}
}
return ice;
Added: trunk/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java 2009-11-27 12:56:45 UTC (rev 1225)
@@ -0,0 +1,129 @@
+package org.infinispan.stress;
+
+import org.infinispan.container.DataContainer;
+import org.infinispan.container.FIFOAMRDataContainer;
+import org.infinispan.container.FIFODataContainer;
+import org.infinispan.container.LRUAMRDataContainer;
+import org.infinispan.container.LRUDataContainer;
+import org.infinispan.container.SimpleDataContainer;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Stress test different data containers
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+@Test(testName = "stress.DataContainerStressTest", groups = "stress")
+public class DataContainerStressTest {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final int RUN_TIME_MILLIS = 60 * 1000; // 1 min
+ final int num_loops = 10000;
+ boolean use_time = true;
+ private static final Log log = LogFactory.getLog(DataContainerStressTest.class);
+
+ public void testSimpleDataContainer() throws InterruptedException {
+ doTest(new SimpleDataContainer(5000));
+ }
+
+ public void testFIFODataContainer() throws InterruptedException {
+ doTest(new FIFODataContainer(5000));
+ }
+
+ public void testFIFOAMRDataContainer() throws InterruptedException {
+ doTest(new FIFOAMRDataContainer(5000));
+ }
+
+ public void testLRUAMRDataContainer() throws InterruptedException {
+ doTest(new LRUAMRDataContainer(5000));
+ }
+
+ public void testLRUDataContainer() throws InterruptedException {
+ doTest(new LRUDataContainer(5000));
+ }
+
+ private void doTest(final DataContainer dc) throws InterruptedException {
+ final String key = "key";
+ final Map<String, String> perf = new ConcurrentSkipListMap<String, String>();
+ final AtomicBoolean run = new AtomicBoolean(true);
+
+ Thread getter = new Thread() {
+ public void run() {
+ waitForStart();
+ long start = System.nanoTime();
+ int runs = 0;
+ while (use_time && run.get() || runs < num_loops) {
+ if (runs % 100000 == 0) log.info("GET run # " + runs);
+// TestingUtil.sleepThread(10);
+ dc.get(key);
+ runs++;
+ }
+ perf.put("GET", opsPerMS(System.nanoTime() - start, runs));
+ }
+ };
+
+ Thread putter = new Thread() {
+ public void run() {
+ waitForStart();
+ long start = System.nanoTime();
+ int runs = 0;
+ while (use_time && run.get() || runs < num_loops) {
+ if (runs % 100000 == 0) log.info("PUT run # " + runs);
+// TestingUtil.sleepThread(10);
+ dc.put(key, "value", -1, -1);
+ runs++;
+ }
+ perf.put("PUT", opsPerMS(System.nanoTime() - start, runs));
+ }
+ };
+
+ Thread remover = new Thread() {
+ public void run() {
+ waitForStart();
+ long start = System.nanoTime();
+ int runs = 0;
+ while (use_time && run.get() || runs < num_loops) {
+ if (runs % 100000 == 0) log.info("REM run # " + runs);
+// TestingUtil.sleepThread(10);
+ dc.remove(key);
+ runs++;
+ }
+ perf.put("REM", opsPerMS(System.nanoTime() - start, runs));
+ }
+ };
+
+ Thread[] threads = {getter, putter, remover};
+ for (Thread t : threads) t.start();
+ latch.countDown();
+
+ // wait some time
+ Thread.sleep(RUN_TIME_MILLIS);
+ run.set(false);
+ for (Thread t : threads) t.join();
+ log.warn("{0}: Performance: {1}", dc.getClass().getSimpleName(), perf);
+ }
+
+ private void waitForStart() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String opsPerMS(long nanos, int ops) {
+ long totalMillis = TimeUnit.NANOSECONDS.toMillis(nanos);
+ if (totalMillis > 0)
+ return ops / totalMillis + " ops/ms";
+ else
+ return "NAN ops/ms";
+ }
+}
Property changes on: trunk/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list