Author: galder.zamarreno(a)jboss.com
Date: 2010-01-20 05:11:53 -0500 (Wed, 20 Jan 2010)
New Revision: 8325
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/SingletonStoreCacheLoader.java
core/trunk/src/test/java/org/jboss/cache/loader/SingletonStoreCacheLoaderTest.java
core/trunk/src/test/java/org/jboss/cache/util/TestingUtil.java
Log:
[JBCACHE-1561] (SingletonStoreCacheLoader does not work with MVCC) Singleton store now
works with MVCC.
Modified: core/trunk/src/main/java/org/jboss/cache/loader/SingletonStoreCacheLoader.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/loader/SingletonStoreCacheLoader.java 2010-01-19
11:41:29 UTC (rev 8324)
+++
core/trunk/src/main/java/org/jboss/cache/loader/SingletonStoreCacheLoader.java 2010-01-20
10:11:53 UTC (rev 8325)
@@ -41,7 +41,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -242,23 +241,22 @@
*/
protected void pushState(NodeSPI node) throws Exception
{
- /* Put the node's data first */
- Set keys = node.getKeysDirect();
- Fqn fqn = node.getFqn();
-
- for (Object aKey : keys)
- {
- Object value = cache.get(fqn, aKey);
- put(fqn, aKey, value);
- }
-
/* Navigates to the children */
- Collection<NodeSPI> children = node.getChildrenDirect();
- for (NodeSPI aChildren : children)
+ Collection<NodeSPI> children = node.getChildren();
+ if (trace) log.trace("Children for " + node.getFqn() + " are "
+ children);
+ if (!children.isEmpty())
{
- //Map.Entry entry = (Map.Entry) aChildren;
- pushState(aChildren);
+ for (NodeSPI aChildren : children)
+ {
+ pushState(aChildren);
+ }
}
+
+ Map data = node.getData();
+ Fqn fqn = node.getFqn();
+ if (!data.isEmpty()) {
+ put(fqn, data);
+ }
}
/**
Modified:
core/trunk/src/test/java/org/jboss/cache/loader/SingletonStoreCacheLoaderTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/loader/SingletonStoreCacheLoaderTest.java 2010-01-19
11:41:29 UTC (rev 8324)
+++
core/trunk/src/test/java/org/jboss/cache/loader/SingletonStoreCacheLoaderTest.java 2010-01-20
10:11:53 UTC (rev 8325)
@@ -10,9 +10,10 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
+import org.jboss.cache.NodeSPI;
import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
import org.jboss.cache.config.Configuration.CacheMode;
-import org.jboss.cache.config.Configuration.NodeLockingScheme;
import org.jboss.cache.factories.UnitTestConfigurationFactory;
import org.jboss.cache.util.internals.ViewChangeListener;
import static org.testng.AssertJUnit.*;
@@ -20,6 +21,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -50,10 +53,6 @@
cache1 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object,
Object>().createCache(UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC),
false, getClass());
cache2 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object,
Object>().createCache(UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC),
false, getClass());
cache3 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object,
Object>().createCache(UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC),
false, getClass());
-
- cache1.getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC);
- cache2.getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC);
- cache3.getConfiguration().setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC);
}
public void testPutCacheLoaderWithNoPush() throws Exception
@@ -77,27 +76,16 @@
assertTrue("/test2 should have been entered in cl1",
cl1.exists(fqn("/test2")));
assertTrue("/test3 should have been entered in cl1",
cl1.exists(fqn("/test3")));
- assertFalse("/test1 should not be in cl2",
cl2.exists(fqn("/test1")));
- assertFalse("/test2 should not be in cl2",
cl2.exists(fqn("/test2")));
- assertFalse("/test3 should not be in cl2",
cl2.exists(fqn("/test3")));
+ stopCache1(true);
- assertFalse("/test1 should not be in cl3",
cl3.exists(fqn("/test1")));
- assertFalse("/test2 should not be in cl3",
cl3.exists(fqn("/test2")));
- assertFalse("/test3 should not be in cl3",
cl3.exists(fqn("/test3")));
-
- stopCache1();
-
cache2.put(fqn("/test4"), "key", "value");
cache3.put(fqn("/test5"), "key", "value");
assertTrue("/test4 should have been entered in cl2",
cl2.exists(fqn("/test4")));
assertTrue("/test5 should have been entered in cl2",
cl2.exists(fqn("/test5")));
- assertFalse("/test4 should not be in cl3",
cl3.exists(fqn("/test4")));
- assertFalse("/test5 should not be in cl3",
cl3.exists(fqn("/test5")));
+ stopCache2(true);
- stopCache2();
-
cache3.put(fqn("/test6"), "key", "value");
assertTrue("/test5 should have been entered in cl3",
cl3.exists(Fqn.fromString("/test6")));
}
@@ -105,7 +93,7 @@
public void testPutCacheLoaderWithPush() throws Exception
{
initSingletonWithPushCache(cache1);
- initSingletonWithPushCache(cache2);
+ initSingletonWithWaitingPushCache(cache2);
initSingletonWithPushCache(cache3);
createCaches();
@@ -133,32 +121,39 @@
assertTrue(cl1.get(fqn("/e")).containsKey("e-key"));
assertTrue(cl1.get(fqn("/e/f/g")).containsKey("g-key"));
- assertFalse(cl2.exists(fqn("/a")));
- assertFalse(cl2.exists(fqn("/a")));
- assertFalse(cl2.exists(fqn("/a/b")));
- assertFalse(cl2.exists(fqn("/a/b")));
- assertFalse(cl2.exists(fqn("/a/b/c")));
- assertFalse(cl2.exists(fqn("/a/b/d")));
- assertFalse(cl2.exists(fqn("/e")));
- assertFalse(cl2.exists(fqn("/e/f/g")));
+ assertTrue(cl2.exists(fqn("/a")));
+ assertTrue(cl2.exists(fqn("/a")));
+ assertTrue(cl2.exists(fqn("/a/b")));
+ assertTrue(cl2.exists(fqn("/a/b")));
+ assertTrue(cl2.exists(fqn("/a/b/c")));
+ assertTrue(cl2.exists(fqn("/a/b/d")));
+ assertTrue(cl2.exists(fqn("/e")));
+ assertTrue(cl2.exists(fqn("/e/f/g")));
- assertFalse(cl3.exists(fqn("/a")));
- assertFalse(cl3.exists(fqn("/a")));
- assertFalse(cl3.exists(fqn("/a/b")));
- assertFalse(cl3.exists(fqn("/a/b")));
- assertFalse(cl3.exists(fqn("/a/b/c")));
- assertFalse(cl3.exists(fqn("/a/b/d")));
- assertFalse(cl3.exists(fqn("/e")));
- assertFalse(cl3.exists(fqn("/e/f/g")));
-
+ WaitForPushSingletonStoreCacheLoader scl2 = (WaitForPushSingletonStoreCacheLoader)
cache2.getCacheLoaderManager().getCacheLoader();
+ CountDownLatch startPushLatch = new CountDownLatch(1);
+ scl2.setStartPushLatch(startPushLatch);
+
ViewChangeListener viewChangeListener = new ViewChangeListener(cache2);
-
- stopCache1();
+ stopCache1(false);
+ cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache2.getInvocationContext().getOptionOverrides().setSuppressPersistence(true);
+ cache2.put(fqn("/e/i"), "i-key", "i-value");
+ startPushLatch.countDown();
viewChangeListener.waitForViewChange(60, TimeUnit.SECONDS);
- SingletonStoreCacheLoader scl2 = (SingletonStoreCacheLoader)
cache2.getCacheLoaderManager().getCacheLoader();
waitForPushStateCompletion(scl2.getPushStateFuture());
+ assertTrue(cl2.exists(fqn("/a")));
+ assertTrue(cl2.exists(fqn("/a")));
+ assertTrue(cl2.exists(fqn("/a/b")));
+ assertTrue(cl2.exists(fqn("/a/b")));
+ assertTrue(cl2.exists(fqn("/a/b/c")));
+ assertTrue(cl2.exists(fqn("/a/b/d")));
+ assertTrue(cl2.exists(fqn("/e")));
+ assertTrue(cl2.exists(fqn("/e/f/g")));
+ assertTrue(cl2.exists(fqn("/e/i")));
+
assertTrue(cl2.get(fqn("/a")).containsKey("a-key"));
assertTrue(cl2.get(fqn("/a")).containsKey("aa-key"));
assertTrue(cl2.get(fqn("/a/b")).containsKey("b-key"));
@@ -167,6 +162,7 @@
assertTrue(cl2.get(fqn("/a/b/d")).containsKey("d-key"));
assertTrue(cl2.get(fqn("/e")).containsKey("e-key"));
assertTrue(cl2.get(fqn("/e/f/g")).containsKey("g-key"));
+ assertTrue(cl2.get(fqn("/e/i")).containsKey("i-key"));
cache2.put(fqn("/e/f/h"), "h-key", "h-value");
cache3.put(fqn("/i"), "i-key", "i-value");
@@ -174,19 +170,19 @@
assertTrue(cl2.get(fqn("/e/f/h")).containsKey("h-key"));
assertTrue(cl2.get(fqn("/i")).containsKey("i-key"));
- assertFalse(cl3.exists(fqn("/a")));
- assertFalse(cl3.exists(fqn("/a")));
- assertFalse(cl3.exists(fqn("/a/b")));
- assertFalse(cl3.exists(fqn("/a/b")));
- assertFalse(cl3.exists(fqn("/a/b/c")));
- assertFalse(cl3.exists(fqn("/a/b/d")));
- assertFalse(cl3.exists(fqn("/e")));
- assertFalse(cl3.exists(fqn("/e/f/g")));
- assertFalse(cl3.exists(fqn("/e/f/h")));
- assertFalse(cl3.exists(fqn("/i")));
+ assertTrue(cl3.exists(fqn("/a")));
+ assertTrue(cl3.exists(fqn("/a")));
+ assertTrue(cl3.exists(fqn("/a/b")));
+ assertTrue(cl3.exists(fqn("/a/b")));
+ assertTrue(cl3.exists(fqn("/a/b/c")));
+ assertTrue(cl3.exists(fqn("/a/b/d")));
+ assertTrue(cl3.exists(fqn("/e")));
+ assertTrue(cl3.exists(fqn("/e/f/g")));
+ assertTrue(cl3.exists(fqn("/e/f/h")));
+ assertTrue(cl3.exists(fqn("/i")));
viewChangeListener = new ViewChangeListener(cache3);
- stopCache2();
+ stopCache2(false);
viewChangeListener.waitForViewChange(60, TimeUnit.SECONDS);
SingletonStoreCacheLoader scl3 = (SingletonStoreCacheLoader)
cache3.getCacheLoaderManager().getCacheLoader();
@@ -207,7 +203,7 @@
assertTrue(cl3.get(fqn("/a")).containsKey("aaa-key"));
- stopCache3();
+ stopCache3(true);
}
public void testAvoidConcurrentStatePush() throws Exception
@@ -279,28 +275,45 @@
return new ActiveStatusModifier(mscl);
}
- protected CacheLoaderConfig getSingletonStoreCacheLoaderConfig(String
cacheloaderClass) throws Exception
+ protected CacheLoaderConfig getSingletonStoreCacheLoaderConfig(String
cacheloaderClass, boolean isPush) throws Exception
{
CacheLoaderConfig clc =
UnitTestConfigurationFactory.buildSingleCacheLoaderConfig(false, null, cacheloaderClass,
"", false, false, false, false, false);
CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig sc = new
CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig();
sc.setSingletonStoreEnabled(true);
- sc.setProperties("pushStateWhenCoordinator = true\n
pushStateWhenCoordinatorTimeout = 5000\n");
+ sc.setProperties("pushStateWhenCoordinator = " + isPush + "\n
pushStateWhenCoordinatorTimeout = 50000\n");
clc.getFirstCacheLoaderConfig().setSingletonStoreConfig(sc);
return clc;
}
+ protected CacheLoaderConfig getSingletonStoreCacheLoaderConfig(String
cacheloaderClass, String singletonStoreClass) throws Exception
+ {
+ CacheLoaderConfig clc =
UnitTestConfigurationFactory.buildSingleCacheLoaderConfig(false, null, cacheloaderClass,
"", false, false, false, false, false);
+ CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig sc = new
CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig();
+ sc.setSingletonStoreEnabled(true);
+ sc.setSingletonStoreClass(singletonStoreClass);
+ sc.setProperties("pushStateWhenCoordinator = true\n
pushStateWhenCoordinatorTimeout = 50000\n");
+ clc.getFirstCacheLoaderConfig().setSingletonStoreConfig(sc);
+ return clc;
+ }
+
private void initSingletonNonPushCache(CacheSPI cache) throws Exception
{
cache.getConfiguration().setCacheLoaderConfig(getSingletonStoreCacheLoaderConfig(
- DummyInMemoryCacheLoader.class.getName()));
+ SingletonDummyInMemoryCacheLoader.class.getName(), false));
}
private void initSingletonWithPushCache(CacheSPI cache) throws Exception
{
cache.getConfiguration().setCacheLoaderConfig(getSingletonStoreCacheLoaderConfig(
- DummyInMemoryCacheLoader.class.getName()));
+ SingletonDummyInMemoryCacheLoader.class.getName(), true));
}
+ private void initSingletonWithWaitingPushCache(CacheSPI cache) throws Exception
+ {
+ cache.getConfiguration().setCacheLoaderConfig(getSingletonStoreCacheLoaderConfig(
+ SingletonDummyInMemoryCacheLoader.class.getName(),
WaitForPushSingletonStoreCacheLoader.class.getName()));
+ }
+
private CacheLoader getDelegatingCacheLoader(CacheSPI cache)
{
AbstractDelegatingCacheLoader acl = (AbstractDelegatingCacheLoader)
cache.getCacheLoaderManager().getCacheLoader();
@@ -312,31 +325,31 @@
return Fqn.fromString(fqn);
}
- private void stopCache1()
+ private void stopCache1(boolean clearCacheLoader)
{
if (cache1 != null)
{
- TestingUtil.killCaches(cache1);
+ TestingUtil.killCaches(clearCacheLoader, cache1);
}
cache1 = null;
}
- private void stopCache2()
+ private void stopCache2(boolean clearCacheLoader)
{
if (cache2 != null)
{
- TestingUtil.killCaches(cache2);
+ TestingUtil.killCaches(clearCacheLoader, cache2);
}
cache2 = null;
}
- private void stopCache3()
+ private void stopCache3(boolean clearCacheLoader)
{
if (cache3 != null)
{
- TestingUtil.killCaches(cache3);
+ TestingUtil.killCaches(clearCacheLoader, cache3);
}
cache3 = null;
@@ -345,9 +358,9 @@
@AfterMethod(alwaysRun = true)
public void tearDown()
{
- stopCache1();
- stopCache2();
- stopCache3();
+ stopCache1(true);
+ stopCache2(true);
+ stopCache3(true);
}
class MockSingletonStoreCacheLoader extends SingletonStoreCacheLoader
@@ -425,4 +438,76 @@
return null;
}
}
+
+ static class SingletonDummyInMemoryCacheLoader extends AbstractCacheLoader
+ {
+ static final DummyInMemoryCacheLoader singleton = new DummyInMemoryCacheLoader();
+
+ public boolean exists(Fqn name) throws Exception
+ {
+ return singleton.exists(name);
+ }
+
+ public Map<Object, Object> get(Fqn name) throws Exception
+ {
+ return singleton.get(name);
+ }
+
+ public Set<?> getChildrenNames(Fqn fqn) throws Exception
+ {
+ return singleton.getChildrenNames(fqn);
+ }
+
+ public IndividualCacheLoaderConfig getConfig()
+ {
+ return singleton.getConfig();
+ }
+
+ public Object put(Fqn name, Object key, Object value) throws Exception
+ {
+ return singleton.put(name, key, value);
+ }
+
+ public void put(Fqn name, Map<Object, Object> attributes) throws Exception
+ {
+ singleton.put(name, attributes);
+ }
+
+ public Object remove(Fqn fqn, Object key) throws Exception
+ {
+ return singleton.remove(fqn, key);
+ }
+
+ public void remove(Fqn fqn) throws Exception
+ {
+ singleton.remove(fqn);
+ }
+
+ public void removeData(Fqn fqn) throws Exception
+ {
+ singleton.removeData(fqn);
+ }
+
+ public void setConfig(IndividualCacheLoaderConfig config)
+ {
+ singleton.setConfig(config);
+ }
+ }
+
+ static class WaitForPushSingletonStoreCacheLoader extends SingletonStoreCacheLoader {
+ CountDownLatch startPushLatch;
+
+ WaitForPushSingletonStoreCacheLoader() {
+ }
+
+ public void setStartPushLatch(CountDownLatch startPushLatch) {
+ this.startPushLatch = startPushLatch;
+ }
+
+ @Override
+ protected void pushState(NodeSPI node) throws Exception {
+ startPushLatch.await();
+ super.pushState(node);
+ }
+ }
}
Modified: core/trunk/src/test/java/org/jboss/cache/util/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/TestingUtil.java 2010-01-19 11:41:29 UTC
(rev 8324)
+++ core/trunk/src/test/java/org/jboss/cache/util/TestingUtil.java 2010-01-20 10:11:53 UTC
(rev 8325)
@@ -527,6 +527,94 @@
}
/**
+ * Kills a cache - stops it, clears any data in any cache loaders, and rolls back any
associated txs
+ */
+ public static void killCaches(boolean clearCacheLoader, Cache... caches)
+ {
+ for (Cache c : caches)
+ {
+ try
+ {
+ if (c!= null) utf.removeCache(c);
+ if (c != null) // && ( (c.getCacheStatus() == CacheStatus.STARTED) ||
c.getCacheStatus() == CacheStatus.FAILED) )
+ {
+ CacheSPI spi = (CacheSPI) c;
+
+ Channel channel = null;
+ if (spi.getRPCManager() != null)
+ {
+ channel = spi.getRPCManager().getChannel();
+ }
+ if (spi.getTransactionManager() != null)
+ {
+ try
+ {
+ spi.getTransactionManager().rollback();
+ }
+ catch (Throwable t)
+ {
+ // don't care
+ }
+ }
+
+ if (clearCacheLoader)
+ {
+ CacheLoaderManager clm = spi.getCacheLoaderManager();
+ CacheLoader cl = clm == null ? null : clm.getCacheLoader();
+ if (cl != null)
+ {
+ try
+ {
+ cl.remove(Fqn.ROOT);
+ }
+ catch (Throwable t)
+ {
+ // don't care
+ }
+ }
+ }
+
+ try
+ {
+ spi.stop();
+ } catch (Throwable t) {
+ System.err.println(Thread.currentThread().getName() + "
!!!!!!!!!!!!!!!!!!!!! WARNING - Cache instance refused to stop.");
+ t.printStackTrace();
+ }
+ try {
+ spi.destroy();
+ } catch (Throwable t) {
+ System.err.println(Thread.currentThread().getName() + "
!!!!!!!!!!!!!!!!!!!!! WARNING - Cache instance refused to destroy.");
+ t.printStackTrace();
+ }
+ if (channel != null)
+ {
+ if (channel.isOpen()) {
+ System.err.println(Thread.currentThread().getName() +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Channel still opened.");
+ Thread.dumpStack();
+ channel.close();
+ }
+ if (channel.isOpen()) {
+ System.err.println(Thread.currentThread().getName() +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Channel close failed.");
+ System.exit(-1);
+ }
+ if (channel.isConnected()) {
+ System.err.println(Thread.currentThread().getName() +
"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Channel still connected.");
+ Thread.dumpStack();
+ System.exit(-1);
+ }
+ }
+ }
+ }
+ catch (Throwable t)
+ {
+ t.printStackTrace();
+ System.exit(-1);
+ }
+ }
+ }
+
+ /**
* Clears any associated transactions with the current thread in the caches'
transaction managers.
*/
public static void killTransactions(Cache... caches)