[infinispan-commits] Infinispan SVN: r524 - in trunk/core/src: main/java/org/infinispan/factories and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Jul 8 13:22:48 EDT 2009


Author: galder.zamarreno at jboss.com
Date: 2009-07-08 13:22:48 -0400 (Wed, 08 Jul 2009)
New Revision: 524

Added:
   trunk/core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java
   trunk/core/src/test/java/org/infinispan/distribution/BaseDistCacheStoreTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreNotSharedTest.java
   trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreSharedTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheEntry.java
   trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheValue.java
   trunk/core/src/main/java/org/infinispan/container/entries/MortalCacheEntry.java
   trunk/core/src/main/java/org/infinispan/container/entries/TransientCacheEntry.java
   trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
   trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
   trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
   trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java
Log:
[ISPN-120] (Immortal cache entries stored as mortal entries in cache store) Added DistCacheStoreInterceptor and unit tests for shared and unshared cache store configurations.

Modified: trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheEntry.java	2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheEntry.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -1,5 +1,8 @@
 package org.infinispan.container.entries;
 
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
 /**
  * A cache entry that is immortal/cannot expire
  *
@@ -7,7 +10,7 @@
  * @since 4.0
  */
 public class ImmortalCacheEntry extends AbstractInternalCacheEntry {
-
+   private static final Log log = LogFactory.getLog(ImmortalCacheEntry.class);
    private ImmortalCacheValue cacheValue;
 
    ImmortalCacheEntry(Object key, Object value) {

Modified: trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheValue.java	2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/container/entries/ImmortalCacheValue.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -1,5 +1,8 @@
 package org.infinispan.container.entries;
 
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
 /**
  * An immortal cache value, to correspond with {@link org.infinispan.container.entries.ImmortalCacheEntry}
  *
@@ -7,6 +10,7 @@
  * @since 4.0
  */
 public class ImmortalCacheValue implements InternalCacheValue, Cloneable {
+   private static final Log log = LogFactory.getLog(ImmortalCacheValue.class);
    Object value;
 
    ImmortalCacheValue(Object value) {

Modified: trunk/core/src/main/java/org/infinispan/container/entries/MortalCacheEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/MortalCacheEntry.java	2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/container/entries/MortalCacheEntry.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -1,5 +1,8 @@
 package org.infinispan.container.entries;
 
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
 /**
  * A cache entry that is mortal.  I.e., has a lifespan.
  *
@@ -7,7 +10,7 @@
  * @since 4.0
  */
 public class MortalCacheEntry extends AbstractInternalCacheEntry {
-
+   private static final Log log = LogFactory.getLog(MortalCacheEntry.class);
    private MortalCacheValue cacheValue;
 
    public Object getValue() {
@@ -47,6 +50,7 @@
 
    public InternalCacheEntry setLifespan(long lifespan) {
       if (lifespan < 0) {
+         if (log.isTraceEnabled()) log.trace("Converting {0} into an inmortal cache entry", this); 
          return new ImmortalCacheEntry(key, cacheValue.value);
       } else {
          cacheValue.lifespan = lifespan;

Modified: trunk/core/src/main/java/org/infinispan/container/entries/TransientCacheEntry.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/container/entries/TransientCacheEntry.java	2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/container/entries/TransientCacheEntry.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -1,5 +1,8 @@
 package org.infinispan.container.entries;
 
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
 /**
  * A cache entry that is transient, i.e., it can be considered expired afer a period of not being used.
  *
@@ -7,7 +10,7 @@
  * @since 4.0
  */
 public class TransientCacheEntry extends AbstractInternalCacheEntry {
-
+   private static final Log log = LogFactory.getLog(TransientCacheEntry.class);
    private TransientCacheValue cacheValue;
 
    TransientCacheEntry(Object key, Object value, long maxIdle) {
@@ -41,6 +44,7 @@
 
    public InternalCacheEntry setMaxIdle(long maxIdle) {
       if (maxIdle < 0) {
+         if (log.isTraceEnabled()) log.trace("Converting {0} into an inmortal cache entry", this);
          return new ImmortalCacheEntry(key, cacheValue.value);
       } else {
          cacheValue.maxIdle = maxIdle;

Modified: trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java	2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -113,7 +113,15 @@
             interceptorChain.appendIntereceptor(createInterceptor(PassivationInterceptor.class));
          } else {
             interceptorChain.appendIntereceptor(createInterceptor(CacheLoaderInterceptor.class));
-            interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
+            switch (configuration.getCacheMode()) {
+               case DIST_SYNC:
+               case DIST_ASYNC:
+                  interceptorChain.appendIntereceptor(createInterceptor(DistCacheStoreInterceptor.class));
+                  break;
+               default:
+                  interceptorChain.appendIntereceptor(createInterceptor(CacheStoreInterceptor.class));
+                  break;
+            }
          }
       }
 

Modified: trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java	2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -71,10 +71,10 @@
  */
 @MBean(objectName = "CacheStore", description = "Component that handles storing of entries to a CacheStore from memory.")
 public class CacheStoreInterceptor extends JmxStatsCommandInterceptor {
-   private CacheLoaderManagerConfig loaderConfig = null;
+   CacheLoaderManagerConfig loaderConfig = null;
    private Map<GlobalTransaction, Integer> txStores = new ConcurrentHashMap<GlobalTransaction, Integer>();
    private Map<GlobalTransaction, Set<Object>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>();
-   private final AtomicLong cacheStores = new AtomicLong(0);
+   final AtomicLong cacheStores = new AtomicLong(0);
    CacheStore store;
    private CacheLoaderManager loaderManager;
 
@@ -313,7 +313,7 @@
       return cacheStores.get();
    }
 
-   private InternalCacheEntry getStoredEntry(Object key, InvocationContext ctx) {
+   InternalCacheEntry getStoredEntry(Object key, InvocationContext ctx) {
       CacheEntry entry = ctx.lookupEntry(key);
       if (entry instanceof InternalCacheEntry) {
          return (InternalCacheEntry) entry;

Added: trunk/core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -0,0 +1,182 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.interceptors;
+
+import java.util.List;
+import java.util.Map;
+
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.PutMapCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.context.Flag;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.distribution.DistributionManager;
+import org.infinispan.factories.annotations.Inject;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.remoting.transport.Transport;
+
+/**
+ * Cache store interceptor specific for the distribution cache mode. Put operations
+ * have been modified in such a way that if the put operation is the result of an L1 put, storing
+ * in the cache store is ignored. This is done so that immortal entries that get converted into
+ * mortal ones when putting into L1 don't get propagated to the cache store.
+ * 
+ * Secondly, in a replicated environment where a shared cache store is used, the node in which
+ * the cache operation is executed is the one responsible for interacting with the cache store. This
+ * doesn't work with distributed mode and instead, in a shared cache store situation, the first
+ * owner of the key is the one responsible for storing it.
+ * 
+ * In the particular case of putAll(), individual keys are checked and if a shared cache store
+ * environment has been configured, only the first owner of that key will actually store it to
+ * the cache store. In an unshared environment though, only those nodes that are owners of the key
+ * would store it to their local cache stores.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class DistCacheStoreInterceptor extends CacheStoreInterceptor {
+   DistributionManager dm; // used to look up the list of owners of a key
+   Address address; // read from the transport at injection time; compared against the owners of a key
+
+   @Inject
+   public void inject(DistributionManager dm, Transport transport) {
+      this.dm = dm;
+      this.address = transport.getAddress();
+   }
+
+   // ---- WRITE commands
+
+   /** Lets the rest of the chain run, then stores the entry unless skipped, in a tx scope, or unsuccessful. */
+   @Override
+   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
+      Object returnValue = invokeNextInterceptor(ctx, command);
+      Object key = command.getKey();
+      if (skip(ctx, key) || ctx.isInTxScope() || !command.isSuccessful()) return returnValue;
+      InternalCacheEntry se = getStoredEntry(key, ctx);
+      store.store(se);
+      log.trace("Stored entry {0} under key {1}", se, key);
+      if (getStatisticsEnabled()) cacheStores.incrementAndGet();
+      return returnValue;
+   }
+
+   /**
+    * Lets the rest of the chain run, then stores every key of the map that {@link #skip(Object)} does not
+    * exclude. The store statistic is increased by the number of entries actually stored, not by the map size.
+    */
+   @Override
+   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
+      Object returnValue = invokeNextInterceptor(ctx, command);
+      if (skip(ctx) || ctx.isInTxScope()) return returnValue;
+
+      Map<Object, Object> map = command.getMap();
+      int stored = 0;
+      for (Object key : map.keySet()) {
+         if (skip(key)) continue;
+         InternalCacheEntry se = getStoredEntry(key, ctx);
+         store.store(se);
+         stored++;
+         log.trace("Stored entry {0} under key {1}", se, key);
+      }
+      if (getStatisticsEnabled()) cacheStores.getAndAdd(stored);
+      return returnValue;
+   }
+
+   /** Lets the rest of the chain run, then removes the key from the store unless skipped, in a tx scope, or unsuccessful. */
+   @Override
+   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
+      Object retval = invokeNextInterceptor(ctx, command);
+      Object key = command.getKey();
+      if (!skip(ctx, key) && !ctx.isInTxScope() && command.isSuccessful()) {
+         boolean resp = store.remove(key);
+         log.trace("Removed entry under key {0} and got response {1} from CacheStore", key, resp);
+      }
+      return retval;
+   }
+
+   /** Lets the rest of the chain run, then stores the entry unless skipped, in a tx scope, or unsuccessful. */
+   @Override
+   public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
+      Object returnValue = invokeNextInterceptor(ctx, command);
+      Object key = command.getKey();
+      if (skip(ctx, key) || ctx.isInTxScope() || !command.isSuccessful()) return returnValue;
+      InternalCacheEntry se = getStoredEntry(key, ctx);
+      store.store(se);
+      log.trace("Stored entry {0} under key {1}", se, key);
+      if (getStatisticsEnabled()) cacheStores.incrementAndGet();
+      return returnValue;
+   }
+
+   /**
+    * Method that skips invocation if:
+    *   - No store defined or,
+    *   - The context contains Flag.SKIP_CACHE_STORE or,
+    *   - The store is a shared one and node storing the key is not the 1st owner of the key or,
+    *   - This is an L1 put operation.
+    */
+   private boolean skip(InvocationContext ctx, Object key) {
+      // Context checks come first, so the owners are only located when the call was not already ruled out.
+      return skip(ctx) || skip(key);
+   }
+
+   /**
+    * Method that skips invocation if:
+    *   - No store defined or,
+    *   - The context contains Flag.SKIP_CACHE_STORE.
+    */
+   private boolean skip(InvocationContext ctx) {
+      if (store == null) return true;  // could be because the cache loader does not implement cache store
+      if (ctx.hasFlag(Flag.SKIP_CACHE_STORE)) {
+         if (trace)
+            log.trace("Passing up method call and bypassing this interceptor since the call contain a skip cache store flag");
+         return true;
+      }
+      return false;
+   }
+
+   /**
+    * Method that skips invocation if:
+    *   - The store is a shared one and node storing the key is not the 1st owner of the key or,
+    *   - This is an L1 put operation.
+    */
+   private boolean skip(Object key) {
+      List<Address> addresses = dm.locate(key);
+      if ((loaderConfig.isShared() && !isFirstOwner(addresses)) || isL1Put(addresses)) {
+         if (trace)
+            log.trace("Passing up method call and bypassing this interceptor since the cache loader is either shared " +
+                        "and the caller is not the first owner of the key, or the put call is an L1 put");
+         return true;
+      }
+      return false;
+   }
+
+   /** Tells whether the injected address is absent from the given owners, which this class treats as an L1 put. */
+   private boolean isL1Put(List<Address> addresses) {
+      return !addresses.contains(address);
+   }
+
+   /** Tells whether the injected address is the first of the given owners; an empty list has no first owner. */
+   private boolean isFirstOwner(List<Address> addresses) {
+      return !addresses.isEmpty() && addresses.get(0).equals(address);
+   }
+}

Added: trunk/core/src/test/java/org/infinispan/distribution/BaseDistCacheStoreTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistCacheStoreTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistCacheStoreTest.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.distribution;
+
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+
+/**
+ * Base class for distribution tests whose cache managers are configured with a DummyInMemoryCacheStore.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public abstract class BaseDistCacheStoreTest extends BaseDistFunctionalTest {
+   boolean shared; // handed to CacheLoaderManagerConfig.setShared for every cache manager created here
+   int id; // incremented per created cache manager and appended to the DummyInMemoryCacheStore.Cfg argument
+   /** Builds a clustered cache manager whose configuration carries the store loader config, and registers it. */
+   @Override
+   protected CacheManager addClusterEnabledCacheManager() {
+      CacheLoaderManagerConfig loaderManagerConfig = new CacheLoaderManagerConfig();
+      loaderManagerConfig.setShared(shared);
+      loaderManagerConfig.addCacheLoaderConfig(new DummyInMemoryCacheStore.Cfg(getClass().getSimpleName() + "_" + id++));
+      Configuration configuration = new Configuration();
+      configuration.setCacheLoaderManagerConfig(loaderManagerConfig);
+      CacheManager manager = TestCacheManagerFactory.createClusteredCacheManager(configuration);
+      cacheManagers.add(manager);
+      return manager;
+   }
+}

Added: trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreNotSharedTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreNotSharedTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreNotSharedTest.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -0,0 +1,181 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.distribution;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.write.ClearCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+/**
+ * Tests a synchronous distributed cache whose cache stores are not shared between nodes.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "distribution.DistSyncCacheStoreNotSharedTest", enabled = true)
+public class DistSyncCacheStoreNotSharedTest extends BaseDistCacheStoreTest {
+   private static final Log log = LogFactory.getLog(DistSyncCacheStoreNotSharedTest.class);
+   /** Sets the inherited sync, tx and testRetVals flags and marks the cache store as not shared. */
+   public DistSyncCacheStoreNotSharedTest() {
+      sync = true;
+      tx = false;
+      testRetVals = true;
+      shared = false;
+   }
+   /** A put issued on the first non-owner of the key must not write the key into that node's own store. */
+   public void testPutFromNonOwner() throws Exception {
+      String key = "k2", value = "value2"; 
+      for (Cache<Object, String> c : caches) assert c.isEmpty();
+      Cache<Object, String> nonOwner = getFirstNonOwner(key);
+      CacheStore nonOwnerStore = TestingUtil.extractComponent(nonOwner, CacheLoaderManager.class).getCacheStore();
+      assert !nonOwnerStore.containsKey(key);
+      Object retval = nonOwner.put(key, value);
+      asyncWait(key, PutKeyValueCommand.class, getSecondNonOwner(key));
+      assert !nonOwnerStore.containsKey(key);
+      if (testRetVals) assert retval == null;
+      assertOnAllCachesAndOwnership(key, value);
+   }
+   /** After a put on an owner, the owners' stores contain the key and no other node's store does. */
+   public void testPutFromOwner() throws Exception {
+      String key = "k3", value = "value3";
+      for (Cache<Object, String> c : caches) assert c.isEmpty();
+      getOwners(key)[0].put(key, value);
+      asyncWait(key, PutKeyValueCommand.class, getNonOwners(key));
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (isOwner(c, key)) {
+            assertIsInContainerImmortal(c, key);
+            assert store.containsKey(key);
+         } else {
+            assertIsNotInL1(c, key);
+            assert !store.containsKey(key);
+         }
+      }
+   }
+   /** After putAll on c1, every key is in the store of each of its owners and in no other node's store. */
+   public void testPutAll() throws Exception {
+      String k1 = "1", v1 = "one", k2 = "2", v2 = "two", k3 = "3", v3 = "three", k4 = "4", v4 = "four";
+      String[] keys = new String[]{k1, k2, k3, k4};
+      Map<String, String> data = new HashMap<String, String>();
+      data.put(k1, v1);
+      data.put(k2, v2);
+      data.put(k3, v3);
+      data.put(k4, v4);
+      
+      c1.putAll(data);
+      
+      for (String key : keys) {
+         for (Cache<Object, String> c : caches) {
+            CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+            if (isOwner(c, key)) {
+               assertIsInContainerImmortal(c, key);
+               assert store.containsKey(key);
+            } else {
+               assert !store.containsKey(key);
+            }
+         }         
+      } 
+   }
+   /** Removing the key via its first non-owner must leave no node's store containing it. */
+   public void testRemoveFromNonOwner() throws Exception {
+      String key = "k1", value = "value";
+      initAndTest(); // inherited helper, not shown here; presumably stores "value" under "k1" - the loop below checks it
+      
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (isOwner(c, key)) {
+            assertIsInContainerImmortal(c, key);
+            assert store.load(key).getValue().equals(value);
+         } else {
+            assert !store.containsKey(key);
+         }
+      }
+      
+      Object retval = getFirstNonOwner(key).remove(key);
+      asyncWait("k1", RemoveCommand.class, getSecondNonOwner("k1"));
+      if (testRetVals) assert "value".equals(retval);
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         assert !store.containsKey(key);
+      }
+   }
+   /** Replacing the value via the first non-owner must update the owners' stores and leave other stores without the key. */
+   public void testReplaceFromNonOwner() throws Exception {
+      String key = "k1", value = "value", value2 = "v2";
+      initAndTest(); // inherited helper, not shown here; presumably stores "value" under "k1" - the loop below checks it
+      
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (isOwner(c, key)) {
+            assertIsInContainerImmortal(c, key);
+            assert store.load(key).getValue().equals(value);
+         } else {
+            assert !store.containsKey(key);
+         }
+      }
+      
+      Object retval = getFirstNonOwner(key).replace(key, value2);
+      asyncWait(key, ReplaceCommand.class, getSecondNonOwner(key));
+      if (testRetVals) assert value.equals(retval);
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (isOwner(c, key)) {
+            assertIsInContainerImmortal(c, key);
+            assert store.load(key).getValue().equals(value2);
+         } else {
+            assert !store.containsKey(key);
+         }
+      }
+   }
+   /** After clear() on c1, all caches are empty and no node's store contains any of the previously put keys. */
+   public void testClear() throws Exception {
+      for (Cache<Object, String> c : caches) assert c.isEmpty();
+      for (int i = 0; i < 5; i++) {
+         getOwners("k" + i)[0].put("k" + i, "value" + i);
+         asyncWait("k" + i, PutKeyValueCommand.class, getNonOwners("k" + i));
+      }
+      // this will fill up L1 as well
+      for (int i = 0; i < 5; i++) assertOnAllCachesAndOwnership("k" + i, "value" + i);
+      for (Cache<Object, String> c : caches) assert !c.isEmpty();
+      c1.clear();
+      asyncWait(null, ClearCommand.class);
+      for (Cache<Object, String> c : caches) assert c.isEmpty();
+      for (int i = 0; i < 5; i++) {
+         String key = "k" + i;
+         for (Cache<Object, String> c : caches) {
+            CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+            assert !store.containsKey(key);
+         }
+      }
+   }
+}

Added: trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreSharedTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreSharedTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/DistSyncCacheStoreSharedTest.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -0,0 +1,210 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.distribution;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.write.ClearCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.RemoveCommand;
+import org.infinispan.commands.write.ReplaceCommand;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+/**
+ * Functional tests for a synchronous, non-transactional distributed cache configured with a shared cache store.
+ * 
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "distribution.DistSyncCacheStoreSharedTest", enabled = true)
+public class DistSyncCacheStoreSharedTest extends BaseDistCacheStoreTest {
+
+   public DistSyncCacheStoreSharedTest() {
+      sync = true;
+      tx = false;
+      testRetVals = true;
+      shared = true;
+   }
+   
+   public void testPutFromNonOwner() throws Exception {
+      String key = "k4", value = "value4"; 
+      for (Cache<Object, String> c : caches) assert c.isEmpty();
+      Cache<Object, String> nonOwner = getFirstNonOwner(key);
+      CacheStore nonOwnerStore = TestingUtil.extractComponent(nonOwner, CacheLoaderManager.class).getCacheStore();
+      assert !nonOwnerStore.containsKey(key);
+      Object retval = nonOwner.put(key, value);
+      asyncWait(key, PutKeyValueCommand.class, getSecondNonOwner(key));
+      
+      Cache[] owners = getOwners(key);
+      CacheStore store = TestingUtil.extractComponent(owners[0], CacheLoaderManager.class).getCacheStore();
+      assertIsInContainerImmortal(owners[0], key);
+      assert store.containsKey(key);
+      
+      for (int i = 1; i < owners.length; i++) {
+         store = TestingUtil.extractComponent(owners[i], CacheLoaderManager.class).getCacheStore();
+         assertIsInContainerImmortal(owners[i], key);
+         assert !store.containsKey(key);
+      }
+      
+      for (Cache<Object, String> c : caches) {
+         store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (!isOwner(c, key)) {
+            assert !store.containsKey(key);
+         }         
+      }
+      
+      if (testRetVals) assert retval == null;
+      assertOnAllCachesAndOwnership(key, value);
+   }
+   
+   public void testPutFromOwner() throws Exception {
+      String key = "k5", value = "value5";
+      for (Cache<Object, String> c : caches) assert c.isEmpty();
+      Cache[] owners = getOwners(key); 
+      Object retval = owners[0].put(key, value);
+      asyncWait(key, PutKeyValueCommand.class, getNonOwners(key));
+      CacheStore store = TestingUtil.extractComponent(owners[0], CacheLoaderManager.class).getCacheStore();
+      assertIsInContainerImmortal(owners[0], key);
+      assert store.containsKey(key);
+      
+      for (int i = 1; i < owners.length; i++) {
+         store = TestingUtil.extractComponent(owners[i], CacheLoaderManager.class).getCacheStore();
+         assertIsInContainerImmortal(owners[i], key);
+         assert !store.containsKey(key);
+      }
+      
+      for (Cache<Object, String> c : caches) {
+         store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (!isOwner(c, key)) {
+            assert !store.containsKey(key);
+         }         
+      }
+      
+      if (testRetVals) assert retval == null;
+      assertOnAllCachesAndOwnership(key, value);
+   }
+
+
+   public void testPutAll() throws Exception {
+      String k1 = "1", v1 = "one", k2 = "2", v2 = "two", k3 = "3", v3 = "three", k4 = "4", v4 = "four";
+      String[] keys = new String[]{k1, k2, k3, k4};
+      Map<String, String> data = new HashMap<String, String>();
+      data.put(k1, v1);
+      data.put(k2, v2);
+      data.put(k3, v3);
+      data.put(k4, v4);
+      
+      c1.putAll(data);
+      
+      for (String key : keys) {
+         for (Cache<Object, String> c : caches) {
+            CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+            if (isFirstOwner(c, key)) {
+               assertIsInContainerImmortal(c, key);
+               assert store.containsKey(key);
+            } else {
+               assert !store.containsKey(key);
+            }
+         }         
+      }      
+   }
+   
+   public void testRemoveFromNonOwner() throws Exception {
+      String key = "k1", value = "value";
+      initAndTest();
+      
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (isFirstOwner(c, key)) {
+            assertIsInContainerImmortal(c, key);
+            assert store.load(key).getValue().equals(value);
+         } else {
+            assert !store.containsKey(key);
+         }
+      }
+      
+      Object retval = getFirstNonOwner(key).remove(key);
+      asyncWait("k1", RemoveCommand.class, getSecondNonOwner("k1"));
+      if (testRetVals) assert value.equals(retval);
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         assert !store.containsKey(key);
+      }
+   }
+   
+   public void testReplaceFromNonOwner() throws Exception {
+      String key = "k1", value = "value", value2 = "v2";
+      initAndTest();
+      
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (isFirstOwner(c, key)) {
+            assertIsInContainerImmortal(c, key);
+            assert store.load(key).getValue().equals(value);
+         } else {
+            assert !store.containsKey(key);
+         }
+      }
+      
+      Object retval = getFirstNonOwner(key).replace(key, value2);
+      asyncWait(key, ReplaceCommand.class, getSecondNonOwner(key));
+      if (testRetVals) assert value.equals(retval);
+      for (Cache<Object, String> c : caches) {
+         CacheStore store = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheStore();
+         if (isFirstOwner(c, key)) {
+            assertIsInContainerImmortal(c, key);
+            assert store.load(key).getValue().equals(value2);
+         } else {
+            assert !store.containsKey(key);
+         }
+      }
+   }
+   
+   public void testClear() throws Exception {
+      for (Cache<Object, String> c : caches) assert c.isEmpty();
+      for (int i = 0; i < 5; i++) {
+         getOwners("k" + i)[0].put("k" + i, "value" + i);
+         asyncWait("k" + i, PutKeyValueCommand.class, getNonOwners("k" + i));
+      }
+      // this will fill up L1 as well
+      for (int i = 0; i < 5; i++) assertOnAllCachesAndOwnership("k" + i, "value" + i);
+      for (Cache<Object, String> c : caches) assert !c.isEmpty();
+      c1.clear();
+      asyncWait(null, ClearCommand.class);
+      for (Cache<Object, String> c : caches) assert c.isEmpty();
+      
+      /* We only check c1 because in a shared situation, no matter where the clear is called,
+       * it should clear the whole store regardless. Bear in mind that in the test, even though
+       * the cache store is shared, each cache has its own cache store, which allows for checking
+       * which cache executes puts, removes, etc. */
+      CacheStore store = TestingUtil.extractComponent(c1, CacheLoaderManager.class).getCacheStore();
+      for (int i = 0; i < 5; i++) {
+         String key = "k" + i;
+         assert !store.containsKey(key);
+      }
+   }
+}

