Author: manik.surtani(a)jboss.com
Date: 2009-03-07 07:41:42 -0500 (Sat, 07 Mar 2009)
New Revision: 7875
Added:
core/branches/flat/src/test/java/org/horizon/loader/ConcurrentLoadAndEvictTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java
core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java
Log:
Fixed load and concurrent evict bug, added test
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-03-07
12:27:52 UTC (rev 7874)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-03-07
12:41:42 UTC (rev 7875)
@@ -105,31 +105,31 @@
}
private void loadIfNeeded(InvocationContext ctx, Object key) throws Throwable {
+ // first check if the container contains the key we need. Try and load this into
the context.
+ MVCCEntry e = entryFactory.wrapEntryForReading(ctx, key);
+ if (e == null || e.isNullEntry()) {
- // TODO this needs re-ordering. Checking the loader first is too expensive for
cache hits if we already have this in memory.
- // It is better to check in memory first, but we must also guard against concurrent
eviction and cache lookups
- // since the loader may skip loading for a key in memory but the key is evicted
before it is read by the reader,
- // returning incorrect results. TODO: Write a test for this.
- // A potential solution is to test for the entry in memory, and then store it in
context if available. Else, check
- // if it is in the loader.
+ // we *may* need to load this.
+ if (!loader.containsKey(key)) {
+ log.trace("No need to load. Key doesn't exist in the
loader.");
+ return;
+ }
- if (!loader.containsKey(key)) {
- log.trace("No need to load. Key doesn't exist in the loader.");
- return;
- }
+ // Obtain a temporary lock to verify the key is not being concurrently added
+ boolean keyLocked = entryFactory.acquireLock(ctx, key);
- // Obtain a temporary lock to verify the key is not being concurrently added
- boolean keyLocked = entryFactory.acquireLock(ctx, key);
- if (dataContainer.containsKey(key)) {
- if (keyLocked) entryFactory.releaseLock(key);
- log.trace("No need to load. Key exists in the data container.");
- return;
+ // check again, in case there is a concurrent addition
+ if (dataContainer.containsKey(key)) {
+ if (keyLocked) entryFactory.releaseLock(key);
+ log.trace("No need to load. Key exists in the data container.");
+ return;
+ }
+
+ // Reuse the lock and create a new entry for loading
+ MVCCEntry n = entryFactory.wrapEntryForWriting(ctx, key, true, false,
keyLocked);
+ n = loadEntry(ctx, key, n);
+ ctx.setContainsModifications(true);
}
-
- // Reuse the lock and create a new entry for loading
- MVCCEntry n = entryFactory.wrapEntryForWriting(ctx, key, true, false, keyLocked);
- n = loadEntry(ctx, key, n);
- ctx.setContainsModifications(true);
}
/**
Added:
core/branches/flat/src/test/java/org/horizon/loader/ConcurrentLoadAndEvictTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/ConcurrentLoadAndEvictTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/ConcurrentLoadAndEvictTest.java 2009-03-07
12:41:42 UTC (rev 7875)
@@ -0,0 +1,118 @@
+package org.horizon.loader;
+
+import org.horizon.commands.read.GetKeyValueCommand;
+import org.horizon.commands.write.EvictCommand;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.config.Configuration;
+import org.horizon.config.CustomInterceptorConfig;
+import org.horizon.container.DataContainer;
+import org.horizon.context.InvocationContext;
+import org.horizon.interceptors.CacheLoaderInterceptor;
+import org.horizon.interceptors.base.CommandInterceptor;
+import org.horizon.loader.dummy.DummyInMemoryCacheStore;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
+import org.horizon.test.SingleCacheManagerTest;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests a thread going past the cache loader interceptor and the interceptor deciding
that loading is not necessary,
+ * then another thread rushing ahead and evicting the entry from memory.
+ *
+ * @author Manik Surtani
+ */
+@Test(groups = "functional", testName =
"loader.ConcurrentLoadAndEvictTest")
+public class ConcurrentLoadAndEvictTest extends SingleCacheManagerTest {
+ SlowDownInterceptor sdi;
+
+ protected CacheManager createCacheManager() throws Exception {
+ Configuration config = new Configuration();
+ // we need a loader:
+ CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+ config.setCacheLoaderManagerConfig(clmc);
+ clmc.addCacheLoaderConfig(new DummyInMemoryCacheStore.Cfg());
+
+ // we also need a custom interceptor to intercept get() calls after the CLI, to
slow it down so an evict goes
+ // through first
+
+ sdi = new SlowDownInterceptor();
+ CustomInterceptorConfig cic = new CustomInterceptorConfig(sdi);
+ cic.setAfterInterceptor(CacheLoaderInterceptor.class);
+ config.setCustomInterceptors(Collections.singletonList(cic));
+ return new DefaultCacheManager(config);
+ }
+
+ public void testEvictBeforeRead() throws CacheLoaderException, ExecutionException,
InterruptedException {
+ cache = cacheManager.getCache();
+ cache.put("a", "b");
+ assert cache.get("a").equals("b");
+ CacheLoader cl = TestingUtil.getCacheLoader(cache);
+ assert cl != null;
+ StoredEntry se = cl.load("a");
+ assert se != null;
+ assert se.getValue().equals("b");
+
+ // now attempt a concurrent get and evict.
+ ExecutorService e = Executors.newFixedThreadPool(1);
+ sdi.enabled = true;
+
+ // call the get
+ Future<String> future = e.submit(new Callable<String>() {
+ public String call() throws Exception {
+ return (String) cache.get("a");
+ }
+ });
+
+ // now run the evict.
+ cache.evict("a");
+
+ // make sure the get call, which would have gone past the cache loader interceptor
first, gets the correct value.
+ assert future.get().equals("b");
+
+ // disable the SlowDownInterceptor
+ sdi.enabled = false;
+
+ // and check that the key actually has been evicted
+ assert !TestingUtil.extractComponent(cache,
DataContainer.class).containsKey("a");
+ }
+
+ public static class SlowDownInterceptor extends CommandInterceptor {
+ volatile boolean enabled = false;
+ CountDownLatch getLatch = new CountDownLatch(1);
+ CountDownLatch evictLatch = new CountDownLatch(1);
+
+ @Override
+ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand
command) throws Throwable {
+ if (enabled) {
+ getLatch.countDown();
+ if (!evictLatch.await(60000, TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Didn't see evict after 60
seconds!");
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws
Throwable {
+ if (enabled) {
+ if (!getLatch.await(60000, TimeUnit.MILLISECONDS))
+ throw new TimeoutException("Didn't see get after 60
seconds!");
+ }
+ try {
+ return invokeNextInterceptor(ctx, command);
+ } finally {
+ if (enabled) evictLatch.countDown();
+ }
+ }
+ }
+}
Modified: core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java 2009-03-07
12:27:52 UTC (rev 7874)
+++ core/branches/flat/src/test/java/org/horizon/profiling/ProfileTest.java 2009-03-07
12:41:42 UTC (rev 7875)
@@ -21,7 +21,7 @@
*
* @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
*/
-@Test(groups = "profiling", enabled = false, testName =
"profiling.ProfileTest")
+@Test(groups = "profiling", enabled = true, testName =
"profiling.ProfileTest")
public class ProfileTest extends AbstractProfileTest {
/*
Test configuration options
@@ -35,7 +35,7 @@
private List<Object> keys = new ArrayList<Object>(MAX_OVERALL_KEYS);
- @Test(enabled = false)
+ @Test(enabled = true)
public void testLocalMode() throws Exception {
runCompleteTest();
}
Modified: core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java 2009-03-07 12:27:52
UTC (rev 7874)
+++ core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java 2009-03-07 12:41:42
UTC (rev 7875)
@@ -10,14 +10,16 @@
import org.horizon.AdvancedCache;
import org.horizon.Cache;
import org.horizon.CacheDelegate;
-import org.horizon.config.GlobalConfiguration;
import org.horizon.commands.CommandsFactory;
import org.horizon.commands.VisitableCommand;
+import org.horizon.config.GlobalConfiguration;
import org.horizon.factories.ComponentRegistry;
import org.horizon.factories.GlobalComponentRegistry;
import org.horizon.interceptors.InterceptorChain;
import org.horizon.interceptors.base.CommandInterceptor;
import org.horizon.lifecycle.ComponentStatus;
+import org.horizon.loader.CacheLoader;
+import org.horizon.loader.CacheLoaderManager;
import org.horizon.lock.LockManager;
import org.horizon.manager.CacheManager;
import org.horizon.manager.DefaultCacheManager;
@@ -31,8 +33,8 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Properties;
import java.util.Random;
-import java.util.Properties;
public class TestingUtil {
private static Random random = new Random();
@@ -125,11 +127,11 @@
}
/**
- * Waits for the given memebrs to be removed from the cluster. The difference between
this and
- * {@link #blockUntilViewsReceived(long, org.horizon.manager.CacheManager[])}
methods(s) is that it does not barf if
- * more than expected memebers is in the cluster - this is because we expect to start
with a grater number fo memebers
- * than we eventually expect. It will barf though, if the number of members is not the
one expected but only after the
- * timeout expieres.
+ * Waits for the given memebrs to be removed from the cluster. The difference between
this and {@link
+ * #blockUntilViewsReceived(long, org.horizon.manager.CacheManager[])} methods(s) is
that it does not barf if more
+ * than expected memebers is in the cluster - this is because we expect to start with
a grater number fo memebers
+ * than we eventually expect. It will barf though, if the number of members is not the
one expected but only after
+ * the timeout expieres.
*/
public static void blockForMemberToFail(long timeout, CacheManager... cacheManagers)
{
blockUntilViewsReceived(timeout, false, cacheManagers);
@@ -623,7 +625,7 @@
Properties newTransportProps = new Properties();
newTransportProps.put(JGroupsTransport.CONFIGURATION_STRING,
JGroupsConfigBuilder.getJGroupsConfig());
globalConfiguration.setTransportProperties(newTransportProps);
- return new DefaultCacheManager(globalConfiguration);
+ return new DefaultCacheManager(globalConfiguration);
}
public static CacheManager createLocalCacheManager() {
@@ -637,4 +639,13 @@
globalConfiguration.setTransportProperties(newTransportProps);
return new DefaultCacheManager(globalConfiguration);
}
+
+ public static CacheLoader getCacheLoader(Cache cache) {
+ CacheLoaderManager clm = extractComponent(cache, CacheLoaderManager.class);
+ if (clm != null && clm.isEnabled()) {
+ return clm.getCacheLoader();
+ } else {
+ return null;
+ }
+ }
}