[infinispan-commits] Infinispan SVN: r999 - in trunk/core/src/test/java/org/infinispan: stress and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Oct 23 18:00:51 EDT 2009


Author: sannegrinovero
Date: 2009-10-23 18:00:51 -0400 (Fri, 23 Oct 2009)
New Revision: 999

Added:
   trunk/core/src/test/java/org/infinispan/stress/
   trunk/core/src/test/java/org/infinispan/stress/PutIfAbsentStressTest.java
Log:
[ISPN-236] (cache.putIfAbsent() is not atomic) : adding a functional stress test.

Added: trunk/core/src/test/java/org/infinispan/stress/PutIfAbsentStressTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/stress/PutIfAbsentStressTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/stress/PutIfAbsentStressTest.java	2009-10-23 22:00:51 UTC (rev 999)
@@ -0,0 +1,261 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.stress;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies the atomic semantic of Infinispan's implementations of
+ * java.util.concurrent.ConcurrentMap<K, V>.putIfAbsent(K key, V value); which is an interesting
+ * concurrent locking case.
+ * 
+ * @since 4.0
+ * @see java.util.concurrent.ConcurrentMap#putIfAbsent(Object, Object)
+ * @author Sanne Grinovero
+ */
+ at Test(groups = "stress", testName = "atomic.PutIfAbsentStressTest")
+public class PutIfAbsentStressTest {
+
+   private static final int NODES_NUM = 5;
+   private static final int THREAD_PER_NODE = 20;
+   private static final long STRESS_TIME_MINUTES = 1;
+   private static final long SLEEP_MILLISECONDS = 50;
+   private static final String SHARED_KEY = "thisIsTheKeyForConcurrentAccess";
+
+   /**
+    * Purpose is not testing JDK's ConcurrentHashMap but ensuring the test is correct. It's also
+    * interesting to compare performance.
+    */
+   @Test
+   protected void testonConcurrentHashMap() throws Exception {
+      ConcurrentMap<String, String> map = new ConcurrentHashMap<String, String>();
+      testConcurrentLocking(map);
+   }
+
+   /**
+    * Testing putIfAbsent's behaviour on a Local cache.
+    */
+   @Test
+   protected void testonInfinispanLocal() throws Exception {
+      CacheManager cm = TestCacheManagerFactory.createLocalCacheManager(false);
+      ConcurrentMap<String, String> map = cm.getCache();
+      try {
+         testConcurrentLocking(map);
+      } finally {
+         TestingUtil.clearContent(cm);
+      }
+   }
+
+   /**
+    * Testing putIfAbsent's behaviour in DIST_SYNC cache.
+    */
+   @Test
+   protected void testonInfinispanDIST() throws Exception {
+      Configuration c = new Configuration();
+      c.setCacheMode(Configuration.CacheMode.DIST_SYNC);
+      testConcurrentLockingOnMultipleManagers(c);
+   }
+
+   /**
+    * Testing putIfAbsent's behaviour in REPL_SYNC cache.
+    */
+   @Test
+   protected void testonInfinispanREPL() throws Exception {
+      Configuration c = new Configuration();
+      c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      testConcurrentLockingOnMultipleManagers(c);
+   }
+
+   /**
+    * Adapter to run the test on any configuration
+    */
+   private void testConcurrentLockingOnMultipleManagers(Configuration cfg) throws IOException, InterruptedException {
+      List<CacheManager> cacheManagers = new ArrayList<CacheManager>(NODES_NUM);
+      List<Cache<String, String>> caches = new ArrayList<Cache<String, String>>();
+      List<ConcurrentMap<String, String>> maps = new ArrayList<ConcurrentMap<String, String>>(NODES_NUM
+               * THREAD_PER_NODE);
+      for (int nodeNum = 0; nodeNum < NODES_NUM; nodeNum++) {
+         CacheManager cm = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+         cacheManagers.add(cm);
+         Cache<String, String> cache = cm.getCache();
+         caches.add(cache);
+         for (int threadNum = 0; threadNum < THREAD_PER_NODE; threadNum++) {
+            maps.add(cache);
+         }
+      }
+      TestingUtil.blockUntilViewsReceived(10000, caches);
+      try {
+         testConcurrentLocking(maps);
+      } finally {
+         for (CacheManager cm : cacheManagers) {
+            try {
+               TestingUtil.clearContent(cm);
+            } catch (Exception e) {
+               // try cleaning up the other cacheManagers too
+            }
+         }
+      }
+   }
+
+   /**
+    * Adapter for tests sharing a single Cache instance
+    */
+   private void testConcurrentLocking(ConcurrentMap<String, String> map) throws IOException, InterruptedException {
+      int size = NODES_NUM * THREAD_PER_NODE;
+      List<ConcurrentMap<String, String>> maps = new ArrayList<ConcurrentMap<String, String>>(size);
+      for (int i = 0; i < size; i++) {
+         maps.add(map);
+      }
+      testConcurrentLocking(maps);
+   }
+
+   /**
+    * Drives the actual test on an Executor and verifies the result
+    * 
+    * @param maps the caches to be tested
+    * @throws IOException
+    * @throws InterruptedException
+    */
+   private void testConcurrentLocking(List<ConcurrentMap<String, String>> maps) throws IOException,
+            InterruptedException {
+      SharedStats stats = new SharedStats();
+      ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(NODES_NUM);
+      List<StressingThread> threads = new ArrayList<StressingThread>();
+      for (ConcurrentMap<String, String> map : maps) {
+         StressingThread thread = new StressingThread(stats, map);
+         threads.add(thread);
+         executor.execute(thread);
+      }
+      executor.shutdown();
+      Thread.sleep(5000);
+      int putsAfter5Seconds = stats.succesfullPutsCounter.get();
+      System.out.println("\nSituation after 5 seconds:");
+      System.out.println(stats.toString());
+      executor.awaitTermination(STRESS_TIME_MINUTES, TimeUnit.MINUTES);
+      stats.globalQuit = true;
+      executor.awaitTermination(10, TimeUnit.SECONDS); // give some time to awake and quit
+      executor.shutdownNow();
+      System.out.println("\nFinal situation:");
+      System.out.println(stats.toString());
+      assert !stats.seenFailures : "at least one thread has seen unexpected state";
+      assert stats.succesfullPutsCounter.get() > 0 : "the lock should have been taken at least once";
+      assert stats.succesfullPutsCounter.get() > putsAfter5Seconds : "the lock count didn't improve since the first 5 seconds. Deadlock?";
+      assert stats.succesfullPutsCounter.get() == stats.lockReleasedCounter.get() : "there's a mismatch in acquires and releases count";
+      assert stats.lockOwnersCounter.get() == 0 : "the lock is still held at test finish";
+   }
+
+   private static class StressingThread implements Runnable {
+
+      private final SharedStats stats;
+      private final ConcurrentMap<String, String> cache;
+
+      public StressingThread(SharedStats stats, ConcurrentMap<String, String> cache) {
+         this.stats = stats;
+         this.cache = cache;
+      }
+
+      @Override
+      public void run() {
+         while (!(stats.seenFailures || stats.globalQuit || Thread.interrupted())) {
+            try {
+               doCycle();
+            } catch (IOException e) {
+               checkIsTrue(false, e.getMessage());
+            }
+         }
+      }
+
+      private void doCycle() throws IOException {
+         String beforePut = cache.putIfAbsent(SHARED_KEY, SHARED_KEY);
+         if (beforePut != null) {
+            stats.canceledPutsCounter.incrementAndGet();
+            sleep();
+         } else {
+            boolean lockIsFine = stats.lockOwnersCounter.compareAndSet(0, 1);
+            System.out.print("L");
+            stats.succesfullPutsCounter.incrementAndGet();
+            checkIsTrue(lockIsFine, "I got the lock, some other thread is owning the lock AS WELL.");
+            sleep();
+            lockIsFine = stats.lockOwnersCounter.compareAndSet(1, 0);
+            checkIsTrue(lockIsFine, "Some other thread changed the lock count while I was having it!");
+            System.out.print("R");
+            cache.remove(SHARED_KEY);
+            stats.lockReleasedCounter.incrementAndGet();
+         }
+      }
+
+      private void sleep() {
+         try {
+            Thread.sleep(SLEEP_MILLISECONDS);
+         } catch (InterruptedException e) {
+            // no-op: waking up is good enough
+         }
+      }
+
+      private void checkIsTrue(boolean assertion, String message) {
+         if (assertion == false) {
+            stats.seenFailures = true;
+            System.out.println(message);
+         }
+      }
+
+   }
+
+   /**
+    * Common state to verify cache behaviour
+    */
+   public static class SharedStats {
+
+      final AtomicInteger canceledPutsCounter = new AtomicInteger(0);
+      final AtomicInteger succesfullPutsCounter = new AtomicInteger(0);
+      final AtomicInteger lockReleasedCounter = new AtomicInteger(0);
+      final AtomicInteger lockOwnersCounter = new AtomicInteger(0);
+      Throwable throwable = null;
+      volatile boolean globalQuit = false; // when it's true the threads quit
+      volatile boolean seenFailures = false; // set to true by a thread if it has experienced
+                                             // illegal state
+
+      public String toString() {
+         return "\n\tCanceled puts count:\t" + canceledPutsCounter.get() +
+                "\n\tSuccesfull puts count:\t" + succesfullPutsCounter.get() +
+                "\n\tRemoved count:\t" + lockReleasedCounter.get() +
+                "\n\tIllegal state detected:\t" + seenFailures;
+      }
+
+   }
+
+}


Property changes on: trunk/core/src/test/java/org/infinispan/stress/PutIfAbsentStressTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the infinispan-commits mailing list