[infinispan-commits] Infinispan SVN: r1448 - in trunk/core/src: test/java/org/infinispan/stress and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Feb 3 13:52:52 EST 2010


Author: vblagojevic at jboss.com
Date: 2010-02-03 13:52:52 -0500 (Wed, 03 Feb 2010)
New Revision: 1448

Modified:
   trunk/core/src/main/java/org/infinispan/util/concurrent/BufferedConcurrentHashMap.java
   trunk/core/src/test/java/org/infinispan/stress/MapStressTest.java
Log:
initial LIRS implementation

Modified: trunk/core/src/main/java/org/infinispan/util/concurrent/BufferedConcurrentHashMap.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/BufferedConcurrentHashMap.java	2010-02-03 17:38:53 UTC (rev 1447)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/BufferedConcurrentHashMap.java	2010-02-03 18:52:52 UTC (rev 1448)
@@ -25,17 +25,16 @@
 import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap; //import java.util.concurrent.ConcurrentHashMap.HashEntry;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.infinispan.eviction.EvictionStrategy;
-
 /**
  * A hash table supporting full concurrency of retrievals and adjustable expected concurrency for
  * updates. This class obeys the same functional specification as {@link java.util.Hashtable}, and
@@ -203,20 +202,52 @@
         final int hash;
         volatile V value;
         final HashEntry<K, V> next;
+        volatile Recency state;
 
         HashEntry(K key, int hash, HashEntry<K, V> next, V value) {
             this.key = key;
             this.hash = hash;
             this.next = next;
             this.value = value;
-        }        
+            this.state = Recency.HIR_RESIDENT;
+        }                        
+        
+        public void transitionHIRResidentToLIRResident(){
+            assert state == Recency.HIR_RESIDENT;
+            state = Recency.LIR_RESIDENT;
+        }
+        
+        public void transitionHIRResidentToHIRNonResident() {
+            assert state == Recency.HIR_RESIDENT;
+            state = Recency.HIR_NONRESIDENT;
+        }
+        
+        public void transitionHIRNonResidentToLIRResident() {
+            assert state == Recency.HIR_NONRESIDENT;
+            state = Recency.LIR_RESIDENT;
+        }
+        
+        public void transitionLIRResidentToHIRResident() {
+            assert state == Recency.LIR_RESIDENT;
+            state = Recency.HIR_RESIDENT;
+        }
+        
+        public Recency recency() {
+            return state;
+        }
 
         @SuppressWarnings("unchecked")
         static final <K, V> HashEntry<K, V>[] newArray(int i) {
             return new HashEntry[i];
         }
     }
+    
+    private enum Recency { HIR_RESIDENT, LIR_RESIDENT, HIR_NONRESIDENT } // recency state stored in HashEntry.state
 
+    public enum Eviction { // eviction algorithm selector passed to the map and Segment constructors
+        NONE, LRU, LIRS
+    }
+
     interface EvictionPolicy<K, V> {
 
         /**
@@ -242,7 +273,7 @@
          * Invoked to notify EvictionPolicy implementation that an entry in Segment has been
          * accessed. Returns true if batching threshold has been reached, false otherwise.
          * <p>
-         * Note that this method is invoked without holding a lock on Segment.
+         * Note that this method is potentially invoked without holding a lock on Segment.
          * 
          * @return true if batching threshold has been reached, false otherwise.
          * 
@@ -272,12 +303,12 @@
          * 
          * @return type of eviction algorithm
          */
-        EvictionStrategy strategy();
+        Eviction strategy();
 
         /**
          * Returns true if batching threshold has expired, false otherwise.
          * <p>
-         * Note that this method is invoked without holding a lock on Segment.
+         * Note that this method is potentially invoked without holding a lock on Segment.
          * 
          * @return true if batching threshold has expired, false otherwise.
          */
@@ -314,8 +345,8 @@
         }
 
         @Override
-        public EvictionStrategy strategy() {
-            return EvictionStrategy.NONE;
+        public Eviction strategy() {
+            return Eviction.NONE;
         }
     }
 
@@ -392,11 +423,14 @@
          */
         final float loadFactor;
 
