[exo-jcr-commits] exo-jcr SVN: r5420 - in kernel/trunk: exo.kernel.component.ext.cache.impl.infinispan.v5 and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Jan 4 07:31:09 EST 2012
Author: nfilotto
Date: 2012-01-04 07:31:08 -0500 (Wed, 04 Jan 2012)
New Revision: 5420
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml
kernel/trunk/pom.xml
Log:
EXOJCR-1682: Improve ISPN integration to properly support the distribution mode
* Added some unit tests
* Upgraded to ISPN 5.1.0.CR2
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml 2012-01-04 12:31:08 UTC (rev 5420)
@@ -24,6 +24,10 @@
<artifactId>exo.kernel.component.cache</artifactId>
</dependency>
<dependency>
+ <groupId>org.exoplatform.kernel</groupId>
+ <artifactId>exo.kernel.component.common</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
</dependency>
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java 2012-01-04 12:31:08 UTC (rev 5420)
@@ -48,7 +48,6 @@
import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
-import org.infinispan.util.concurrent.locks.LockManager;
import java.io.Externalizable;
import java.io.IOException;
@@ -58,12 +57,9 @@
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -123,6 +119,14 @@
{
return cache;
}
+
+ /**
+ * @return the fullName
+ */
+ String getFullName()
+ {
+ return fullName;
+ }
/**
* {@inheritDoc}
@@ -199,9 +203,9 @@
@Override
public Void run()
{
- MapReduceTask<CacheKey<K>, V, String, CacheKey<K>> task =
- new MapReduceTask<CacheKey<K>, V, String, CacheKey<K>>(cache);
- task.mappedWith(new ClearCacheMapper<K, V>(fullName)).reducedWith(new ClearCacheReducer<String, V, K>());
+ MapReduceTask<CacheKey<K>, V, Void, Void> task =
+ new MapReduceTask<CacheKey<K>, V, Void, Void>(cache);
+ task.mappedWith(new ClearCacheMapper<K, V>(fullName)).reducedWith(new ClearCacheReducer());
task.execute();
return null;
}
@@ -1074,7 +1078,7 @@
}
}
- public static class ClearCacheMapper<K, V> extends AbstractExoCacheMapper<K, V, String, CacheKey<K>>
+ public static class ClearCacheMapper<K, V> extends AbstractExoCacheMapper<K, V, Void, Void>
{
public ClearCacheMapper()
@@ -1090,14 +1094,27 @@
* {@inheritDoc}
*/
@Override
- protected void _map(CacheKey<K> key, V value, Collector<String, CacheKey<K>> collector)
+ protected void _map(CacheKey<K> key, V value, Collector<Void, Void> collector)
{
- collector.emit("keys", key);
+ ExoContainer container = ExoContainerContext.getTopContainer();
+ if (container == null)
+ {
+ LOG.error("The top container could not be found");
+ return;
+ }
+ DistributedCacheManager dcm =
+ (DistributedCacheManager)container.getComponentInstanceOfType(DistributedCacheManager.class);
+ if (dcm == null)
+ {
+ LOG.error("The DistributedCacheManager could not be found at top container level, please configure it.");
+ return;
+ }
+ Cache<CacheKey<K>, V> cache = dcm.getCache(CACHE_NAME);
+ cache.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.FAIL_SILENTLY).remove(key);
}
-
}
- public static class ClearCacheReducer<K, V, KIn> implements Reducer<K, CacheKey<KIn>>
+ public static class ClearCacheReducer implements Reducer<Void, Void>
{
/**
@@ -1109,46 +1126,8 @@
* @see org.infinispan.distexec.mapreduce.Reducer#reduce(java.lang.Object, java.util.Iterator)
*/
@Override
- public CacheKey<KIn> reduce(K reducedKey, Iterator<CacheKey<KIn>> iter)
+ public Void reduce(Void reducedKey, Iterator<Void> iter)
{
- CacheKey<KIn> firstKey;
- if (iter == null || !iter.hasNext() || (firstKey = iter.next()) == null)
- {
- return null;
- }
- ExoContainer container = ExoContainerContext.getTopContainer();
- if (container == null)
- {
- LOG.error("The top container could not be found");
- return null;
- }
- DistributedCacheManager dcm =
- (DistributedCacheManager)container.getComponentInstanceOfType(DistributedCacheManager.class);
- if (dcm == null)
- {
- LOG.error("The DistributedCacheManager could not be found at top container level, please configure it.");
- return null;
- }
- Cache<CacheKey<K>, V> cache = dcm.getCache(CACHE_NAME);
- final LockManager lm = cache.getAdvancedCache().getLockManager();
- // Sort the keys to prevent deadlocks
- Set<CacheKey<KIn>> keys = new TreeSet<CacheKey<KIn>>(new Comparator<CacheKey<KIn>>()
- {
- public int compare(CacheKey<KIn> o1, CacheKey<KIn> o2)
- {
- int result = lm.getLockId(o1) - lm.getLockId(o2);
- return result == 0 ? System.identityHashCode(o1) - System.identityHashCode(o2) : result;
- }
- });
- keys.add(firstKey);
- while (iter.hasNext())
- {
- keys.add(iter.next());
- }
- for (CacheKey<KIn> key : keys)
- {
- cache.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.FAIL_SILENTLY).remove(key);
- }
return null;
}
}
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java 2012-01-04 12:31:08 UTC (rev 5420)
@@ -27,7 +27,9 @@
import org.exoplatform.container.xml.ValueParam;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
+import org.exoplatform.services.transaction.TransactionService;
import org.infinispan.Cache;
+import org.infinispan.factories.ComponentRegistry;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.picocontainer.Startable;
@@ -35,6 +37,8 @@
import java.security.PrivilegedExceptionAction;
import java.util.Map;
+import javax.transaction.TransactionManager;
+
/**
* This class is used to allow to use infinispan in distribution mode with
* the ability to launch infinispan instances in standalone mode, in other
@@ -77,7 +81,7 @@
public DistributedCacheManager(String configurationFile, Map<String, String> parameters,
ConfigurationManager configManager)
{
- this.manager = init(configurationFile, parameters, configManager);
+ this.manager = init(configurationFile, parameters, configManager, null);
}
/**
@@ -85,13 +89,23 @@
*/
public DistributedCacheManager(InitParams params, ConfigurationManager configManager)
{
+ this(params, configManager, null);
+ }
+
+ /**
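+ * Constructor that also accepts the TransactionService (may be null) whose TransactionManager is passed to the cache manager initialization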
+ */
+ public DistributedCacheManager(InitParams params, ConfigurationManager configManager, TransactionService ts)
+ {
ValueParam vp;
final String result;
if (params != null && (vp = params.getValueParam(CONFIG_FILE_PARAMETER_NAME)) != null
&& (result = vp.getValue()) != null && !result.isEmpty())
{
PropertiesParam pp = params.getPropertiesParam(PARAMS_PARAMETER_NAME);
- this.manager = init(result, pp == null ? null : pp.getProperties(), configManager);
+ this.manager =
+ init(result, pp == null ? null : pp.getProperties(), configManager,
+ ts == null ? null : ts.getTransactionManager());
}
else
{
@@ -104,10 +118,11 @@
* @param configurationFile the path of the configuration file
* @param parameters the parameters to inject into the configuration file
* @param configManager the configuration manager used to get the configuration file
+ * @param tm the transaction manager
* @return the CacheManager initialized
*/
private EmbeddedCacheManager init(final String configurationFile, final Map<String, String> parameters,
- final ConfigurationManager configManager)
+ final ConfigurationManager configManager, final TransactionManager tm)
{
try
{
@@ -138,7 +153,14 @@
manager.start();
for (String cacheName : manager.getCacheNames())
{
- manager.getCache(cacheName);
+ Cache cache = manager.getCache(cacheName);
+ if (tm != null)
+ {
+ // We inject the transaction manager
+ ComponentRegistry cr = cache.getAdvancedCache().getComponentRegistry();
+ cr.registerComponent(tm, TransactionManager.class);
+ cr.rewire();
+ }
}
return manager;
}
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml 2012-01-04 12:31:08 UTC (rev 5420)
@@ -54,7 +54,7 @@
</global>
<default>
<locking isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="10000" writeSkewCheck="false" concurrencyLevel="500"/>
- <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true"/>
+ <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" transactionMode="TRANSACTIONAL"/>
<jmxStatistics enabled="true"/>
<invocationBatching enabled="true"/>
<clustering mode="replication">
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java 2012-01-04 12:31:08 UTC (rev 5420)
@@ -32,6 +32,9 @@
import org.exoplatform.services.cache.impl.infinispan.ExoCacheFactoryImpl;
import org.exoplatform.services.ispn.DistributedCacheManager;
import org.exoplatform.test.BasicTestCase;
+import org.infinispan.affinity.KeyAffinityService;
+import org.infinispan.affinity.KeyAffinityServiceFactory;
+import org.infinispan.affinity.KeyGenerator;
import org.infinispan.distribution.DistributionManager;
import java.io.Serializable;
@@ -42,8 +45,10 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -467,17 +472,29 @@
{
private static final long serialVersionUID = 1L;
- public String value;
- public MyKey(){}
- public MyKey(String value)
+ public Object value;
+
+ public String displayValue;
+
+ public MyKey()
{
+ }
+
+ public MyKey(Object value)
+ {
this.value = value;
}
+ public MyKey(String displayValue, Object value)
+ {
+ this.displayValue = displayValue;
+ this.value = value;
+ }
+
@Override
public boolean equals(Object paramObject)
{
- return paramObject instanceof MyKey && ((MyKey)paramObject).value.endsWith(value);
+ return paramObject instanceof MyKey && ((MyKey)paramObject).value.equals(value);
}
@Override
@@ -489,10 +506,30 @@
@Override
public String toString()
{
- return value;
+ return displayValue == null ? value.toString() : displayValue;
}
}
-
+
+ public static class MyKeyGenerator implements KeyGenerator<DistributedExoCache.CacheKey<MyKey>>
+ {
+
+ public static final Random rnd = new Random();
+
+ private String fullName;
+
+ public MyKeyGenerator(String fullName)
+ {
+ this.fullName = fullName;
+ }
+
+ @Override
+ public DistributedExoCache.CacheKey<MyKey> getKey()
+ {
+ return new DistributedExoCache.CacheKey<MyKey>(fullName, new MyKey(rnd.nextLong()));
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
public void testDistributedCache() throws Exception
{
PortalContainer pc = PortalContainer.getInstance();
@@ -511,191 +548,312 @@
DistributedCacheManager dcm2 =
new DistributedCacheManager("jar:/conf/portal/distributed-cache-configuration.xml", params, cm);
- @SuppressWarnings("unchecked")
DistributedExoCache<Serializable, Object> cache1 =
(DistributedExoCache<Serializable, Object>)((ExoCacheFactory)pc
.getComponentInstanceOfType(ExoCacheFactory.class)).createCache(config);
DistributionManager dm = cache1.getCache().getDistributionManager();
- MyCacheListener listener1 = new MyCacheListener();
- cache1.addCacheListener(listener1);
DistributedExoCache<Serializable, Object> cache2 =
(DistributedExoCache<Serializable, Object>)new ExoCacheFactoryImpl(
(ExoContainerContext)pc.getComponentInstanceOfType(ExoContainerContext.class),
"jar:/conf/portal/cache-configuration-template.xml", cm, dcm2).createCache(config);
- MyCacheListener listener2 = new MyCacheListener();
- cache2.addCacheListener(listener2);
+ KeyAffinityService kas1 =
+ KeyAffinityServiceFactory.newLocalKeyAffinityService(cache1.getCache(),
+ new MyKeyGenerator(cache1.getFullName()), Executors.newSingleThreadExecutor(), 100);
+ KeyAffinityService kas2 =
+ KeyAffinityServiceFactory.newLocalKeyAffinityService(cache2.getCache(),
+ new MyKeyGenerator(cache1.getFullName()), Executors.newSingleThreadExecutor(), 100);
+
try
{
- MyKey key;
- cache1.put(key = new MyKey("a"), "b");
- assertEquals(1, cache1.getCacheSize());
- assertEquals("b", cache2.get(new MyKey("a")));
- assertEquals(1, cache2.getCacheSize());
-
-// int put1 = 1;
-// int put2 = dm.getLocality(key).isLocal() ? 0 : 1;
-//
-// assertEquals(put1, listener1.put);
-// assertEquals(put2, listener2.put);
+ Object a, b, c;
+ for (int i = 0; i < 2; i++)
+ {
+ if (i == 0)
+ {
+ a =
+ new MyKey("a", ((DistributedExoCache.CacheKey<MyKey>)kas1.getKeyForAddress(cache1.getCache()
+ .getRpcManager().getAddress())).getKey().value);
+ }
+ else
+ {
+ a =
+ new MyKey("a", ((DistributedExoCache.CacheKey<MyKey>)kas2.getKeyForAddress(cache2.getCache()
+ .getRpcManager().getAddress())).getKey().value);
+ }
+ for (int j = 0; j < 2; j++)
+ {
+ if (j == 0)
+ {
+ b =
+ new MyKey("b", ((DistributedExoCache.CacheKey<MyKey>)kas1.getKeyForAddress(cache1.getCache()
+ .getRpcManager().getAddress())).getKey().value);
+ }
+ else
+ {
+ b =
+ new MyKey("b", ((DistributedExoCache.CacheKey<MyKey>)kas2.getKeyForAddress(cache2.getCache()
+ .getRpcManager().getAddress())).getKey().value);
+ }
+ for (int k = 0; k < 2; k++)
+ {
+ if (k == 0)
+ {
+ c =
+ new MyKey("c", ((DistributedExoCache.CacheKey<MyKey>)kas1.getKeyForAddress(cache1.getCache()
+ .getRpcManager().getAddress())).getKey().value);
+ }
+ else
+ {
+ c =
+ new MyKey("c", ((DistributedExoCache.CacheKey<MyKey>)kas2.getKeyForAddress(cache2.getCache()
+ .getRpcManager().getAddress())).getKey().value);
+ }
+ checkUseCase(cache1, cache2, dm, a, b, c);
+ }
+ }
+ }
+ }
+ finally
+ {
+ dcm2.stop();
+ }
+ }
- assertEquals(0, listener1.get);
- assertEquals(1, listener2.get);
-
- MyKey key2;
- cache2.put(key2 = new MyKey("b"), "c");
- assertEquals(2, cache1.getCacheSize());
- assertEquals(2, cache2.getCacheSize());
- assertEquals("c", cache1.get(new MyKey("b")));
-
-// put1 += dm.getLocality(key2).isLocal() ? 1 : 0;
-// put2++;
-//
-// assertEquals(put1, listener1.put);
-// assertEquals(put2, listener2.put);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private void checkUseCase(DistributedExoCache<Serializable, Object> cache1,
+ DistributedExoCache<Serializable, Object> cache2, DistributionManager dm, Object a, Object b, Object c)
+ throws InterruptedException
+ {
+ MyCacheListener listener1 = new MyCacheListener();
+ cache1.addCacheListener(listener1);
+ MyCacheListener listener2 = new MyCacheListener();
+ cache2.addCacheListener(listener2);
+ boolean isALocal = dm.getLocality(new DistributedExoCache.CacheKey(cache1.getFullName(), new MyKey(a))).isLocal();
+ boolean isBLocal = dm.getLocality(new DistributedExoCache.CacheKey(cache1.getFullName(), new MyKey(b))).isLocal();
+ boolean isCLocal = dm.getLocality(new DistributedExoCache.CacheKey(cache1.getFullName(), new MyKey(c))).isLocal();
+ System.out.println("#####################################");
+ System.out.println("'a' is local = " + isALocal);
+ System.out.println("'b' is local = " + isBLocal);
+ System.out.println("'c' is local = " + isCLocal);
+ MyKey key = new MyKey(a);
+ cache1.put(key, "b");
+ assertEquals(1, cache1.getCacheSize());
+ assertEquals("b", cache2.get(new MyKey(a)));
+ assertEquals(1, cache2.getCacheSize());
- assertEquals(1, listener1.get);
- assertEquals(1, listener2.get);
+ int put1 = 1;
+ int put2 = isALocal ? 0 : 1;
- assertEquals(2, cache1.getCacheSize());
- assertEquals(2, cache2.getCacheSize());
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
-// assertEquals(put1, listener1.put);
-// assertEquals(put2, listener2.put);
+ assertEquals(0, listener1.get);
+ assertEquals(1, listener2.get);
- assertEquals(1, listener1.get);
- assertEquals(1, listener2.get);
+ MyKey key2 = new MyKey(b);
+ cache2.put(key2, "c");
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
+ assertEquals("c", cache1.get(new MyKey(b)));
- cache2.put(key = new MyKey("a"), "a");
- assertEquals(2, cache1.getCacheSize());
- assertEquals(2, cache2.getCacheSize());
- assertEquals("a", cache1.get(new MyKey("a")));
-
-// put1 += dm.getLocality(key).isLocal() ? 1 : 0;
-// put2++;
-//
-// assertEquals(put1, listener1.put);
-// assertEquals(put2, listener2.put);
+ put1 += isBLocal ? 1 : 0;
+ put2++;
- assertEquals(2, listener1.get);
- assertEquals(1, listener2.get);
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
- cache2.remove(key = new MyKey("a"));
- assertEquals(1, cache1.getCacheSize());
- assertEquals(1, cache2.getCacheSize());
-
-// assertEquals(put1, listener1.put);
-// assertEquals(put2, listener2.put);
+ assertEquals(1, listener1.get);
+ assertEquals(1, listener2.get);
- assertEquals(2, listener1.get);
- assertEquals(1, listener2.get);
-
-// int remove1 = dm.getLocality(key).isLocal() ? 1 : 0;
-// int remove2 = 1;
-//
-// assertEquals(remove1, listener1.remove);
-// assertEquals(remove2, listener2.remove);
-
- cache1.put(key = new MyKey("c"), "c");
- cache1.clearCache();
- assertEquals(0, cache1.getCacheSize());
- assertNull(cache1.get(new MyKey("b")));
- assertNull(cache2.get(new MyKey("b")));
- assertNull(cache2.get(new MyKey("c")));
- assertEquals(0, cache2.getCacheSize());
-
-// put1++;
-// put2 += dm.getLocality(key).isLocal() ? 0 : 1;
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
-// assertEquals(put1, listener1.put);
-// assertEquals(put2, listener2.put);
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
- assertEquals(3, listener1.get);
- assertEquals(3, listener2.get);
+ assertEquals(1, listener1.get);
+ assertEquals(1, listener2.get);
-// assertEquals(remove1, listener1.remove);
-// assertEquals(remove2, listener2.remove);
+ key = new MyKey(a);
+ cache2.put(key, "a");
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
+ assertEquals("a", cache1.get(new MyKey(a)));
- assertEquals(1, listener1.clearCache);
- assertEquals(0, listener2.clearCache);
+ put1 += isALocal ? 1 : 0;
+ put2++;
- Map<Serializable, Object> values = new HashMap<Serializable, Object>();
- values.put(key = new MyKey("a"), "a");
- values.put(key2 = new MyKey("b"), "b");
- cache1.putMap(values);
- assertEquals(2, cache1.getCacheSize());
- Thread.sleep(40);
- assertEquals("a", cache2.get(new MyKey("a")));
- assertEquals("b", cache2.get(new MyKey("b")));
- assertEquals(2, cache2.getCacheSize());
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
-// put1 += 2;
-// put2 += (dm.getLocality(key).isLocal() ? 0 : 1) + (dm.getLocality(key2).isLocal() ? 0 : 1);
-//
-// assertEquals(put1, listener1.put);
-// assertEquals(put2, listener2.put);
+ assertEquals(2, listener1.get);
+ assertEquals(1, listener2.get);
- assertEquals(3, listener1.get);
- assertEquals(5, listener2.get);
+ key = new MyKey(a);
+ cache2.remove(key);
+ assertEquals(1, cache1.getCacheSize());
+ assertEquals(1, cache2.getCacheSize());
-// assertEquals(remove1, listener1.remove);
-// assertEquals(remove2, listener2.remove);
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
- assertEquals(1, listener1.clearCache);
- assertEquals(0, listener2.clearCache);
+ assertEquals(2, listener1.get);
+ assertEquals(1, listener2.get);
- values = new HashMap<Serializable, Object>()
+ int remove1 = isALocal ? 1 : 0;
+ int remove2 = 1;
+
+ assertEquals(remove1, listener1.remove);
+ assertEquals(remove2, listener2.remove);
+
+ key = new MyKey(c);
+ cache1.put(key, "c");
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
+ assertEquals("c", cache2.get(new MyKey(c)));
+
+ put1++;
+ put2 += isCLocal ? 0 : 1;
+
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
+
+ assertEquals(2, listener1.get);
+ assertEquals(2, listener2.get);
+
+ assertEquals(remove1, listener1.remove);
+ assertEquals(remove2, listener2.remove);
+
+ assertEquals(0, listener1.clearCache);
+ assertEquals(0, listener2.clearCache);
+
+ cache1.clearCache();
+ assertEquals(0, cache1.getCacheSize());
+ assertNull(cache1.get(new MyKey(b)));
+ assertNull(cache1.get(new MyKey(c)));
+ assertNull(cache2.get(new MyKey(b)));
+ assertNull(cache2.get(new MyKey(c)));
+ assertEquals(0, cache2.getCacheSize());
+
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
+
+ assertEquals(4, listener1.get);
+ assertEquals(4, listener2.get);
+
+ // Since the clear cache map/reduce can only find cache1,
+ // the remove calls will be applied to cache1, so cache2
+ // will be notified for the entries it owns. This is due to
+ // the hack used to apply modifications within a map/reduce.
+ remove2 += (isBLocal ? 0 : 1) + (isCLocal ? 0 : 1);
+
+ assertEquals(remove1, listener1.remove);
+ assertEquals(remove2, listener2.remove);
+
+ assertEquals(1, listener1.clearCache);
+ assertEquals(0, listener2.clearCache);
+
+ Map<Serializable, Object> values = new HashMap<Serializable, Object>();
+ key = new MyKey(a);
+ key2 = new MyKey(b);
+ values.put(key, "a");
+ values.put(key2, "b");
+ cache1.putMap(values);
+ assertEquals(2, cache1.getCacheSize());
+ Thread.sleep(40);
+ assertEquals("a", cache1.get(new MyKey(a)));
+ assertEquals("b", cache1.get(new MyKey(b)));
+ assertEquals("a", cache2.get(new MyKey(a)));
+ assertEquals("b", cache2.get(new MyKey(b)));
+ assertEquals(2, cache2.getCacheSize());
+
+ put1 += 2;
+ put2 += (isALocal ? 0 : 1) + (isBLocal ? 0 : 1);
+
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
+
+ assertEquals(6, listener1.get);
+ assertEquals(6, listener2.get);
+
+ assertEquals(remove1, listener1.remove);
+ assertEquals(remove2, listener2.remove);
+
+ assertEquals(1, listener1.clearCache);
+ assertEquals(0, listener2.clearCache);
+
+ values = new HashMap<Serializable, Object>()
+ {
+ private static final long serialVersionUID = 1L;
+
+ public Set<Entry<Serializable, Object>> entrySet()
{
- private static final long serialVersionUID = 1L;
+ Set<Entry<Serializable, Object>> set = new LinkedHashSet<Entry<Serializable, Object>>(super.entrySet());
+ set.add(new Entry<Serializable, Object>()
+ {
- public Set<Entry<Serializable, Object>> entrySet()
- {
- Set<Entry<Serializable, Object>> set = new LinkedHashSet<Entry<Serializable, Object>>(super.entrySet());
- set.add(new Entry<Serializable, Object>()
+ public Object setValue(Object paramV)
{
+ return null;
+ }
- public Object setValue(Object paramV)
- {
- return null;
- }
+ public Object getValue()
+ {
+ throw new RuntimeException("An exception");
+ }
- public Object getValue()
- {
- throw new RuntimeException("An exception");
- }
+ public Serializable getKey()
+ {
+ return "c";
+ }
+ });
+ return set;
+ }
+ };
+ values.put(new MyKey("e"), "e");
+ values.put(new MyKey("d"), "d");
+ cache1.putMap(values);
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
- public Serializable getKey()
- {
- return "c";
- }
- });
- return set;
- }
- };
- values.put(new MyKey("e"), "e");
- values.put(new MyKey("d"), "d");
- cache1.putMap(values);
- assertEquals(2, cache1.getCacheSize());
- assertEquals(2, cache2.getCacheSize());
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
-// assertEquals(put1, listener1.put);
-// assertEquals(put2, listener2.put);
+ assertEquals(6, listener1.get);
+ assertEquals(6, listener2.get);
- assertEquals(3, listener1.get);
- assertEquals(5, listener2.get);
+ assertEquals(remove1, listener1.remove);
+ assertEquals(remove2, listener2.remove);
-// assertEquals(remove1, listener1.remove);
-// assertEquals(remove2, listener2.remove);
+ assertEquals(1, listener1.clearCache);
+ assertEquals(0, listener2.clearCache);
- assertEquals(1, listener1.clearCache);
- assertEquals(0, listener2.clearCache);
+ assertEquals(0, listener1.expire);
+ assertEquals(0, listener2.expire);
- assertEquals(0, listener1.expire);
- assertEquals(0, listener2.expire);
+ cache2.clearCache();
+ assertEquals(0, cache1.getCacheSize());
+ assertEquals(0, cache2.getCacheSize());
- }
- finally
- {
- dcm2.stop();
- }
+ assertEquals(put1, listener1.put);
+ assertEquals(put2, listener2.put);
+
+ assertEquals(6, listener1.get);
+ assertEquals(6, listener2.get);
+
+ // Since the clear cache map/reduce can only find cache1,
+ // the remove calls will be applied to cache1, so cache2
+ // will be notified for the entries it owns. This is due to
+ // the hack used to apply modifications within a map/reduce.
+ remove2 += (isALocal ? 0 : 1) + (isBLocal ? 0 : 1);
+
+ assertEquals(remove1, listener1.remove);
+ assertEquals(remove2, listener2.remove);
+
+ assertEquals(1, listener1.clearCache);
+ assertEquals(1, listener2.clearCache);
+
+ assertEquals(0, listener1.expire);
+ assertEquals(0, listener2.expire);
}
}
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml 2012-01-04 12:31:08 UTC (rev 5420)
@@ -54,11 +54,12 @@
</global>
<default>
<locking isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="10000" writeSkewCheck="false" concurrencyLevel="500"/>
- <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true"/>
+ <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" transactionMode="TRANSACTIONAL"/>
<jmxStatistics enabled="true"/>
<clustering mode="replication">
<stateRetrieval timeout="20000" fetchInMemoryState="false"/>
<sync replTimeout="20000"/>
</clustering>
+ <invocationBatching enabled="true"/>
</default>
</infinispan>
\ No newline at end of file
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml 2012-01-04 12:31:08 UTC (rev 5420)
@@ -54,11 +54,12 @@
</global>
<default>
<locking isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="20000" writeSkewCheck="false" concurrencyLevel="500" useLockStriping="true"/>
- <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true"/>
+ <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" transactionMode="TRANSACTIONAL"/>
<jmxStatistics enabled="true"/>
<clustering mode="replication">
<stateRetrieval timeout="20000" fetchInMemoryState="false"/>
<sync replTimeout="20000"/>
</clustering>
+ <invocationBatching enabled="true"/>
</default>
</infinispan>
\ No newline at end of file
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml 2012-01-04 12:31:08 UTC (rev 5420)
@@ -54,10 +54,9 @@
</global>
<namedCache name="eXoCache">
<locking isolationLevel="READ_COMMITTED" lockAcquisitionTimeout="20000" writeSkewCheck="false" concurrencyLevel="500" useLockStriping="true" />
- <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" eagerLockSingleNode="true"/>
+ <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" syncRollbackPhase="true" syncCommitPhase="true" eagerLockSingleNode="true" transactionMode="TRANSACTIONAL"/>
<jmxStatistics enabled="true"/>
<clustering mode="distribution">
- <l1 enabled="false"/>
<hash numOwners="${infinispan-num-owners}" rehashRpcTimeout="120000" />
<sync/>
</clustering>
Modified: kernel/trunk/pom.xml
===================================================================
--- kernel/trunk/pom.xml 2012-01-04 11:48:55 UTC (rev 5419)
+++ kernel/trunk/pom.xml 2012-01-04 12:31:08 UTC (rev 5420)
@@ -213,7 +213,7 @@
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
- <version>5.1.0.CR1</version>
+ <version>5.1.0.CR2</version>
</dependency>
<dependency>
<groupId>org.jibx</groupId>
More information about the exo-jcr-commits
mailing list