Modified: trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java	2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -44,7 +44,7 @@
 @Test(groups = {"functional", "unit"})
 public abstract class MultipleCacheManagersTest extends AbstractCacheTest {
 
-   private List<CacheManager> cacheManagers = new ArrayList<CacheManager>();
+   protected List<CacheManager> cacheManagers = new ArrayList<CacheManager>();
    private IdentityHashMap<Cache, ReplListener> listeners = new IdentityHashMap<Cache, ReplListener>();
 
    @BeforeClass

Modified: trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java	2009-07-08 13:14:45 UTC (rev 523)
+++ trunk/core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java	2009-07-08 17:22:48 UTC (rev 524)
@@ -41,6 +41,19 @@
       globalConfiguration.setTransportProperties(newTransportProps);
       return new DefaultCacheManager(globalConfiguration);
    }
+   
+   /**
+    * Creates a cache manager that supports clustering, using the given default cache configuration.
+    */
+   public static CacheManager createClusteredCacheManager(Configuration defaultCacheConfig) {
+      GlobalConfiguration globalConfiguration = GlobalConfiguration.getClusteredDefault();
+      amendMarshaller(globalConfiguration);
+//      amendJmx(globalConfiguration);
+      Properties newTransportProps = new Properties();
+      newTransportProps.put(JGroupsTransport.CONFIGURATION_STRING, JGroupsConfigBuilder.getJGroupsConfig());
+      globalConfiguration.setTransportProperties(newTransportProps);
+      return new DefaultCacheManager(globalConfiguration, defaultCacheConfig);
+   }
 
    /**
     * Creates a cache manager and ammends the supplied configuration in order to avoid conflicts (e.g. jmx, jgroups)




More information about the infinispan-commits mailing list