[infinispan-commits] Infinispan SVN: r1448 - in trunk/core/src: test/java/org/infinispan/stress and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Feb 3 13:52:52 EST 2010
Author: vblagojevic at jboss.com
Date: 2010-02-03 13:52:52 -0500 (Wed, 03 Feb 2010)
New Revision: 1448
Modified:
trunk/core/src/main/java/org/infinispan/util/concurrent/BufferedConcurrentHashMap.java
trunk/core/src/test/java/org/infinispan/stress/MapStressTest.java
Log:
initial LIRS implementation
Modified: trunk/core/src/main/java/org/infinispan/util/concurrent/BufferedConcurrentHashMap.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/BufferedConcurrentHashMap.java 2010-02-03 17:38:53 UTC (rev 1447)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/BufferedConcurrentHashMap.java 2010-02-03 18:52:52 UTC (rev 1448)
@@ -25,17 +25,16 @@
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap; //import java.util.concurrent.ConcurrentHashMap.HashEntry;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-import org.infinispan.eviction.EvictionStrategy;
-
/**
* A hash table supporting full concurrency of retrievals and adjustable expected concurrency for
* updates. This class obeys the same functional specification as {@link java.util.Hashtable}, and
@@ -203,20 +202,52 @@
final int hash;
volatile V value;
final HashEntry<K, V> next;
+ volatile Recency state;
HashEntry(K key, int hash, HashEntry<K, V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
- }
+ this.state = Recency.HIR_RESIDENT;
+ }
+
+ public void transitionHIRResidentToLIRResident(){
+ assert state == Recency.HIR_RESIDENT;
+ state = Recency.LIR_RESIDENT;
+ }
+
+ public void transitionHIRResidentToHIRNonResident() {
+ assert state == Recency.HIR_RESIDENT;
+ state = Recency.HIR_NONRESIDENT;
+ }
+
+ public void transitionHIRNonResidentToLIRResident() {
+ assert state == Recency.HIR_NONRESIDENT;
+ state = Recency.LIR_RESIDENT;
+ }
+
+ public void transitionLIRResidentToHIRResident() {
+ assert state == Recency.LIR_RESIDENT;
+ state = Recency.HIR_RESIDENT;
+ }
+
+ public Recency recency() {
+ return state;
+ }
@SuppressWarnings("unchecked")
static final <K, V> HashEntry<K, V>[] newArray(int i) {
return new HashEntry[i];
}
}
+
+ private enum Recency{HIR_RESIDENT,LIR_RESIDENT,HIR_NONRESIDENT};
+ public enum Eviction {
+ NONE,LRU,LIRS
+ };
+
interface EvictionPolicy<K, V> {
/**
@@ -242,7 +273,7 @@
* Invoked to notify EvictionPolicy implementation that an entry in Segment has been
* accessed. Returns true if batching threshold has been reached, false otherwise.
* <p>
- * Note that this method is invoked without holding a lock on Segment.
+ * Note that this method is potentially invoked without holding a lock on Segment.
*
* @return true if batching threshold has been reached, false otherwise.
*
@@ -272,12 +303,12 @@
*
* @return type of eviction algorithm
*/
- EvictionStrategy strategy();
+ Eviction strategy();
/**
* Returns true if batching threshold has expired, false otherwise.
* <p>
- * Note that this method is invoked without holding a lock on Segment.
+ * Note that this method is potentially invoked without holding a lock on Segment.
*
* @return true if batching threshold has expired, false otherwise.
*/
@@ -314,8 +345,8 @@
}
@Override
- public EvictionStrategy strategy() {
- return EvictionStrategy.NONE;
+ public Eviction strategy() {
+ return Eviction.NONE;
}
}
@@ -392,11 +423,14 @@
*/
final float loadFactor;
- Segment(int cap, float lf, EvictionStrategy es, EvictionListener<K, V> listener) {
+ Segment(int cap, float lf, Eviction es, EvictionListener<K, V> listener) {
loadFactor = lf;
- if (es == EvictionStrategy.LRU) {
+ if (es == Eviction.LRU) {
ea = new LRU(cap, lf, cap * 10, lf);
- } else {
+ } else if (es == Eviction.LIRS) {
+ ea = new LIRS(cap, lf, cap * 10, lf);
+ }
+ else {
ea = new NullEvictionPolicy<K, V>();
}
evictionListener = listener;
@@ -458,7 +492,7 @@
}
// a hit
if (result != null) {
- if (ea.onEntryHit(e) && ea.strategy() != EvictionStrategy.NONE) {
+ if (ea.onEntryHit(e) && ea.strategy() != Eviction.NONE) {
Set<HashEntry<K, V>> evicted = attemptEviction(false);
//piggyback listener invocation on callers thread outside lock
if (evictionListener != null && evicted != null) {
@@ -581,7 +615,7 @@
Set<HashEntry<K, V>> evicted = null;
try {
int c = count;
- if (c++ > threshold && ea.strategy() == EvictionStrategy.NONE) // ensure capacity
+ if (c++ > threshold && ea.strategy() == Eviction.NONE) // ensure capacity
rehash();
HashEntry<K, V>[] tab = table;
int index = hash & (tab.length - 1);
@@ -601,11 +635,10 @@
oldValue = null;
++modCount;
count = c; // write-volatile
- if (ea.strategy() != EvictionStrategy.NONE) {
+ if (ea.strategy() != Eviction.NONE) {
if (c > tab.length) {
// remove entries;lower count
evicted = ea.execute();
- assert !evicted.isEmpty();
// re-read first
first = tab[index];
}
@@ -832,10 +865,197 @@
}
@Override
- public EvictionStrategy strategy() {
- return EvictionStrategy.LRU;
+ public Eviction strategy() {
+ return Eviction.LRU;
}
}
+
+ class LIRS implements EvictionPolicy<K, V> {
+ private final static int MAX_BATCH_SIZE = 64;
+ private final ConcurrentLinkedQueue<HashEntry<K, V>> accessQueue;
+ private final LinkedHashMap<Integer,HashEntry<K, V>> stack;
+ private final LinkedList<HashEntry<K, V>> queue;
+
+ private final int maxBatchQueueSize;
+ private final int lirSizeLimit;
+ private final int hirSizeLimit;
+ private int currentLIRSize = 0;
+ private final float batchThresholdFactor;
+ private final AtomicInteger accessQueueSize = new AtomicInteger(0);
+
+ public LIRS(int capacity, float lf, int maxBatchSize, float batchThresholdFactor) {
+ this.lirSizeLimit = (int) (capacity*0.9);
+ int tmp = (int) (capacity *0.1);
+ this.hirSizeLimit = tmp<1?1:tmp;
+ this.maxBatchQueueSize = maxBatchSize > MAX_BATCH_SIZE ? MAX_BATCH_SIZE: maxBatchSize;
+ this.batchThresholdFactor = batchThresholdFactor;
+ this.accessQueue = new ConcurrentLinkedQueue<HashEntry<K, V>>();
+ this.stack = new LinkedHashMap<Integer,HashEntry<K, V>>();
+ this.queue = new LinkedList<HashEntry<K, V>>();
+ }
+
+ @Override
+ public Set<HashEntry<K, V>> execute() {
+ Set<HashEntry<K, V>> evicted = new HashSet<HashEntry<K, V>>();
+ try {
+ for (HashEntry<K, V> e : accessQueue) {
+ HashEntry<K, V> hit = find(e);
+ if (hit != null) {
+ if (hit.recency() == Recency.LIR_RESIDENT) {
+ handleLIRHit(hit, evicted);
+ } else if (hit.recency() == Recency.HIR_RESIDENT) {
+ handleHIRHit(hit, evicted);
+ }
+ }
+ }
+ removeFromSegment(evicted);
+ } finally {
+ accessQueue.clear();
+ accessQueueSize.set(0);
+ }
+ return evicted;
+ }
+
+ private void handleHIRHit(HashEntry<K, V> e, Set<HashEntry<K,V>> evicted) {
+ boolean inStack = stack.containsKey(e.hashCode());
+ if(inStack)
+ stack.remove(e.hashCode());
+
+ //first put on top of the stack
+ stack.put(e.hashCode(), e);
+
+ if(inStack) {
+ assert queue.contains(e);
+ queue.remove(e);
+ e.transitionHIRResidentToLIRResident();
+ switchBottomostLIRtoHIRAndPrune(evicted);
+ } else {
+ assert queue.contains(e);
+ queue.remove(e);
+ queue.addLast(e);
+ }
+ }
+
+ private void handleLIRHit(HashEntry<K, V> e, Set<HashEntry<K,V>> evicted) {
+ stack.remove(e.hashCode());
+ stack.put(e.hashCode(), e);
+ for(Iterator<HashEntry<K, V>> i = stack.values().iterator();i.hasNext();) {
+ HashEntry<K,V> next = i.next();
+ if(next.recency() != Recency.LIR_RESIDENT) {
+ i.remove();
+ evicted.add(next);
+ } else {
+ break;
+ }
+ }
+ }
+
+ private HashEntry<K, V> find(HashEntry<K, V> e) {
+ HashEntry<K, V> hit = stack.get(e.hashCode());
+ if(hit == null && queue.contains(e)) {
+ hit = e;
+ }
+ return hit;
+ }
+
+ @Override
+ public void onEntryMiss(HashEntry<K, V> e) {
+ // initialization
+ if (currentLIRSize+1< lirSizeLimit) {
+ currentLIRSize++;
+ e.transitionHIRResidentToLIRResident();
+ stack.put(e.hashCode(), e);
+ } else {
+ if (queue.size()< hirSizeLimit) {
+ queue.addLast(e);
+ } else {
+ Set<HashEntry<K, V>> evicted = new HashSet<HashEntry<K, V>>();
+ boolean inStack = stack.containsKey(e.hashCode());
+
+ HashEntry<K, V> first = queue.removeFirst();
+ assert first.recency() == Recency.HIR_RESIDENT;
+ first.transitionHIRResidentToHIRNonResident();
+ evicted.add(first);
+ stack.put(e.hashCode(), e);
+
+ if (inStack) {
+ e.transitionHIRResidentToLIRResident();
+ switchBottomostLIRtoHIRAndPrune(evicted);
+ } else {
+ queue.addLast(e);
+ }
+ removeFromSegment(evicted);
+ }
+ }
+ }
+
+ private void removeFromSegment(Set<HashEntry<K, V>> evicted) {
+ for (HashEntry<K, V> e : evicted) {
+ remove(e.key,e.hash,null);
+ }
+ }
+
+ private void switchBottomostLIRtoHIRAndPrune(Set<HashEntry<K,V>> evicted) {
+ boolean seenFirstLIR = false;
+ for(Iterator<HashEntry<K, V>> i = stack.values().iterator();i.hasNext();) {
+ HashEntry<K,V> next = i.next();
+ if(next.recency() == Recency.LIR_RESIDENT) {
+ if(!seenFirstLIR) {
+ seenFirstLIR = true;
+ i.remove();
+ next.transitionLIRResidentToHIRResident();
+ queue.addLast(next);
+ } else {
+ break;
+ }
+ } else {
+ i.remove();
+ evicted.add(next);
+ }
+ }
+ }
+
+ /*
+ * Invoked without holding a lock on Segment
+ */
+ @Override
+ public boolean onEntryHit(HashEntry<K, V> e) {
+ accessQueue.add(e);
+ return accessQueueSize.incrementAndGet() >= maxBatchQueueSize * batchThresholdFactor;
+ }
+
+ /*
+ * Invoked without holding a lock on Segment
+ */
+ @Override
+ public boolean thresholdExpired() {
+ return accessQueueSize.get() >= maxBatchQueueSize;
+ }
+
+ @Override
+ public void onEntryRemove(HashEntry<K, V> e) {
+ HashEntry<K, V> removed = stack.remove(e.hashCode());
+ if(removed != null && removed.recency()==Recency.LIR_RESIDENT) {
+ currentLIRSize--;
+ }
+ queue.remove(e);
+ if (accessQueue.remove(e)) {
+ accessQueueSize.decrementAndGet();
+ }
+ }
+
+ @Override
+ public void clear() {
+ stack.clear();
+ accessQueue.clear();
+ accessQueueSize.set(0);
+ }
+
+ @Override
+ public Eviction strategy() {
+ return Eviction.LIRS;
+ }
+ }
}
/* ---------------- Public operations -------------- */
@@ -853,12 +1073,19 @@
* @param concurrencyLevel
* the estimated number of concurrently updating threads. The implementation performs
* internal sizing to try to accommodate this many threads.
+ *
+ * @param evictionStrategy
+ * the algorithm used to evict elements from this map
+ *
+ * @param evictionListener
+ * the eviction listener callback to be notified about evicted elements
+ *
* @throws IllegalArgumentException
* if the initial capacity is negative or the load factor or concurrencyLevel are
* nonpositive.
*/
public BufferedConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel,
- EvictionStrategy evictionStrategy, EvictionListener<K, V> evictionListener) {
+ Eviction evictionStrategy, EvictionListener<K, V> evictionListener) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
@@ -890,7 +1117,7 @@
}
public BufferedConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
- this(initialCapacity, loadFactor, concurrencyLevel, EvictionStrategy.LRU, null);
+ this(initialCapacity, loadFactor, concurrencyLevel, Eviction.LRU, null);
}
/**
Modified: trunk/core/src/test/java/org/infinispan/stress/MapStressTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/stress/MapStressTest.java 2010-02-03 17:38:53 UTC (rev 1447)
+++ trunk/core/src/test/java/org/infinispan/stress/MapStressTest.java 2010-02-03 18:52:52 UTC (rev 1448)
@@ -12,10 +12,8 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.util.concurrent.BufferedConcurrentHashMap;
+import org.infinispan.util.concurrent.BufferedConcurrentHashMap.Eviction;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -30,9 +28,9 @@
volatile CountDownLatch latch;
final int MAP_CAPACITY = 512;
final float MAP_LOAD_FACTOR = 0.75f;
- final int CONCURRENCY = 16;
+ final int CONCURRENCY = 32;
- final int RUN_TIME_MILLIS = 10 * 1000; // 10 sec
+ final int RUN_TIME_MILLIS = 5 * 1000; // 5 sec
final int NUM_KEYS = 50000;
final int LOOP_FACTOR=5;
@@ -57,9 +55,13 @@
doTest(new ConcurrentHashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR, CONCURRENCY));
}
- public void testBufferedConcurrentHashMap() throws Exception {
- doTest(new BufferedConcurrentHashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR, CONCURRENCY, EvictionStrategy.LRU,null));
+ public void testBufferedConcurrentHashMapLRU() throws Exception {
+ doTest(new BufferedConcurrentHashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR, CONCURRENCY, Eviction.LRU,null));
}
+
+ public void testBufferedConcurrentHashMapLIRS() throws Exception {
+ doTest(new BufferedConcurrentHashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR, CONCURRENCY, Eviction.LIRS,null));
+ }
public void testHashMap() throws Exception {
doTest(Collections.synchronizedMap(new HashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR)));
@@ -74,7 +76,6 @@
latch = new CountDownLatch(1);
final Map<String, String> perf = new ConcurrentSkipListMap<String, String>();
- final AtomicBoolean run = new AtomicBoolean(true);
List<Thread> threads = new LinkedList<Thread>();
for (int i = 0; i < numReaders; i++) {
@@ -83,7 +84,7 @@
waitForStart();
long start = System.nanoTime();
int runs = 0;
- while (run.get() && runs < readOps.size()) {
+ while (runs < readOps.size()) {
map.get(readOps.get(runs));
runs++;
}
@@ -100,7 +101,7 @@
waitForStart();
long start = System.nanoTime();
int runs = 0;
- while (run.get() && runs < writeOps.size()) {
+ while (runs < writeOps.size()) {
map.put(writeOps.get(runs),runs);
runs++;
}
@@ -117,7 +118,7 @@
waitForStart();
long start = System.nanoTime();
int runs = 0;
- while (run.get() && runs < removeOps.size()) {
+ while (runs < removeOps.size()) {
map.remove(removeOps.get(runs));
runs++;
}
@@ -134,7 +135,6 @@
// wait some time
Thread.sleep(RUN_TIME_MILLIS);
- run.set(false);
for (Thread t : threads)
t.join();
More information about the infinispan-commits
mailing list