[infinispan-commits] Infinispan SVN: r2355 - in branches/4.2.x/core/src: test/java/org/infinispan/manager and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Sep 13 12:34:10 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-09-13 12:34:10 -0400 (Mon, 13 Sep 2010)
New Revision: 2355

Added:
   branches/4.2.x/core/src/test/java/org/infinispan/manager/ConcurrentCacheManagerTest.java
Modified:
   branches/4.2.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
Log:
[ISPN-635] (NPE on acquire lock LockManagerImpl.lockAndRecord) Enable getCache() to be called concurrently in a safe way.

Modified: branches/4.2.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java	2010-09-13 16:33:33 UTC (rev 2354)
+++ branches/4.2.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java	2010-09-13 16:34:10 UTC (rev 2355)
@@ -21,7 +21,9 @@
  */
 package org.infinispan.manager;
 
+import net.jcip.annotations.GuardedBy;
 import org.infinispan.Cache;
+import org.infinispan.CacheException;
 import org.infinispan.Version;
 import org.infinispan.config.Configuration;
 import org.infinispan.config.ConfigurationException;
@@ -44,6 +46,7 @@
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.transport.Transport;
 import org.infinispan.util.Immutables;
+import org.infinispan.util.concurrent.locks.containers.ReentrantPerEntryLockContainer;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 import org.rhq.helpers.pluginAnnotations.agent.DataType;
@@ -64,6 +67,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
 /**
  * A <tt>CacheManager</tt> is the primary mechanism for retrieving a {@link Cache} instance, and is often used as a
  * starting point to using the {@link Cache}.
@@ -113,6 +118,7 @@
    private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();
    private final ConcurrentMap<String, Configuration> configurationOverrides = new ConcurrentHashMap<String, Configuration>();
    private final GlobalComponentRegistry globalComponentRegistry;
+   private final ReentrantPerEntryLockContainer cacheNameLockContainer;
 
    /**
     * Constructs and starts a default instance of the CacheManager, using configuration defaults.  See {@link Configuration}
@@ -200,7 +206,8 @@
       this.globalConfiguration.accept(new ConfigurationValidatingVisitor());
       this.defaultConfiguration = defaultConfiguration == null ? new Configuration() : defaultConfiguration.clone();
       this.defaultConfiguration.accept(new ConfigurationValidatingVisitor());
-      globalComponentRegistry = new GlobalComponentRegistry(this.globalConfiguration, this);
+      this.globalComponentRegistry = new GlobalComponentRegistry(this.globalConfiguration, this);
+      this.cacheNameLockContainer = new ReentrantPerEntryLockContainer(this.defaultConfiguration.getConcurrencyLevel());
       if (start)
          start();
    }
@@ -239,6 +246,7 @@
             configurationOverrides.put(entry.getKey(), c);
          }
          globalComponentRegistry = new GlobalComponentRegistry(globalConfiguration, this);
+         cacheNameLockContainer = new ReentrantPerEntryLockContainer(defaultConfiguration.getConcurrencyLevel());         
       } catch (RuntimeException re) {
          throw new ConfigurationException(re);
       }
@@ -280,6 +288,7 @@
             configurationOverrides.put(entry.getKey(), c);
          }
          globalComponentRegistry = new GlobalComponentRegistry(globalConfiguration, this);
+         cacheNameLockContainer = new ReentrantPerEntryLockContainer(defaultConfiguration.getConcurrencyLevel());         
       } catch (ConfigurationException ce) {
          throw ce;
       } catch (RuntimeException re) {
@@ -329,7 +338,7 @@
          }
 
          globalComponentRegistry = new GlobalComponentRegistry(this.globalConfiguration, this);
-
+         cacheNameLockContainer = new ReentrantPerEntryLockContainer(defaultConfiguration.getConcurrencyLevel());
       } catch (RuntimeException re) {
          throw new ConfigurationException(re);
       }
@@ -406,10 +415,25 @@
       if (cacheName == null)
          throw new NullPointerException("Null arguments not allowed");
 
-      if (caches.containsKey(cacheName))
-         return caches.get(cacheName);
+      Cache<K, V> cache = caches.get(cacheName);
+      if (cache != null && cache.getStatus() == ComponentStatus.RUNNING)
+         return cache;
 
-      return createCache(cacheName);
+      boolean acquired = false;
+      try {
+         if (cacheNameLockContainer.acquireLock(cacheName, defaultConfiguration.getLockAcquisitionTimeout(), MILLISECONDS) != null) {
+            acquired = true;
+            return createCache(cacheName);
+         } else {
+            throw new CacheException("Unable to acquire lock on cache with name " + cacheName);
+         }
+      } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+         throw new CacheException("Interrupted while trying to get lock on cache with cache name " + cacheName, e);
+      } finally {
+         if (acquired)
+            cacheNameLockContainer.releaseLock(cacheName);
+      }
    }
 
    public String getClusterName() {
@@ -434,7 +458,12 @@
       return t != null && t.isCoordinator();
    }
 
+   @GuardedBy("Cache name lock container keeps a lock per cache name which guards this method")
    private Cache createCache(String cacheName) {
+      Cache existingCache = caches.get(cacheName);
+      if (existingCache != null)
+         return existingCache;
+
       Configuration c;
       if (cacheName.equals(DEFAULT_CACHE_NAME) || !configurationOverrides.containsKey(cacheName))
          c = defaultConfiguration.clone();
@@ -444,13 +473,12 @@
       c.setGlobalConfiguration(globalConfiguration);
       c.assertValid();
       Cache cache = new InternalCacheFactory().createCache(c, globalComponentRegistry, cacheName);
-      Cache other = caches.putIfAbsent(cacheName, cache);
-      if (other == null) {
-         cache.start();
-         return cache;
-      } else {
-         return other;
-      }
+      existingCache = caches.put(cacheName, cache);
+      if (existingCache != null)
+         throw new IllegalStateException("attempt to initialize the cache twice");
+
+      cache.start();
+      return cache;
    }
 
    public void start() {

Added: branches/4.2.x/core/src/test/java/org/infinispan/manager/ConcurrentCacheManagerTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/manager/ConcurrentCacheManagerTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/manager/ConcurrentCacheManagerTest.java	2010-09-13 16:34:10 UTC (rev 2355)
@@ -0,0 +1,57 @@
+package org.infinispan.manager;
+
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Test that verifies that CacheContainer.getCache() can be called concurrently.
+ *
+ * @author Galder Zamarreño
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "ConcurrentCacheManagerTest")
+public class ConcurrentCacheManagerTest extends SingleCacheManagerTest {
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      return new DefaultCacheManager();
+   }
+
+   public void testConcurrentGetCacheCalls() throws Exception {
+      int numThreads = 25;
+      final CyclicBarrier barrier = new CyclicBarrier(numThreads +1);
+      List<Future<Void>> futures = new ArrayList<Future<Void>>(numThreads);
+      ExecutorService executorService = Executors.newCachedThreadPool();
+      for (int i = 0; i < numThreads; i++) {
+         log.debug("Schedule execution");
+         Future<Void> future = executorService.submit(new Callable<Void>(){
+            @Override
+            public Void call() throws Exception {
+               try {
+                  barrier.await();
+                  cacheManager.getCache("blahblah").put("a", "b");
+                  return null;
+               } finally {
+                  log.debug("Wait for all execution paths to finish");
+                  barrier.await();
+               }
+            }
+         });
+         futures.add(future);
+      }
+      barrier.await(); // wait for all threads to be ready
+      barrier.await(); // wait for all threads to finish
+
+      log.debug("All threads finished, let's shutdown the executor and check whether any exceptions were reported");
+      for (Future<Void> future : futures) future.get();
+   }
+}



More information about the infinispan-commits mailing list