-        Segment(int cap, float lf, EvictionStrategy es, EvictionListener<K, V> listener) {
+        Segment(int cap, float lf, Eviction es, EvictionListener<K, V> listener) {
             loadFactor = lf;
-            if (es == EvictionStrategy.LRU) {
+            if (es == Eviction.LRU) {
                 ea = new LRU(cap, lf, cap * 10, lf);
-            } else {
+            } else if (es == Eviction.LIRS) { 
+                ea = new LIRS(cap, lf, cap * 10, lf);
+            }
+            else {
                 ea = new NullEvictionPolicy<K, V>();
             }
             evictionListener = listener;
@@ -458,7 +492,7 @@
                 }
                 // a hit
                 if (result != null) {
-                    if (ea.onEntryHit(e) && ea.strategy() != EvictionStrategy.NONE) {
+                    if (ea.onEntryHit(e) && ea.strategy() != Eviction.NONE) {
                         Set<HashEntry<K, V>> evicted = attemptEviction(false);
                         //piggyback listener invocation on callers thread outside lock
                         if (evictionListener != null && evicted != null) {
@@ -581,7 +615,7 @@
             Set<HashEntry<K, V>> evicted = null;
             try {
                 int c = count;
-                if (c++ > threshold && ea.strategy() == EvictionStrategy.NONE) // ensure capacity
+                if (c++ > threshold && ea.strategy() == Eviction.NONE) // ensure capacity
                     rehash();
                 HashEntry<K, V>[] tab = table;
                 int index = hash & (tab.length - 1);
@@ -601,11 +635,10 @@
                     oldValue = null;
                     ++modCount;                    
                     count = c; // write-volatile
-                    if (ea.strategy() != EvictionStrategy.NONE) {
+                    if (ea.strategy() != Eviction.NONE) {
                         if (c > tab.length) {
                             // remove entries;lower count
                             evicted = ea.execute();
-                            assert !evicted.isEmpty();
                             // re-read first
                             first = tab[index];
                         }
@@ -832,10 +865,197 @@
             }
 
             @Override
-            public EvictionStrategy strategy() {
-                return EvictionStrategy.LRU;
+            public Eviction strategy() {
+                return Eviction.LRU;
             }
         }
+        
+        class LIRS implements EvictionPolicy<K, V> {
+            private static final int MAX_BATCH_SIZE = 64; // upper bound applied to maxBatchQueueSize in the constructor
+            private final ConcurrentLinkedQueue<HashEntry<K, V>> accessQueue;
+            private final LinkedHashMap<Integer,HashEntry<K, V>> stack;
+            private final LinkedList<HashEntry<K, V>> queue;
+
+            private final int maxBatchQueueSize;
+            private final int lirSizeLimit;
+            private final int hirSizeLimit;
+            private int currentLIRSize = 0;
+            private final float batchThresholdFactor;
+            private final AtomicInteger accessQueueSize = new AtomicInteger(0);
+
+            public LIRS(int capacity, float lf, int maxBatchSize, float batchThresholdFactor) {
+                this.lirSizeLimit = (int) (capacity*0.9);
+                int tmp = (int) (capacity *0.1);
+                this.hirSizeLimit = tmp<1?1:tmp;                
+                this.maxBatchQueueSize = maxBatchSize > MAX_BATCH_SIZE ? MAX_BATCH_SIZE: maxBatchSize;
+                this.batchThresholdFactor = batchThresholdFactor;
+                this.accessQueue = new ConcurrentLinkedQueue<HashEntry<K, V>>();
+                this.stack = new LinkedHashMap<Integer,HashEntry<K, V>>();
+                this.queue = new LinkedList<HashEntry<K, V>>();
+            }
+
+            @Override
+            public Set<HashEntry<K, V>> execute() {
+                Set<HashEntry<K, V>> evicted = new HashSet<HashEntry<K, V>>();
+                try {
+                    for (HashEntry<K, V> e : accessQueue) {
+                        HashEntry<K, V> hit = find(e);
+                        if (hit != null) {
+                            if (hit.recency() == Recency.LIR_RESIDENT) {
+                                handleLIRHit(hit, evicted);
+                            } else if (hit.recency() == Recency.HIR_RESIDENT) {
+                                handleHIRHit(hit, evicted);
+                            }
+                        } 
+                    }                            
+                    removeFromSegment(evicted);
+                } finally {
+                    accessQueue.clear();
+                    accessQueueSize.set(0);
+                }
+                return evicted;
+            }
+            
+            private void handleHIRHit(HashEntry<K, V> e,  Set<HashEntry<K,V>> evicted) {
+                boolean inStack = stack.containsKey(e.hashCode());
+                if(inStack)
+                    stack.remove(e.hashCode());
+                
+                //first put on top of the stack
+                stack.put(e.hashCode(), e);
+                
+                if(inStack) {
+                    assert queue.contains(e);
+                    queue.remove(e);
+                    e.transitionHIRResidentToLIRResident();                    
+                    switchBottomostLIRtoHIRAndPrune(evicted);                   
+                } else {
+                    assert queue.contains(e);
+                    queue.remove(e);
+                    queue.addLast(e);
+                }
+            }
+
+            private void handleLIRHit(HashEntry<K, V> e, Set<HashEntry<K,V>> evicted) {
+                stack.remove(e.hashCode());
+                stack.put(e.hashCode(), e);
+                for(Iterator<HashEntry<K, V>> i = stack.values().iterator();i.hasNext();) {
+                    HashEntry<K,V> next = i.next();
+                    if(next.recency() != Recency.LIR_RESIDENT) {
+                        i.remove();
+                        evicted.add(next);
+                    } else {
+                        break;
+                    }
+                }
+            }                       
+            
+            private HashEntry<K, V> find(HashEntry<K, V> e) { // locate e in the policy's bookkeeping
+                HashEntry<K, V> hit = stack.get(e.hashCode()); // the stack is keyed by the entry's hashCode()
+                if(hit == null && queue.contains(e)) { // not in the stack: fall back to a linear scan of the LinkedList queue
+                    hit = e;
+                }
+                return hit; // null when e is tracked by neither the stack nor the queue
+            }
+
+            @Override
+            public void onEntryMiss(HashEntry<K, V> e) {
+                // initialization
+                if (currentLIRSize+1< lirSizeLimit) {
+                    currentLIRSize++;
+                    e.transitionHIRResidentToLIRResident();
+                    stack.put(e.hashCode(), e);
+                } else {
+                    if (queue.size()< hirSizeLimit) {
+                        queue.addLast(e);
+                    } else {
+                        Set<HashEntry<K, V>> evicted = new HashSet<HashEntry<K, V>>();
+                        boolean inStack = stack.containsKey(e.hashCode());
+                        
+                        HashEntry<K, V> first = queue.removeFirst();
+                        assert first.recency() == Recency.HIR_RESIDENT;
+                        first.transitionHIRResidentToHIRNonResident();
+                        evicted.add(first);
+                        stack.put(e.hashCode(), e);
+                        
+                        if (inStack) {
+                            e.transitionHIRResidentToLIRResident();
+                            switchBottomostLIRtoHIRAndPrune(evicted);
+                        } else {
+                            queue.addLast(e);
+                        }
+                        removeFromSegment(evicted);
+                    }
+                }
+            }
+            
+            private void removeFromSegment(Set<HashEntry<K, V>> evicted) {
+                for (HashEntry<K, V> e : evicted) {
+                    remove(e.key,e.hash,null);
+                }
+            }
+
+            private void switchBottomostLIRtoHIRAndPrune(Set<HashEntry<K,V>> evicted) {
+                boolean seenFirstLIR = false;
+                for(Iterator<HashEntry<K, V>> i = stack.values().iterator();i.hasNext();) {
+                    HashEntry<K,V> next = i.next();
+                    if(next.recency() == Recency.LIR_RESIDENT) {
+                        if(!seenFirstLIR) {
+                            seenFirstLIR = true;
+                            i.remove();
+                            next.transitionLIRResidentToHIRResident();
+                            queue.addLast(next);
+                        } else {
+                            break;
+                        }                          
+                    } else {
+                        i.remove();
+                        evicted.add(next);
+                    }
+                }
+            }
+
+            /*
+             * Invoked without holding a lock on Segment; only records the hit for the next execute() pass.
+             */
+            @Override
+            public boolean onEntryHit(HashEntry<K, V> e) {
+                accessQueue.add(e); // ConcurrentLinkedQueue; execute() later iterates and clears it
+                return accessQueueSize.incrementAndGet() >= maxBatchQueueSize * batchThresholdFactor; // true once the batching threshold is reached
+            }
+
+            /*
+             * Invoked without holding a lock on Segment
+             */
+            @Override
+            public boolean thresholdExpired() {
+                return accessQueueSize.get() >= maxBatchQueueSize;
+            }
+
+            @Override
+            public void onEntryRemove(HashEntry<K, V> e) {
+                HashEntry<K, V> removed = stack.remove(e.hashCode());
+                if(removed != null && removed.recency()==Recency.LIR_RESIDENT) {
+                    currentLIRSize--;
+                }
+                queue.remove(e);                                  
+                if (accessQueue.remove(e)) {
+                    accessQueueSize.decrementAndGet();
+                }
+            }
+            @Override
+            public void clear() { // drops all LIRS bookkeeping: stack, HIR queue, LIR count and pending hits
+                stack.clear();
+                queue.clear(); // otherwise stale HIR entries stay queued and a later onEntryMiss hands them to remove()
+                currentLIRSize = 0; // the stack is empty, so no LIR entries remain and the fill phase restarts
+                accessQueue.clear();
+                accessQueueSize.set(0);
+            }
+            @Override
+            public Eviction strategy() {
+                return Eviction.LIRS;
+            }
+        }
     }
 
     /* ---------------- Public operations -------------- */
@@ -853,12 +1073,19 @@
      * @param concurrencyLevel
      *            the estimated number of concurrently updating threads. The implementation performs
      *            internal sizing to try to accommodate this many threads.
+     *            
+     * @param evictionStrategy
+     *            the algorithm used to evict elements from this map
+     *            
+     * @param evictionListener
+     *            the eviction listener callback to be notified about evicted elements
+     *            
      * @throws IllegalArgumentException
      *             if the initial capacity is negative or the load factor or concurrencyLevel are
      *             nonpositive.
      */
     public BufferedConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel,
-                    EvictionStrategy evictionStrategy, EvictionListener<K, V> evictionListener) {
+                    Eviction evictionStrategy, EvictionListener<K, V> evictionListener) {
         if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
             throw new IllegalArgumentException();
 
@@ -890,7 +1117,7 @@
     }
 
     public BufferedConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
-        this(initialCapacity, loadFactor, concurrencyLevel, EvictionStrategy.LRU, null);
+        this(initialCapacity, loadFactor, concurrencyLevel, Eviction.LRU, null);
     }
 
     /**

Modified: trunk/core/src/test/java/org/infinispan/stress/MapStressTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/stress/MapStressTest.java	2010-02-03 17:38:53 UTC (rev 1447)
+++ trunk/core/src/test/java/org/infinispan/stress/MapStressTest.java	2010-02-03 18:52:52 UTC (rev 1448)
@@ -12,10 +12,8 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.infinispan.eviction.EvictionStrategy;
 import org.infinispan.util.concurrent.BufferedConcurrentHashMap;
+import org.infinispan.util.concurrent.BufferedConcurrentHashMap.Eviction;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -30,9 +28,9 @@
     volatile CountDownLatch latch;
     final int MAP_CAPACITY = 512;
     final float MAP_LOAD_FACTOR = 0.75f;
-    final int CONCURRENCY = 16;
+    final int CONCURRENCY = 32;
     
-    final int RUN_TIME_MILLIS = 10 * 1000; // 10 sec
+    final int RUN_TIME_MILLIS = 5 * 1000; // 5 sec
     final int NUM_KEYS = 50000;
     final int LOOP_FACTOR=5;
     
@@ -57,9 +55,13 @@
         doTest(new ConcurrentHashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR, CONCURRENCY));
     }
    
-    public void testBufferedConcurrentHashMap() throws Exception {
-        doTest(new BufferedConcurrentHashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR, CONCURRENCY, EvictionStrategy.LRU,null));
+    public void testBufferedConcurrentHashMapLRU() throws Exception {
+        doTest(new BufferedConcurrentHashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR, CONCURRENCY, Eviction.LRU,null));
     }
+    
+    public void testBufferedConcurrentHashMapLIRS() throws Exception {
+        doTest(new BufferedConcurrentHashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR, CONCURRENCY, Eviction.LIRS,null));
+    }
 
     public void testHashMap() throws Exception {
         doTest(Collections.synchronizedMap(new HashMap<Integer, Integer>(MAP_CAPACITY, MAP_LOAD_FACTOR)));
@@ -74,7 +76,6 @@
 
         latch = new CountDownLatch(1);
         final Map<String, String> perf = new ConcurrentSkipListMap<String, String>();
-        final AtomicBoolean run = new AtomicBoolean(true);
         List<Thread> threads = new LinkedList<Thread>();
 
         for (int i = 0; i < numReaders; i++) {
@@ -83,7 +84,7 @@
                     waitForStart();
                     long start = System.nanoTime();
                     int runs = 0;
-                    while (run.get() && runs < readOps.size()) {
+                    while (runs < readOps.size()) {
                         map.get(readOps.get(runs));
                         runs++;
                     }
@@ -100,7 +101,7 @@
                     waitForStart();
                     long start = System.nanoTime();
                     int runs = 0;
-                    while (run.get() && runs < writeOps.size()) {
+                    while (runs < writeOps.size()) {
                         map.put(writeOps.get(runs),runs);
                         runs++;
                     }
@@ -117,7 +118,7 @@
                     waitForStart();
                     long start = System.nanoTime();
                     int runs = 0;
-                    while (run.get() && runs < removeOps.size()) {
+                    while (runs < removeOps.size()) {
                         map.remove(removeOps.get(runs));
                         runs++;
                     }
@@ -134,7 +135,6 @@
 
         // wait some time
         Thread.sleep(RUN_TIME_MILLIS);
-        run.set(false);
         for (Thread t : threads)
             t.join();
         



More information about the infinispan-commits mailing list