[infinispan-commits] Infinispan SVN: r524 - in trunk/core/src: main/java/org/infinispan/factories and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Jul 8 13:22:48 EDT 2009
Author: galder.zamarreno at jboss.com
Date: 2009-07-08 13:22:48 -0400 (Wed, 08 Jul 2009)
New Revision: 524
Added:
trunk/core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java
trunk/core/src/test/java/org/infinispan/distribution/BaseDistCacheStoreTest.java
trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreNotSharedTest.java
trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreSharedTest.java
Modified:
trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheEntry.java
trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheValue.java
trunk/core/src/main/java/org/infinispan/container/entries/MortalCacheEntry.java
trunk/core/src/main/java/org/infinispan/container/entries/TransientCacheEntry.java
trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java
Log:
[ISPN-120] (Inmortal cache entries stored as mortal entries in cache store) Added DistCacheStoreInterceptor and unit tests for shared and unshared cache store configurations.
Modified: trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheEntry.java 2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheEntry.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -1,5 +1,8 @@
package org.infinispan.container.entries;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
/**
* A cache entry that is immortal/cannot expire
*
@@ -7,7 +10,7 @@
* @since 4.0
*/
public class ImmortalCacheEntry extends AbstractInternalCacheEntry {
-
+ private static final Log log = LogFactory.getLog(ImmortalCacheEntry.class);
private ImmortalCacheValue cacheValue;
ImmortalCacheEntry(Object key, Object value) {
Modified: trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheValue.java 2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheValue.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -1,5 +1,8 @@
package org.infinispan.container.entries;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
/**
* An immortal cache value, to correspond with {@link org.infinispan.container.entries.ImmortalCacheEntry}
*
@@ -7,6 +10,7 @@
* @since 4.0
*/
public class ImmortalCacheValue implements InternalCacheValue, Cloneable {
+ private static final Log log = LogFactory.getLog(ImmortalCacheValue.class);
Object value;
ImmortalCacheValue(Object value) {
Modified: trunk/core/src/main/java/org/infinispan/container/entries/MortalCacheEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/MortalCacheEntry.java 2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/container/entries/MortalCacheEntry.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -1,5 +1,8 @@
package org.infinispan.container.entries;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
/**
* A cache entry that is mortal. I.e., has a lifespan.
*
@@ -7,7 +10,7 @@
* @since 4.0
*/
public class MortalCacheEntry extends AbstractInternalCacheEntry {
-
+ private static final Log log = LogFactory.getLog(MortalCacheEntry.class);
private MortalCacheValue cacheValue;
public Object getValue() {
@@ -47,6 +50,7 @@
public InternalCacheEntry setLifespan(long lifespan) {
if (lifespan < 0) {
+ if (log.isTraceEnabled()) log.trace("Converting {0} into an inmortal cache entry", this);
return new ImmortalCacheEntry(key, cacheValue.value);
} else {
cacheValue.lifespan = lifespan;
Modified: trunk/core/src/main/java/org/infinispan/container/entries/TransientCacheEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/TransientCacheEntry.java 2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/container/entries/TransientCacheEntry.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -1,5 +1,8 @@
package org.infinispan.container.entries;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
/**
* A cache entry that is transient, i.e., it can be considered expired afer a period of not being used.
*
@@ -7,7 +10,7 @@
* @since 4.0
*/
public class TransientCacheEntry extends AbstractInternalCacheEntry {
-
+ private static final Log log = LogFactory.getLog(TransientCacheEntry.class);
private TransientCacheValue cacheValue;
TransientCacheEntry(Object key, Object value, long maxIdle) {
@@ -41,6 +44,7 @@
public InternalCacheEntry setMaxIdle(long maxIdle) {
if (maxIdle < 0) {
+ if (log.isTraceEnabled()) log.trace("Converting {0} into an inmortal cache entry", this);
return new ImmortalCacheEntry(key, cacheValue.value);
} else {
cacheValue.maxIdle = maxIdle;
Modified: trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java 2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -113,7 +113,15 @@
interceptorChain.appendIntereceptor(createInterceptor(PassivationInterceptor.class));
} else {
interceptorChain.appendIntereceptor(createInterceptor(CacheLoaderInterceptor.class));
- interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
+ switch (configuration.getCacheMode()) {
+ case DIST_SYNC:
+ case DIST_ASYNC:
+ interceptorChain.appendIntereceptor(createInterceptor(DistCacheStoreInterceptor.class));
+ break;
+ default:
+ interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
+ break;
+ }
}
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java 2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -71,10 +71,10 @@
*/
@MBean(objectName = "CacheStore", description = "Component that handles storing of entries to a CacheStore from memory.")
public class CacheStoreInterceptor extends JmxStatsCommandInterceptor {
- private CacheLoaderManagerConfig loaderConfig = null;
+ CacheLoaderManagerConfig loaderConfig = null;
private Map<GlobalTransaction, Integer> txStores = new ConcurrentHashMap<GlobalTransaction, Integer>();
private Map<GlobalTransaction, Set<Object>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>();
- private final AtomicLong cacheStores = new AtomicLong(0);
+ final AtomicLong cacheStores = new AtomicLong(0);
CacheStore store;
private CacheLoaderManager loaderManager;
@@ -313,7 +313,7 @@
return cacheStores.get();
}
- private InternalCacheEntry getStoredEntry(Object key, InvocationContext ctx) {
+ InternalCacheEntry getStoredEntry(Object key, InvocationContext ctx) {
CacheEntry entry = ctx.lookupEntry(key);
if (entry instanceof InternalCacheEntry) {
return (InternalCacheEntry) entry;
Added: trunk/core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -0,0 +1,182 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.interceptors;
+
+import java.util.List;
+import java.util.Map;
+
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.PutMapCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
+
+/**
+ * Cache store interceptor specific for the distribution cache mode. Put operations
+ * has been modified in such way that if they put operation is the result of an L1 put, storing
+ * in the cache store is ignore. This is done so that inmortal entries that get converted into
+ * mortal ones when putting into L1 don't get propagated to the cache store.
+ *
+ * Secondly, in a replicated environment where a shared cache store is used, the node in which
+ * the cache operation is executed is the one responsible for interacting with the cache. This
+ * doesn't work with distributed mode and instead, in a shared cache store situation, the first
+ * owner of the key is the one responsible for storing it.
+ *
+ * In the particular case of putAll(), individual keys are checked and if a shared cache store
+ * environment has been configured, only the first owner of that key will actually store it to
+ * the cache store. In a unshared environment though, only those nodes that are owners of the key
+ * would store it to their local cache stores.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class DistCacheStoreInterceptor extends CacheStoreInterceptor {
+ DistributionManager dm;
+ Address address;
+
+ @Inject
+ public void inject(DistributionManager dm, Transport transport) {
+ this.dm = dm;
+ this.address = transport.getAddress();
+ }
+
+ // ---- WRITE commands
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
+ Object returnValue = invokeNextInterceptor(ctx, command);
+ Object key = command.getKey();
+ if (skip(ctx, key) || ctx.isInTxScope() || !command.isSuccessful()) return returnValue;
+ InternalCacheEntry se = getStoredEntry(key, ctx);
+ store.store(se);
+ log.trace("Stored entry {0} under key {1}", se, key);
+ if (getStatisticsEnabled()) cacheStores.incrementAndGet();
+ return returnValue;
+ }
+
+ @Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
+ Object returnValue = invokeNextInterceptor(ctx, command);
+ if (skip(ctx) || ctx.isInTxScope()) return returnValue;
+
+ Map<Object, Object> map = command.getMap();
+ for (Object key : map.keySet()) {
+ if (!skip(key)) {
+ InternalCacheEntry se = getStoredEntry(key, ctx);
+ store.store(se);
+ log.trace("Stored entry {0} under key {1}", se, key);
+ }
+ }
+ if (getStatisticsEnabled()) cacheStores.getAndAdd(map.size());
+ return returnValue;
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
+ Object retval = invokeNextInterceptor(ctx, command);
+ Object key = command.getKey();
+ if (!skip(ctx, key) && !ctx.isInTxScope() && command.isSuccessful()) {
+ boolean resp = store.remove(key);
+ log.trace("Removed entry under key {0} and got response {1} from CacheStore", key, resp);
+ }
+ return retval;
+ }
+
+ @Override
+ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
+ throws Throwable {
+ Object returnValue = invokeNextInterceptor(ctx, command);
+ Object key = command.getKey();
+ if (skip(ctx, key) || ctx.isInTxScope() || !command.isSuccessful()) return returnValue;
+
+ InternalCacheEntry se = getStoredEntry(key, ctx);
+ store.store(se);
+ log.trace("Stored entry {0} under key {1}", se, key);
+ if (getStatisticsEnabled()) cacheStores.incrementAndGet();
+
+ return returnValue;
+ }
+
+ /**
+ * Method that skips invocation if:
+ * - No store defined or,
+ * - The context contains Flag.SKIP_CACHE_STORE or,
+ * - The store is a shared one and node storing the key is not the 1st owner of the key or,
+ * - This is an L1 put operation.
+ */
+ private boolean skip(InvocationContext ctx, Object key) {
+ if (store == null) return true; // could be because the cache loader oes not implement cache store
+ List<Address> addresses = dm.locate(key);
+ if ((loaderConfig.isShared() && !isFirstOwner(addresses)) || ctx.hasFlag(Flag.SKIP_CACHE_STORE) || isL1Put(addresses)) {
+ if (trace)
+ log.trace("Passing up method call and bypassing this interceptor since the cache loader is either shared " +
+ "and the caller is not the first owner of the key, or the put call is an L1 put, or the call contain a skip cache store flag");
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Method that skips invocation if:
+ * - No store defined or,
+ * - The context contains Flag.SKIP_CACHE_STORE or,
+ */
+ private final boolean skip(InvocationContext ctx) {
+ if (store == null) return true; // could be because the cache loader oes not implement cache store
+ if (ctx.hasFlag(Flag.SKIP_CACHE_STORE)) {
+ if (trace)
+ log.trace("Passing up method call and bypassing this interceptor since the call contain a skip cache store flag");
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Method that skips invocation if:
+ * - The store is a shared one and node storing the key is not the 1st owner of the key or,
+ * - This is an L1 put operation.
+ */
+ private boolean skip(Object key) {
+ List<Address> addresses = dm.locate(key);
+ if ((loaderConfig.isShared() && !isFirstOwner(addresses)) || isL1Put(addresses)) {
+ if (trace)
+ log.trace("Passing up method call and bypassing this interceptor since the cache loader is either shared " +
+ "and the caller is not the first owner of the key, or the put call is an L1 put");
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isL1Put(List<Address> addresses) {
+ return !addresses.contains(address);
+ }
+
+ private boolean isFirstOwner(List<Address> addresses) {
+ return addresses.get(0).equals(address);
+ }
+}
Added: trunk/core/src/test/java/org/infinispan/distribution/BaseDistCacheStoreTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistCacheStoreTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistCacheStoreTest.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.distribution;
+
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+
+/**
+ * DistSyncCacheStoreTest.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public abstract class BaseDistCacheStoreTest extends BaseDistFunctionalTest {
+ boolean shared;
+ int id;
+
+ @Override
+ protected CacheManager addClusterEnabledCacheManager() {
+ Configuration cfg = new Configuration();
+ CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+ clmc.setShared(shared);
+ clmc.addCacheLoaderConfig(new DummyInMemoryCacheStore.Cfg(getClass().getSimpleName() + "_" + id++));
+ cfg.setCacheLoaderManagerConfig(clmc);
+ CacheManager cm = TestCacheManagerFactory.createClusteredCacheManager(cfg);
+ cacheManagers.add(cm);
+ return cm;
+ }
+}
Added: trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreNotSharedTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreNotSharedTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreNotSharedTest.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -0,0 +1,181 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.distribution;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.write.ClearCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+/**
+ * DistSyncSharedTest.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+ at Test(groups = "functional", testName = "distribution.DistSyncCacheStoreNotSharedTest", enabled = true)
+public class DistSyncCacheStoreNotSharedTest extends BaseDistCacheStoreTest {
+ private static final Log log = LogFactory.getLog(DistSyncCacheStoreNotSharedTest.class);
+
+ public DistSyncCacheStoreNotSharedTest() {
+ sync = true;
+ tx = false;
+ testRetVals = true;
+ shared = false;
+ }
+
+ public void testPutFromNonOwner() throws Exception {
+ String key = "k2", value = "value2";
+ for (Cache<Object, String> c : caches) assert c.isEmpty();
+ Cache<Object, String> nonOwner = getFirstNonOwner(key);
+ CacheStore nonOwnerStore = TestingUtil.extractComponent(nonOwner, CacheLoaderManager.class).getCacheStore();
+ assert !nonOwnerStore.containsKey(key);
+ Object retval = nonOwner.put(key, value);
+ asyncWait(key, PutKeyValueCommand.class, getSecondNonOwner(key));
+ assert !nonOwnerStore.containsKey(key);
+ if (testRetVals) assert retval == null;
+ assertOnAllCachesAndOwnership(key, value);
+ }
+
+ public void testPutFromOwner() throws Exception {
+ String key = "k3", value = "value3";
+ for (Cache<Object, String> c : caches) assert c.isEmpty();
+ getOwners(key)[0].put(key, value);
+ asyncWait(key, PutKeyValueCommand.class, getNonOwners(key));
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.containsKey(key);
+ } else {
+ assertIsNotInL1(c, key);
+ assert !store.containsKey(key);
+ }
+ }
+ }
+
+ public void testPutAll() throws Exception {
+ String k1 = "1", v1 = "one", k2 = "2", v2 = "two", k3 = "3", v3 = "three", k4 = "4", v4 = "four";
+ String[] keys = new String[]{k1, k2, k3, k4};
+ Map<String, String> data = new HashMap<String, String>();
+ data.put(k1, v1);
+ data.put(k2, v2);
+ data.put(k3, v3);
+ data.put(k4, v4);
+
+ c1.putAll(data);
+
+ for (String key : keys) {
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.containsKey(key);
+ } else {
+ assert !store.containsKey(key);
+ }
+ }
+ }
+ }
+
+ public void testRemoveFromNonOwner() throws Exception {
+ String key = "k1", value = "value";
+ initAndTest();
+
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.load(key).getValue().equals(value);
+ } else {
+ assert !store.containsKey(key);
+ }
+ }
+
+ Object retval = getFirstNonOwner(key).remove(key);
+ asyncWait("k1", RemoveCommand.class, getSecondNonOwner("k1"));
+ if (testRetVals) assert "value".equals(retval);
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ assert !store.containsKey(key);
+ }
+ }
+
+ public void testReplaceFromNonOwner() throws Exception {
+ String key = "k1", value = "value", value2 = "v2";
+ initAndTest();
+
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.load(key).getValue().equals(value);
+ } else {
+ assert !store.containsKey(key);
+ }
+ }
+
+ Object retval = getFirstNonOwner(key).replace(key, value2);
+ asyncWait(key, ReplaceCommand.class, getSecondNonOwner(key));
+ if (testRetVals) assert value.equals(retval);
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.load(key).getValue().equals(value2);
+ } else {
+ assert !store.containsKey(key);
+ }
+ }
+ }
+
+ public void testClear() throws Exception {
+ for (Cache<Object, String> c : caches) assert c.isEmpty();
+ for (int i = 0; i < 5; i++) {
+ getOwners("k" + i)[0].put("k" + i, "value" + i);
+ asyncWait("k" + i, PutKeyValueCommand.class, getNonOwners("k" + i));
+ }
+ // this will fill up L1 as well
+ for (int i = 0; i < 5; i++) assertOnAllCachesAndOwnership("k" + i, "value" + i);
+ for (Cache<Object, String> c : caches) assert !c.isEmpty();
+ c1.clear();
+ asyncWait(null, ClearCommand.class);
+ for (Cache<Object, String> c : caches) assert c.isEmpty();
+ for (int i = 0; i < 5; i++) {
+ String key = "k" + i;
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ assert !store.containsKey(key);
+ }
+ }
+ }
+}
Added: trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreSharedTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreSharedTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreSharedTest.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -0,0 +1,210 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.distribution;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.write.ClearCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+/**
+ * DistSyncCacheStoreSharedTest.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+ at Test(groups = "functional", testName = "distribution.DistSyncCacheStoreSharedTest", enabled = true)
+public class DistSyncCacheStoreSharedTest extends BaseDistCacheStoreTest {
+
+ public DistSyncCacheStoreSharedTest() {
+ sync = true;
+ tx = false;
+ testRetVals = true;
+ shared = true;
+ }
+
+ public void testPutFromNonOwner() throws Exception {
+ String key = "k4", value = "value4";
+ for (Cache<Object, String> c : caches) assert c.isEmpty();
+ Cache<Object, String> nonOwner = getFirstNonOwner(key);
+ CacheStore nonOwnerStore = TestingUtil.extractComponent(nonOwner, CacheLoaderManager.class).getCacheStore();
+ assert !nonOwnerStore.containsKey(key);
+ Object retval = nonOwner.put(key, value);
+ asyncWait(key, PutKeyValueCommand.class, getSecondNonOwner(key));
+
+ Cache[] owners = getOwners(key);
+ CacheStore store = TestingUtil.extractComponent(owners[0], CacheLoaderManager.class).getCacheStore();
+ assertIsInContainerImmortal(owners[0], key);
+ assert store.containsKey(key);
+
+ for (int i = 1; i < owners.length; i++) {
+ store = TestingUtil.extractComponent(owners[i], CacheLoaderManager.class).getCacheStore();
+ assertIsInContainerImmortal(owners[i], key);
+ assert !store.containsKey(key);
+ }
+
+ for (Cache<Object, String> c : caches) {
+ store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (!isOwner(c, key)) {
+ assert !store.containsKey(key);
+ }
+ }
+
+ if (testRetVals) assert retval == null;
+ assertOnAllCachesAndOwnership(key, value);
+ }
+
+ public void testPutFromOwner() throws Exception {
+ String key = "k5", value = "value5";
+ for (Cache<Object, String> c : caches) assert c.isEmpty();
+ Cache[] owners = getOwners(key);
+ Object retval = owners[0].put(key, value);
+ asyncWait(key, PutKeyValueCommand.class, getNonOwners(key));
+ CacheStore store = TestingUtil.extractComponent(owners[0], CacheLoaderManager.class).getCacheStore();
+ assertIsInContainerImmortal(owners[0], key);
+ assert store.containsKey(key);
+
+ for (int i = 1; i < owners.length; i++) {
+ store = TestingUtil.extractComponent(owners[i], CacheLoaderManager.class).getCacheStore();
+ assertIsInContainerImmortal(owners[i], key);
+ assert !store.containsKey(key);
+ }
+
+ for (Cache<Object, String> c : caches) {
+ store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (!isOwner(c, key)) {
+ assert !store.containsKey(key);
+ }
+ }
+
+ if (testRetVals) assert retval == null;
+ assertOnAllCachesAndOwnership(key, value);
+ }
+
+
+ public void testPutAll() throws Exception {
+ String k1 = "1", v1 = "one", k2 = "2", v2 = "two", k3 = "3", v3 = "three", k4 = "4", v4 = "four";
+ String[] keys = new String[]{k1, k2, k3, k4};
+ Map<String, String> data = new HashMap<String, String>();
+ data.put(k1, v1);
+ data.put(k2, v2);
+ data.put(k3, v3);
+ data.put(k4, v4);
+
+ c1.putAll(data);
+
+ for (String key : keys) {
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isFirstOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.containsKey(key);
+ } else {
+ assert !store.containsKey(key);
+ }
+ }
+ }
+ }
+
+ public void testRemoveFromNonOwner() throws Exception {
+ String key = "k1", value = "value";
+ initAndTest();
+
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isFirstOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.load(key).getValue().equals(value);
+ } else {
+ assert !store.containsKey(key);
+ }
+ }
+
+ Object retval = getFirstNonOwner(key).remove(key);
+ asyncWait("k1", RemoveCommand.class, getSecondNonOwner("k1"));
+ if (testRetVals) assert value.equals(retval);
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ assert !store.containsKey(key);
+ }
+ }
+
+ public void testReplaceFromNonOwner() throws Exception {
+ String key = "k1", value = "value", value2 = "v2";
+ initAndTest();
+
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isFirstOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.load(key).getValue().equals(value);
+ } else {
+ assert !store.containsKey(key);
+ }
+ }
+
+ Object retval = getFirstNonOwner(key).replace(key, value2);
+ asyncWait(key, ReplaceCommand.class, getSecondNonOwner(key));
+ if (testRetVals) assert value.equals(retval);
+ for (Cache<Object, String> c : caches) {
+ CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+ if (isFirstOwner(c, key)) {
+ assertIsInContainerImmortal(c, key);
+ assert store.load(key).getValue().equals(value2);
+ } else {
+ assert !store.containsKey(key);
+ }
+ }
+ }
+
+ public void testClear() throws Exception {
+ for (Cache<Object, String> c : caches) assert c.isEmpty();
+ for (int i = 0; i < 5; i++) {
+ getOwners("k" + i)[0].put("k" + i, "value" + i);
+ asyncWait("k" + i, PutKeyValueCommand.class, getNonOwners("k" + i));
+ }
+ // this will fill up L1 as well
+ for (int i = 0; i < 5; i++) assertOnAllCachesAndOwnership("k" + i, "value" + i);
+ for (Cache<Object, String> c : caches) assert !c.isEmpty();
+ c1.clear();
+ asyncWait(null, ClearCommand.class);
+ for (Cache<Object, String> c : caches) assert c.isEmpty();
+
+ /* We only check c1 because on a shared situation, no matter where the clear is called,
+ * it should clear the whole store regardless. Bear in mind that in the test, even though
+ * the cache store is shared, each cache has each own cache store, that allows for checking
+ * who execute puts, removes...etc. */
+ CacheStore store = TestingUtil.extractComponent(c1, CacheLoaderManager.class).getCacheStore();
+ for (int i = 0; i < 5; i++) {
+ String key = "k" + i;
+ assert !store.containsKey(key);
+ }
+ }
+}
Modified: trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java 2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -44,7 +44,7 @@
@Test(groups = {"functional", "unit"})
public abstract class MultipleCacheManagersTest extends AbstractCacheTest {
- private List<CacheManager> cacheManagers = new ArrayList<CacheManager>();
+ protected List<CacheManager> cacheManagers = new ArrayList<CacheManager>();
private IdentityHashMap<Cache, ReplListener> listeners = new IdentityHashMap<Cache, ReplListener>();
@BeforeClass
Modified: trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java 2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java 2009-07-08 17:22:48 UTC (rev 524)
@@ -41,6 +41,19 @@
globalConfiguration.setTransportProperties(newTransportProps);
return new DefaultCacheManager(globalConfiguration);
}
+
+ /**
+ * Creates an cache manager that does support clustering with a given default cache configuration.
+ */
+ public static CacheManager createClusteredCacheManager(Configuration defaultCacheConfig) {
+ GlobalConfiguration globalConfiguration = GlobalConfiguration.getClusteredDefault();
+ amendMarshaller(globalConfiguration);
+// amendJmx(globalConfiguration);
+ Properties newTransportProps = new Properties();
+ newTransportProps.put(JGroupsTransport.CONFIGURATION_STRING, JGroupsConfigBuilder.getJGroupsConfig());
+ globalConfiguration.setTransportProperties(newTransportProps);
+ return new DefaultCacheManager(globalConfiguration, defaultCacheConfig);
+ }
/**
* Creates a cache manager and ammends the supplied configuration in order to avoid conflicts (e.g. jmx, jgroups)
More information about the infinispan-commits
mailing list