[infinispan-commits] Infinispan SVN: r2355 - in branches/4.2.x/core/src: test/java/org/infinispan/manager and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Sep 13 12:34:10 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-09-13 12:34:10 -0400 (Mon, 13 Sep 2010)
New Revision: 2355
Added:
branches/4.2.x/core/src/test/java/org/infinispan/manager/ConcurrentCacheManagerTest.java
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
Log:
[ISPN-635] (NPE on acquire lock LockManagerImpl.lockAndRecord) Enable getCache() to be called concurrently in a safe way.
Modified: branches/4.2.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java 2010-09-13 16:33:33 UTC (rev 2354)
+++ branches/4.2.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java 2010-09-13 16:34:10 UTC (rev 2355)
@@ -21,7 +21,9 @@
*/
package org.infinispan.manager;
+import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
+import org.infinispan.CacheException;
import org.infinispan.Version;
import org.infinispan.config.Configuration;
import org.infinispan.config.ConfigurationException;
@@ -44,6 +46,7 @@
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.util.Immutables;
+import org.infinispan.util.concurrent.locks.containers.ReentrantPerEntryLockContainer;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.rhq.helpers.pluginAnnotations.agent.DataType;
@@ -64,6 +67,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
/**
* A <tt>CacheManager</tt> is the primary mechanism for retrieving a {@link Cache} instance, and is often used as a
* starting point to using the {@link Cache}.
@@ -113,6 +118,7 @@
private final ConcurrentMap<String, Cache> caches = new ConcurrentHashMap<String, Cache>();
private final ConcurrentMap<String, Configuration> configurationOverrides = new ConcurrentHashMap<String, Configuration>();
private final GlobalComponentRegistry globalComponentRegistry;
+ private final ReentrantPerEntryLockContainer cacheNameLockContainer;
/**
* Constructs and starts a default instance of the CacheManager, using configuration defaults. See {@link Configuration}
@@ -200,7 +206,8 @@
this.globalConfiguration.accept(new ConfigurationValidatingVisitor());
this.defaultConfiguration = defaultConfiguration == null ? new Configuration() : defaultConfiguration.clone();
this.defaultConfiguration.accept(new ConfigurationValidatingVisitor());
- globalComponentRegistry = new GlobalComponentRegistry(this.globalConfiguration, this);
+ this.globalComponentRegistry = new GlobalComponentRegistry(this.globalConfiguration, this);
+ this.cacheNameLockContainer = new ReentrantPerEntryLockContainer(this.defaultConfiguration.getConcurrencyLevel());
if (start)
start();
}
@@ -239,6 +246,7 @@
configurationOverrides.put(entry.getKey(), c);
}
globalComponentRegistry = new GlobalComponentRegistry(globalConfiguration, this);
+ cacheNameLockContainer = new ReentrantPerEntryLockContainer(defaultConfiguration.getConcurrencyLevel());
} catch (RuntimeException re) {
throw new ConfigurationException(re);
}
@@ -280,6 +288,7 @@
configurationOverrides.put(entry.getKey(), c);
}
globalComponentRegistry = new GlobalComponentRegistry(globalConfiguration, this);
+ cacheNameLockContainer = new ReentrantPerEntryLockContainer(defaultConfiguration.getConcurrencyLevel());
} catch (ConfigurationException ce) {
throw ce;
} catch (RuntimeException re) {
@@ -329,7 +338,7 @@
}
globalComponentRegistry = new GlobalComponentRegistry(this.globalConfiguration, this);
-
+ cacheNameLockContainer = new ReentrantPerEntryLockContainer(defaultConfiguration.getConcurrencyLevel());
} catch (RuntimeException re) {
throw new ConfigurationException(re);
}
@@ -406,10 +415,25 @@
if (cacheName == null)
throw new NullPointerException("Null arguments not allowed");
- if (caches.containsKey(cacheName))
- return caches.get(cacheName);
+ Cache<K, V> cache = caches.get(cacheName);
+ if (cache != null && cache.getStatus() == ComponentStatus.RUNNING)
+ return cache;
- return createCache(cacheName);
+ boolean acquired = false;
+ try {
+ if (cacheNameLockContainer.acquireLock(cacheName, defaultConfiguration.getLockAcquisitionTimeout(), MILLISECONDS) != null) {
+ acquired = true;
+ return createCache(cacheName);
+ } else {
+ throw new CacheException("Unable to acquire lock on cache with name " + cacheName);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new CacheException("Interrupted while trying to get lock on cache with cache name " + cacheName, e);
+ } finally {
+ if (acquired)
+ cacheNameLockContainer.releaseLock(cacheName);
+ }
}
public String getClusterName() {
@@ -434,7 +458,12 @@
return t != null && t.isCoordinator();
}
+ @GuardedBy("Cache name lock container keeps a lock per cache name which guards this method")
private Cache createCache(String cacheName) {
+ Cache existingCache = caches.get(cacheName);
+ if (existingCache != null)
+ return existingCache;
+
Configuration c;
if (cacheName.equals(DEFAULT_CACHE_NAME) || !configurationOverrides.containsKey(cacheName))
c = defaultConfiguration.clone();
@@ -444,13 +473,12 @@
c.setGlobalConfiguration(globalConfiguration);
c.assertValid();
Cache cache = new InternalCacheFactory().createCache(c, globalComponentRegistry, cacheName);
- Cache other = caches.putIfAbsent(cacheName, cache);
- if (other == null) {
- cache.start();
- return cache;
- } else {
- return other;
- }
+ existingCache = caches.put(cacheName, cache);
+ if (existingCache != null)
+ throw new IllegalStateException("attempt to initialize the cache twice");
+
+ cache.start();
+ return cache;
}
public void start() {
Added: branches/4.2.x/core/src/test/java/org/infinispan/manager/ConcurrentCacheManagerTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/manager/ConcurrentCacheManagerTest.java (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/manager/ConcurrentCacheManagerTest.java 2010-09-13 16:34:10 UTC (rev 2355)
@@ -0,0 +1,57 @@
+package org.infinispan.manager;
+
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Test that verifies that CacheContainer.getCache() can be called concurrently.
+ *
+ * @author Galder Zamarreño
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "ConcurrentCacheManagerTest")
+public class ConcurrentCacheManagerTest extends SingleCacheManagerTest {
+ @Override
+ protected EmbeddedCacheManager createCacheManager() throws Exception {
+ return new DefaultCacheManager();
+ }
+
+ public void testConcurrentGetCacheCalls() throws Exception {
+ int numThreads = 25;
+ final CyclicBarrier barrier = new CyclicBarrier(numThreads +1);
+ List<Future<Void>> futures = new ArrayList<Future<Void>>(numThreads);
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ for (int i = 0; i < numThreads; i++) {
+ log.debug("Schedule execution");
+ Future<Void> future = executorService.submit(new Callable<Void>(){
+ @Override
+ public Void call() throws Exception {
+ try {
+ barrier.await();
+ cacheManager.getCache("blahblah").put("a", "b");
+ return null;
+ } finally {
+ log.debug("Wait for all execution paths to finish");
+ barrier.await();
+ }
+ }
+ });
+ futures.add(future);
+ }
+ barrier.await(); // wait for all threads to be ready
+ barrier.await(); // wait for all threads to finish
+
+ log.debug("All threads finished, let's shutdown the executor and check whether any exceptions were reported");
+ for (Future<Void> future : futures) future.get();
+ }
+}
More information about the infinispan-commits
mailing list