[exo-jcr-commits] exo-jcr SVN: r5420 - in kernel/trunk: exo.kernel.component.ext.cache.impl.infinispan.v5 and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 4 07:31:09 EST 2012


Author: nfilotto
Date: 2012-01-04 07:31:08 -0500 (Wed, 04 Jan 2012)
New Revision: 5420

Modified:
   kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml
   kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java
   kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java
   kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml
   kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java
   kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml
   kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml
   kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml
   kernel/trunk/pom.xml
Log:
EXOJCR-1682: Improve ISPN integration to properly support the distribution mode
* Added some unit tests
* Upgraded to ISPN 5.1.0.CR2

Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml	2012-01-04 12:31:08 UTC (rev 5420)
@@ -24,6 +24,10 @@
 			<artifactId>exo.kernel.component.cache</artifactId>
 		</dependency>
 		<dependency>
+			<groupId>org.exoplatform.kernel</groupId>
+			<artifactId>exo.kernel.component.common</artifactId>
+		</dependency>
+		<dependency>
 			<groupId>org.infinispan</groupId>
 			<artifactId>infinispan-core</artifactId>
 		</dependency>

Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java	2012-01-04 12:31:08 UTC (rev 5420)
@@ -48,7 +48,6 @@
 import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
 import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
 import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
-import org.infinispan.util.concurrent.locks.LockManager;
 
 import java.io.Externalizable;
 import java.io.IOException;
@@ -58,12 +57,9 @@
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -123,6 +119,14 @@
    {
       return cache;
    }
+   
+   /**
+    * @return the fullName
+    */
+   String getFullName()
+   {
+      return fullName;
+   }
 
    /**
     * {@inheritDoc}
@@ -199,9 +203,9 @@
          @Override
          public Void run()
          {
-            MapReduceTask<CacheKey<K>, V, String, CacheKey<K>> task =
-               new MapReduceTask<CacheKey<K>, V, String, CacheKey<K>>(cache);
-            task.mappedWith(new ClearCacheMapper<K, V>(fullName)).reducedWith(new ClearCacheReducer<String, V, K>());
+            MapReduceTask<CacheKey<K>, V, Void, Void> task =
+               new MapReduceTask<CacheKey<K>, V, Void, Void>(cache);
+            task.mappedWith(new ClearCacheMapper<K, V>(fullName)).reducedWith(new ClearCacheReducer());
             task.execute();
             return null;
          }
@@ -1074,7 +1078,7 @@
       }
    }
 
-   public static class ClearCacheMapper<K, V> extends AbstractExoCacheMapper<K, V, String, CacheKey<K>>
+   public static class ClearCacheMapper<K, V> extends AbstractExoCacheMapper<K, V, Void, Void>
    {
 
       public ClearCacheMapper()
@@ -1090,14 +1094,27 @@
        * {@inheritDoc}
        */
       @Override
-      protected void _map(CacheKey<K> key, V value, Collector<String, CacheKey<K>> collector)
+      protected void _map(CacheKey<K> key, V value, Collector<Void, Void> collector)
       {
-         collector.emit("keys", key);
+         ExoContainer container = ExoContainerContext.getTopContainer();
+         if (container == null)
+         {
+            LOG.error("The top container could not be found");
+            return;
+         }
+         DistributedCacheManager dcm =
+            (DistributedCacheManager)container.getComponentInstanceOfType(DistributedCacheManager.class);
+         if (dcm == null)
+         {
+            LOG.error("The DistributedCacheManager could not be found at top container level, please configure it.");
+            return;
+         }
+         Cache<CacheKey<K>, V> cache = dcm.getCache(CACHE_NAME);
+         cache.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.FAIL_SILENTLY).remove(key);
       }
-
    }
 
-   public static class ClearCacheReducer<K, V, KIn> implements Reducer<K, CacheKey<KIn>>
+   public static class ClearCacheReducer implements Reducer<Void, Void>
    {
 
       /**
@@ -1109,46 +1126,8 @@
        * @see org.infinispan.distexec.mapreduce.Reducer#reduce(java.lang.Object, java.util.Iterator)
        */
       @Override
-      public CacheKey<KIn> reduce(K reducedKey, Iterator<CacheKey<KIn>> iter)
+      public Void reduce(Void reducedKey, Iterator<Void> iter)
       {
-         CacheKey<KIn> firstKey;
-         if (iter == null || !iter.hasNext() || (firstKey = iter.next()) == null)
-         {
-            return null;
-         }
-         ExoContainer container = ExoContainerContext.getTopContainer();
-         if (container == null)
-         {
-            LOG.error("The top container could not be found");
-            return null;
-         }
-         DistributedCacheManager dcm =
-            (DistributedCacheManager)container.getComponentInstanceOfType(DistributedCacheManager.class);
-         if (dcm == null)
-         {
-            LOG.error("The DistributedCacheManager could not be found at top container level, please configure it.");
-            return null;
-         }
-         Cache<CacheKey<K>, V> cache = dcm.getCache(CACHE_NAME);
-         final LockManager lm = cache.getAdvancedCache().getLockManager();
-         // Sort the keys to prevent deadlocks
-         Set<CacheKey<KIn>> keys = new TreeSet<CacheKey<KIn>>(new Comparator<CacheKey<KIn>>()
-         {
-            public int compare(CacheKey<KIn> o1, CacheKey<KIn> o2)
-            {
-               int result = lm.getLockId(o1) - lm.getLockId(o2);
-               return result == 0 ? System.identityHashCode(o1) - System.identityHashCode(o2) : result;
-            }
-         });
-         keys.add(firstKey);
-         while (iter.hasNext())
-         {
-            keys.add(iter.next());
-         }
-         for (CacheKey<KIn> key : keys)
-         {
-            cache.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.FAIL_SILENTLY).remove(key);
-         }
          return null;
       }
    }

Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java	2012-01-04 12:31:08 UTC (rev 5420)
@@ -27,7 +27,9 @@
 import org.exoplatform.container.xml.ValueParam;
 import org.exoplatform.services.log.ExoLogger;
 import org.exoplatform.services.log.Log;
+import org.exoplatform.services.transaction.TransactionService;
 import org.infinispan.Cache;
+import org.infinispan.factories.ComponentRegistry;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.picocontainer.Startable;
@@ -35,6 +37,8 @@
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 
+import javax.transaction.TransactionManager;
+
 /**
  * This class is used to allow to use infinispan in distribution mode with
  * the ability to launch infinispan instances in standalone mode, in other
@@ -77,7 +81,7 @@
    public DistributedCacheManager(String configurationFile, Map<String, String> parameters,
       ConfigurationManager configManager)
    {
-      this.manager = init(configurationFile, parameters, configManager);
+      this.manager = init(configurationFile, parameters, configManager, null);
    }
 
    /**
@@ -85,13 +89,23 @@
     */
    public DistributedCacheManager(InitParams params, ConfigurationManager configManager)
    {
+      this(params, configManager, null);
+   }
+
+   /**
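+    * Constructor that also accepts an optional TransactionService; when it is not null, its transaction manager is passed on to the cache manager initialization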
+    */
+   public DistributedCacheManager(InitParams params, ConfigurationManager configManager, TransactionService ts)
+   {
       ValueParam vp;
       final String result;
       if (params != null && (vp = params.getValueParam(CONFIG_FILE_PARAMETER_NAME)) != null
          && (result = vp.getValue()) != null && !result.isEmpty())
       {
          PropertiesParam pp = params.getPropertiesParam(PARAMS_PARAMETER_NAME);
-         this.manager = init(result, pp == null ? null : pp.getProperties(), configManager);
+         this.manager =
+            init(result, pp == null ? null : pp.getProperties(), configManager,
+               ts == null ? null : ts.getTransactionManager());
       }
       else
       {
@@ -104,10 +118,11 @@
     * @param configurationFile the path of the configuration file
     * @param parameters the parameters to inject into the configuration file
     * @param configManager the configuration manager used to get the configuration file
+    * @param tm the transaction manager
     * @return the CacheManager initialized
     */
    private EmbeddedCacheManager init(final String configurationFile, final Map<String, String> parameters,
-      final ConfigurationManager configManager)
+      final ConfigurationManager configManager, final TransactionManager tm)
    {
       try
       {
@@ -138,7 +153,14 @@
                manager.start();
                for (String cacheName : manager.getCacheNames())
                {
-                  manager.getCache(cacheName);
+                  Cache cache = manager.getCache(cacheName);
+                  if (tm != null)
+                  {
+                     // We inject the transaction manager
+                     ComponentRegistry cr = cache.getAdvancedCache().getComponentRegistry();
+                     cr.registerComponent(tm, TransactionManager.class);
+                     cr.rewire();
+                  }                  
                }
                return manager;
             }

Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml	2012-01-04 12:31:08 UTC (rev 5420)
@@ -54,7 +54,7 @@
    </global>
    <default>
       <locking isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="10000" writeSkewCheck="false" concurrencyLevel="500"/>
-      <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true"/>
+      <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" transactionMode="TRANSACTIONAL"/>
       <jmxStatistics enabled="true"/>
       <invocationBatching enabled="true"/>
       <clustering mode="replication">

Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java	2012-01-04 12:31:08 UTC (rev 5420)
@@ -32,6 +32,9 @@
 import org.exoplatform.services.cache.impl.infinispan.ExoCacheFactoryImpl;
 import org.exoplatform.services.ispn.DistributedCacheManager;
 import org.exoplatform.test.BasicTestCase;
+import org.infinispan.affinity.KeyAffinityService;
+import org.infinispan.affinity.KeyAffinityServiceFactory;
+import org.infinispan.affinity.KeyGenerator;
 import org.infinispan.distribution.DistributionManager;
 
 import java.io.Serializable;
@@ -42,8 +45,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -467,17 +472,29 @@
    {
       private static final long serialVersionUID = 1L;
 
-      public String value;
-      public MyKey(){}
-      public MyKey(String value)
+      public Object value;
+
+      public String displayValue;
+
+      public MyKey()
       {
+      }
+
+      public MyKey(Object value)
+      {
          this.value = value;
       }
 
+      public MyKey(String displayValue, Object value)
+      {
+         this.displayValue = displayValue;
+         this.value = value;
+      }
+
       @Override
       public boolean equals(Object paramObject)
       {
-         return paramObject instanceof MyKey && ((MyKey)paramObject).value.endsWith(value);
+         return paramObject instanceof MyKey && ((MyKey)paramObject).value.equals(value);
       }
 
       @Override
@@ -489,10 +506,30 @@
       @Override
       public String toString()
       {
-         return value;
+         return displayValue == null ? value.toString() : displayValue;
       }
    }
-   
+
+   public static class MyKeyGenerator implements KeyGenerator<DistributedExoCache.CacheKey<MyKey>>
+   {
+
+      public static final Random rnd = new Random();
+
+      private String fullName;
+
+      public MyKeyGenerator(String fullName)
+      {
+         this.fullName = fullName;
+      }
+
+      @Override
+      public DistributedExoCache.CacheKey<MyKey> getKey()
+      {
+         return new DistributedExoCache.CacheKey<MyKey>(fullName, new MyKey(rnd.nextLong()));
+      }
+   }
+
+   @SuppressWarnings({"rawtypes", "unchecked"})
    public void testDistributedCache() throws Exception
    {
       PortalContainer pc = PortalContainer.getInstance();
@@ -511,191 +548,312 @@
       DistributedCacheManager dcm2 =
          new DistributedCacheManager("jar:/conf/portal/distributed-cache-configuration.xml", params, cm);
 
-      @SuppressWarnings("unchecked")
       DistributedExoCache<Serializable, Object> cache1 =
          (DistributedExoCache<Serializable, Object>)((ExoCacheFactory)pc
             .getComponentInstanceOfType(ExoCacheFactory.class)).createCache(config);
       DistributionManager dm = cache1.getCache().getDistributionManager();
-      MyCacheListener listener1 = new MyCacheListener();
-      cache1.addCacheListener(listener1);
       DistributedExoCache<Serializable, Object> cache2 =
          (DistributedExoCache<Serializable, Object>)new ExoCacheFactoryImpl(
             (ExoContainerContext)pc.getComponentInstanceOfType(ExoContainerContext.class),
             "jar:/conf/portal/cache-configuration-template.xml", cm, dcm2).createCache(config);
-      MyCacheListener listener2 = new MyCacheListener();
-      cache2.addCacheListener(listener2);
+      KeyAffinityService kas1 =
+         KeyAffinityServiceFactory.newLocalKeyAffinityService(cache1.getCache(),
+            new MyKeyGenerator(cache1.getFullName()), Executors.newSingleThreadExecutor(), 100);
+      KeyAffinityService kas2 =
+         KeyAffinityServiceFactory.newLocalKeyAffinityService(cache2.getCache(),
+            new MyKeyGenerator(cache1.getFullName()), Executors.newSingleThreadExecutor(), 100);
+
       try
       {
-         MyKey key;
-         cache1.put(key = new MyKey("a"), "b");
-         assertEquals(1, cache1.getCacheSize());
-         assertEquals("b", cache2.get(new MyKey("a")));
-         assertEquals(1, cache2.getCacheSize());
-         
-//         int put1 = 1;
-//         int put2 = dm.getLocality(key).isLocal() ? 0 : 1;
-//
-//         assertEquals(put1, listener1.put);
-//         assertEquals(put2, listener2.put);
+         Object a, b, c;
+         for (int i = 0; i < 2; i++)
+         {
+            if (i == 0)
+            {
+               a =
+                  new MyKey("a", ((DistributedExoCache.CacheKey<MyKey>)kas1.getKeyForAddress(cache1.getCache()
+                     .getRpcManager().getAddress())).getKey().value);
+            }
+            else
+            {
+               a =
+                  new MyKey("a", ((DistributedExoCache.CacheKey<MyKey>)kas2.getKeyForAddress(cache2.getCache()
+                     .getRpcManager().getAddress())).getKey().value);
+            }
+            for (int j = 0; j < 2; j++)
+            {
+               if (j == 0)
+               {
+                  b =
+                     new MyKey("b", ((DistributedExoCache.CacheKey<MyKey>)kas1.getKeyForAddress(cache1.getCache()
+                        .getRpcManager().getAddress())).getKey().value);
+               }
+               else
+               {
+                  b =
+                     new MyKey("b", ((DistributedExoCache.CacheKey<MyKey>)kas2.getKeyForAddress(cache2.getCache()
+                        .getRpcManager().getAddress())).getKey().value);
+               }
+               for (int k = 0; k < 2; k++)
+               {
+                  if (k == 0)
+                  {
+                     c =
+                        new MyKey("c", ((DistributedExoCache.CacheKey<MyKey>)kas1.getKeyForAddress(cache1.getCache()
+                           .getRpcManager().getAddress())).getKey().value);
+                  }
+                  else
+                  {
+                     c =
+                        new MyKey("c", ((DistributedExoCache.CacheKey<MyKey>)kas2.getKeyForAddress(cache2.getCache()
+                           .getRpcManager().getAddress())).getKey().value);
+                  }
+                  checkUseCase(cache1, cache2, dm, a, b, c);
+               }
+            }
+         }
+      }
+      finally
+      {
+         dcm2.stop();
+      }
+   }
 
-         assertEquals(0, listener1.get);
-         assertEquals(1, listener2.get);
-         
-         MyKey key2;
-         cache2.put(key2 = new MyKey("b"), "c");
-         assertEquals(2, cache1.getCacheSize());
-         assertEquals(2, cache2.getCacheSize());
-         assertEquals("c", cache1.get(new MyKey("b")));
-         
-//         put1 += dm.getLocality(key2).isLocal() ? 1 : 0;
-//         put2++;
-//
-//         assertEquals(put1, listener1.put);
-//         assertEquals(put2, listener2.put);
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   private void checkUseCase(DistributedExoCache<Serializable, Object> cache1,
+      DistributedExoCache<Serializable, Object> cache2, DistributionManager dm, Object a, Object b, Object c)
+      throws InterruptedException
+   {
+      MyCacheListener listener1 = new MyCacheListener();
+      cache1.addCacheListener(listener1);
+      MyCacheListener listener2 = new MyCacheListener();
+      cache2.addCacheListener(listener2);
+      boolean isALocal = dm.getLocality(new DistributedExoCache.CacheKey(cache1.getFullName(), new MyKey(a))).isLocal();
+      boolean isBLocal = dm.getLocality(new DistributedExoCache.CacheKey(cache1.getFullName(), new MyKey(b))).isLocal();
+      boolean isCLocal = dm.getLocality(new DistributedExoCache.CacheKey(cache1.getFullName(), new MyKey(c))).isLocal();
+      System.out.println("#####################################");
+      System.out.println("'a' is local = " + isALocal);
+      System.out.println("'b' is local = " + isBLocal);
+      System.out.println("'c' is local = " + isCLocal);
+      MyKey key = new MyKey(a);
+      cache1.put(key, "b");
+      assertEquals(1, cache1.getCacheSize());
+      assertEquals("b", cache2.get(new MyKey(a)));
+      assertEquals(1, cache2.getCacheSize());
 
-         assertEquals(1, listener1.get);
-         assertEquals(1, listener2.get);
+      int put1 = 1;
+      int put2 = isALocal ? 0 : 1;
 
-         assertEquals(2, cache1.getCacheSize());
-         assertEquals(2, cache2.getCacheSize());
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
 
-//         assertEquals(put1, listener1.put);
-//         assertEquals(put2, listener2.put);
+      assertEquals(0, listener1.get);
+      assertEquals(1, listener2.get);
 
-         assertEquals(1, listener1.get);
-         assertEquals(1, listener2.get);
+      MyKey key2 = new MyKey(b);
+      cache2.put(key2, "c");
+      assertEquals(2, cache1.getCacheSize());
+      assertEquals(2, cache2.getCacheSize());
+      assertEquals("c", cache1.get(new MyKey(b)));
 
-         cache2.put(key = new MyKey("a"), "a");
-         assertEquals(2, cache1.getCacheSize());
-         assertEquals(2, cache2.getCacheSize());
-         assertEquals("a", cache1.get(new MyKey("a")));
-         
-//         put1 += dm.getLocality(key).isLocal() ? 1 : 0;
-//         put2++;
-//
-//         assertEquals(put1, listener1.put);
-//         assertEquals(put2, listener2.put);
+      put1 += isBLocal ? 1 : 0;
+      put2++;
 
-         assertEquals(2, listener1.get);
-         assertEquals(1, listener2.get);
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
 
-         cache2.remove(key = new MyKey("a"));
-         assertEquals(1, cache1.getCacheSize());
-         assertEquals(1, cache2.getCacheSize());
-         
-//         assertEquals(put1, listener1.put);
-//         assertEquals(put2, listener2.put);
+      assertEquals(1, listener1.get);
+      assertEquals(1, listener2.get);
 
-         assertEquals(2, listener1.get);
-         assertEquals(1, listener2.get);
-         
-//         int remove1 = dm.getLocality(key).isLocal() ? 1 : 0;
-//         int remove2 = 1;
-//         
-//         assertEquals(remove1, listener1.remove);
-//         assertEquals(remove2, listener2.remove);
-         
-         cache1.put(key = new MyKey("c"), "c");
-         cache1.clearCache();
-         assertEquals(0, cache1.getCacheSize());
-         assertNull(cache1.get(new MyKey("b")));
-         assertNull(cache2.get(new MyKey("b")));
-         assertNull(cache2.get(new MyKey("c")));
-         assertEquals(0, cache2.getCacheSize());
-         
-//         put1++;
-//         put2 += dm.getLocality(key).isLocal() ? 0 : 1;
+      assertEquals(2, cache1.getCacheSize());
+      assertEquals(2, cache2.getCacheSize());
 
-//         assertEquals(put1, listener1.put);
-//         assertEquals(put2, listener2.put);
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
 
-         assertEquals(3, listener1.get);
-         assertEquals(3, listener2.get);
+      assertEquals(1, listener1.get);
+      assertEquals(1, listener2.get);
 
-//         assertEquals(remove1, listener1.remove);
-//         assertEquals(remove2, listener2.remove);
+      key = new MyKey(a);
+      cache2.put(key, "a");
+      assertEquals(2, cache1.getCacheSize());
+      assertEquals(2, cache2.getCacheSize());
+      assertEquals("a", cache1.get(new MyKey(a)));
 
-         assertEquals(1, listener1.clearCache);
-         assertEquals(0, listener2.clearCache);
+      put1 += isALocal ? 1 : 0;
+      put2++;
 
-         Map<Serializable, Object> values = new HashMap<Serializable, Object>();
-         values.put(key = new MyKey("a"), "a");
-         values.put(key2 = new MyKey("b"), "b");
-         cache1.putMap(values);
-         assertEquals(2, cache1.getCacheSize());
-         Thread.sleep(40);
-         assertEquals("a", cache2.get(new MyKey("a")));
-         assertEquals("b", cache2.get(new MyKey("b")));
-         assertEquals(2, cache2.getCacheSize());
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
 
-//         put1 += 2;
-//         put2 += (dm.getLocality(key).isLocal() ? 0 : 1) + (dm.getLocality(key2).isLocal() ? 0 : 1);
-//         
-//         assertEquals(put1, listener1.put);
-//         assertEquals(put2, listener2.put);
+      assertEquals(2, listener1.get);
+      assertEquals(1, listener2.get);
 
-         assertEquals(3, listener1.get);
-         assertEquals(5, listener2.get);
+      key = new MyKey(a);
+      cache2.remove(key);
+      assertEquals(1, cache1.getCacheSize());
+      assertEquals(1, cache2.getCacheSize());
 
-//         assertEquals(remove1, listener1.remove);
-//         assertEquals(remove2, listener2.remove);
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
 
-         assertEquals(1, listener1.clearCache);
-         assertEquals(0, listener2.clearCache);
+      assertEquals(2, listener1.get);
+      assertEquals(1, listener2.get);
 
-         values = new HashMap<Serializable, Object>()
+      int remove1 = isALocal ? 1 : 0;
+      int remove2 = 1;
+
+      assertEquals(remove1, listener1.remove);
+      assertEquals(remove2, listener2.remove);
+
+      key = new MyKey(c);
+      cache1.put(key, "c");
+      assertEquals(2, cache1.getCacheSize());
+      assertEquals(2, cache2.getCacheSize());
+      assertEquals("c", cache2.get(new MyKey(c)));
+
+      put1++;
+      put2 += isCLocal ? 0 : 1;
+
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
+
+      assertEquals(2, listener1.get);
+      assertEquals(2, listener2.get);
+
+      assertEquals(remove1, listener1.remove);
+      assertEquals(remove2, listener2.remove);
+
+      assertEquals(0, listener1.clearCache);
+      assertEquals(0, listener2.clearCache);
+
+      cache1.clearCache();
+      assertEquals(0, cache1.getCacheSize());
+      assertNull(cache1.get(new MyKey(b)));
+      assertNull(cache1.get(new MyKey(c)));
+      assertNull(cache2.get(new MyKey(b)));
+      assertNull(cache2.get(new MyKey(c)));
+      assertEquals(0, cache2.getCacheSize());
+
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
+
+      assertEquals(4, listener1.get);
+      assertEquals(4, listener2.get);
+      
+      // Since the clear cache map/reduce can only find cache1,
+      // the remove calls are applied to cache1, so cache2 is
+      // notified for its entries. This is due to the
+      // hack used to apply modifications within a map/reduce.
+      remove2 += (isBLocal ? 0 : 1) + (isCLocal ? 0 : 1);
+      
+      assertEquals(remove1, listener1.remove);
+      assertEquals(remove2, listener2.remove);
+
+      assertEquals(1, listener1.clearCache);
+      assertEquals(0, listener2.clearCache);
+
+      Map<Serializable, Object> values = new HashMap<Serializable, Object>();
+      key = new MyKey(a);
+      key2 = new MyKey(b);
+      values.put(key, "a");
+      values.put(key2, "b");
+      cache1.putMap(values);
+      assertEquals(2, cache1.getCacheSize());
+      Thread.sleep(40);
+      assertEquals("a", cache1.get(new MyKey(a)));
+      assertEquals("b", cache1.get(new MyKey(b)));
+      assertEquals("a", cache2.get(new MyKey(a)));
+      assertEquals("b", cache2.get(new MyKey(b)));
+      assertEquals(2, cache2.getCacheSize());
+
+      put1 += 2;
+      put2 += (isALocal ? 0 : 1) + (isBLocal ? 0 : 1);
+
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
+
+      assertEquals(6, listener1.get);
+      assertEquals(6, listener2.get);
+
+      assertEquals(remove1, listener1.remove);
+      assertEquals(remove2, listener2.remove);
+
+      assertEquals(1, listener1.clearCache);
+      assertEquals(0, listener2.clearCache);
+
+      values = new HashMap<Serializable, Object>()
+      {
+         private static final long serialVersionUID = 1L;
+
+         public Set<Entry<Serializable, Object>> entrySet()
          {
-            private static final long serialVersionUID = 1L;
+            Set<Entry<Serializable, Object>> set = new LinkedHashSet<Entry<Serializable, Object>>(super.entrySet());
+            set.add(new Entry<Serializable, Object>()
+            {
 
-            public Set<Entry<Serializable, Object>> entrySet()
-            {
-               Set<Entry<Serializable, Object>> set = new LinkedHashSet<Entry<Serializable, Object>>(super.entrySet());
-               set.add(new Entry<Serializable, Object>()
+               public Object setValue(Object paramV)
                {
+                  return null;
+               }
 
-                  public Object setValue(Object paramV)
-                  {
-                     return null;
-                  }
+               public Object getValue()
+               {
+                  throw new RuntimeException("An exception");
+               }
 
-                  public Object getValue()
-                  {
-                     throw new RuntimeException("An exception");
-                  }
+               public Serializable getKey()
+               {
+                  return "c";
+               }
+            });
+            return set;
+         }
+      };
+      values.put(new MyKey("e"), "e");
+      values.put(new MyKey("d"), "d");
+      cache1.putMap(values);
+      assertEquals(2, cache1.getCacheSize());
+      assertEquals(2, cache2.getCacheSize());
 
-                  public Serializable getKey()
-                  {
-                     return "c";
-                  }
-               });
-               return set;
-            }
-         };
-         values.put(new MyKey("e"), "e");
-         values.put(new MyKey("d"), "d");
-         cache1.putMap(values);
-         assertEquals(2, cache1.getCacheSize());
-         assertEquals(2, cache2.getCacheSize());
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
 
-//         assertEquals(put1, listener1.put);
-//         assertEquals(put2, listener2.put);
+      assertEquals(6, listener1.get);
+      assertEquals(6, listener2.get);
 
-         assertEquals(3, listener1.get);
-         assertEquals(5, listener2.get);
+      assertEquals(remove1, listener1.remove);
+      assertEquals(remove2, listener2.remove);
 
-//         assertEquals(remove1, listener1.remove);
-//         assertEquals(remove2, listener2.remove);
+      assertEquals(1, listener1.clearCache);
+      assertEquals(0, listener2.clearCache);
 
-         assertEquals(1, listener1.clearCache);
-         assertEquals(0, listener2.clearCache);
+      assertEquals(0, listener1.expire);
+      assertEquals(0, listener2.expire);
 
-         assertEquals(0, listener1.expire);
-         assertEquals(0, listener2.expire);
+      cache2.clearCache();
+      assertEquals(0, cache1.getCacheSize());
+      assertEquals(0, cache2.getCacheSize());
 
-      }
-      finally
-      {
-         dcm2.stop();
-      }
+      assertEquals(put1, listener1.put);
+      assertEquals(put2, listener2.put);
+
+      assertEquals(6, listener1.get);
+      assertEquals(6, listener2.get);
+
+      // Since the clear cache map/reduce can only find cache1,
+      // the remove calls are applied to cache1, so cache2 is
+      // notified for its entries. This is due to the
+      // hack used to apply modifications within a map/reduce.
+      remove2 += (isALocal ? 0 : 1) + (isBLocal ? 0 : 1);
+
+      assertEquals(remove1, listener1.remove);
+      assertEquals(remove2, listener2.remove);
+
+      assertEquals(1, listener1.clearCache);
+      assertEquals(1, listener2.clearCache);
+
+      assertEquals(0, listener1.expire);
+      assertEquals(0, listener2.expire);
    }
 }

Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml	2012-01-04 12:31:08 UTC (rev 5420)
@@ -54,11 +54,12 @@
    </global>
    <default>
       <locking isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="10000" writeSkewCheck="false" concurrencyLevel="500"/>
-      <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true"/>
+      <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" transactionMode="TRANSACTIONAL"/>
       <jmxStatistics enabled="true"/>
       <clustering mode="replication">
          <stateRetrieval timeout="20000" fetchInMemoryState="false"/>
          <sync replTimeout="20000"/>
       </clustering>
+      <invocationBatching enabled="true"/> 
    </default>
 </infinispan>
\ No newline at end of file

Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml	2012-01-04 12:31:08 UTC (rev 5420)
@@ -54,11 +54,12 @@
    </global>
    <default>
       <locking isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="20000" writeSkewCheck="false" concurrencyLevel="500" useLockStriping="true"/>
-      <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true"/>
+      <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" transactionMode="TRANSACTIONAL"/>
       <jmxStatistics enabled="true"/>
       <clustering mode="replication">
          <stateRetrieval timeout="20000" fetchInMemoryState="false"/>
          <sync replTimeout="20000"/>
       </clustering>
+      <invocationBatching enabled="true"/> 
    </default>
 </infinispan>
\ No newline at end of file

Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml	2012-01-04 12:31:08 UTC (rev 5420)
@@ -54,10 +54,9 @@
    </global>
    <namedCache name="eXoCache">
       <locking isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="20000" writeSkewCheck="false" concurrencyLevel="500" useLockStriping="true" />
-      <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" eagerLockSingleNode="true"/>
+      <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" eagerLockSingleNode="true" transactionMode="TRANSACTIONAL"/>
       <jmxStatistics enabled="true"/>
       <clustering mode="distribution">
-      	 <l1 enabled="false"/>
          <hash numOwners="${infinispan-num-owners}" rehashRpcTimeout="120000" />
          <sync/>
       </clustering>

Modified: kernel/trunk/pom.xml
===================================================================
--- kernel/trunk/pom.xml	2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/pom.xml	2012-01-04 12:31:08 UTC (rev 5420)
@@ -213,7 +213,7 @@
          <dependency>
             <groupId>org.infinispan</groupId>
             <artifactId>infinispan-core</artifactId>
-            <version>5.1.0.CR1</version>
+            <version>5.1.0.CR2</version>
          </dependency>
          <dependency>
             <groupId>org.jibx</groupId>



More information about the exo-jcr-commits mailing list