Author: nfilotto
Date: 2011-12-15 11:30:03 -0500 (Thu, 15 Dec 2011)
New Revision: 5331
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/AbstractMapper.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/Utils.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/configuration.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/test.policy
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/tsm-excludes.properties
kernel/trunk/exo.kernel.container/src/main/java/org/exoplatform/container/util/TemplateConfigurationHelper.java
kernel/trunk/exo.kernel.container/src/test/java/org/exoplatform/container/util/TestTemplateConfigurationHelper.java
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/AbstractExoCache.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/ExoCacheFactoryImpl.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/generic/GenericExoCacheCreator.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/TestAbstractExoCache.java
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/test-configuration.xml
kernel/trunk/pom.xml
Log:
EXOJCR-1682: Improve ISPN integration to support properly the distribution mode (kernel
part)
* Upgrade to ISPN 5.1.0.CR1
* Propose a distributed eXo cache based on ISPN that is enabled when distributed is set to
true in the eXo Cache Config, this cache is shared to be able to extend the capacity of
the cache using external JVM in standalone mode
Modified: kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml
===================================================================
--- kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml 2011-12-15
15:26:48 UTC (rev 5330)
+++ kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/pom.xml 2011-12-15
16:30:03 UTC (rev 5331)
@@ -1,19 +1,9 @@
- <!--
-
- Copyright (C) 2009 eXo Platform SAS. This is free software; you can
- redistribute it and/or modify it under the terms of the GNU Lesser
- General Public License as published by the Free Software Foundation;
- either version 2.1 of the License, or (at your option) any later
- version. This software is distributed in the hope that it will be
- useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- Lesser General Public License for more details. You should have
- received a copy of the GNU Lesser General Public License along with
- this software; if not, write to the Free Software Foundation, Inc., 51
- Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
- site:
http://www.fsf.org.
- -->
+<!-- Copyright (C) 2009 eXo Platform SAS. This is free software; you can redistribute
it and/or modify it under the terms of the GNU Lesser General Public License
+ as published by the Free Software Foundation; either version 2.1 of the License, or (at
your option) any later version. This software is distributed in the hope
+ that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
+ Public License for more details. You should have received a copy of the GNU Lesser
General Public License along with this software; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
site:
http://www.fsf.org. -->
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -27,6 +17,10 @@
<dependencies>
<dependency>
<groupId>org.exoplatform.kernel</groupId>
+ <artifactId>exo.kernel.commons.test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.exoplatform.kernel</groupId>
<artifactId>exo.kernel.component.cache</artifactId>
</dependency>
<dependency>
@@ -36,7 +30,7 @@
<dependency>
<groupId>org.jgroups</groupId>
<artifactId>jgroups</artifactId>
- <version>2.12.1.3.Final</version>
+ <version>3.0.0.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -49,25 +43,18 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
+ <argLine>${env.MAVEN_OPTS}
-Djava.security.manager=org.exoplatform.commons.test.TestSecurityManager
-Djava.security.policy=${project.build.directory}/test-classes/test.policy</argLine>
<systemProperties>
- <!--
- We add this system property due to some incompatibility between
- IPv6 and some JVM of Linux distributions such as Ubuntu and
- Fedora
- -->
+ <!-- We add this system property due to some incompatibility between IPv6 and
some JVM of Linux distributions such as Ubuntu and Fedora -->
<property>
<name>java.net.preferIPv4Stack</name>
<value>true</value>
</property>
<!-- Avoid the firewall -->
<property>
- <name>bind.address</name>
+ <name>jgroups.bind_addr</name>
<value>127.0.0.1</value>
</property>
- <property>
- <name>jgroups.stack</name>
- <value>udp</value>
- </property>
</systemProperties>
</configuration>
</plugin>
@@ -79,6 +66,43 @@
<target>1.6</target>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-policy</id>
+ <phase>process-test-resources</phase>
+ <configuration>
+ <tasks>
+ <echo>Creating Access Policy for tests</echo>
+ <makeurl file="${settings.localRepository}"
property="localRepositoryURL" />
+ <makeurl file="${project.build.outputDirectory}"
property="outputDirectoryURL" />
+ <makeurl file="${project.build.testOutputDirectory}"
property="testOutputDirectoryURL" />
+ <copy todir="${project.build.testOutputDirectory}"
overwrite="true">
+ <fileset dir="${project.basedir}/src/test/resources/">
+ <include name="test.policy" />
+ </fileset>
+ <filterset>
+ <filter token="MAVEN_REPO" value="${localRepositoryURL}"
/>
+ <filter token="MAIN_CLASSES"
value="${outputDirectoryURL}" />
+ <filter token="TEST_CLASSES"
value="${testOutputDirectoryURL}" />
+ </filterset>
+ </copy>
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ <dependencies>
+ <dependency>
+ <groupId>ant</groupId>
+ <artifactId>ant-optional</artifactId>
+ <version>1.5.3-1</version>
+ </dependency>
+ </dependencies>
+ </plugin>
</plugins>
</build>
</project>
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/AbstractExoCache.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/AbstractExoCache.java 2011-12-15
15:26:48 UTC (rev 5330)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/AbstractExoCache.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -18,6 +18,7 @@
*/
package org.exoplatform.services.cache.impl.infinispan;
+import org.exoplatform.commons.utils.SecurityHelper;
import org.exoplatform.services.cache.CacheInfo;
import org.exoplatform.services.cache.CacheListener;
import org.exoplatform.services.cache.CacheListenerContext;
@@ -39,6 +40,7 @@
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import java.io.Serializable;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -219,13 +221,22 @@
/**
* {@inheritDoc}
*/
- public void put(K key, V value) throws NullPointerException
+ public void put(final K key, final V value) throws NullPointerException
{
if (key == null)
{
throw new NullPointerException("No null cache key accepted");
- }
- putOnly(key, value);
+ }
+ SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
+ {
+
+ @Override
+ public Void run()
+ {
+ putOnly(key, value);
+ return null;
+ }
+ });
onPut(key, value);
}
@@ -240,7 +251,7 @@
/**
* {@inheritDoc}
*/
- public void putMap(Map<? extends K, ? extends V> objs) throws
NullPointerException, IllegalArgumentException
+ public void putMap(final Map<? extends K, ? extends V> objs) throws
NullPointerException, IllegalArgumentException
{
if (objs == null)
{
@@ -253,39 +264,56 @@
throw new IllegalArgumentException("No null cache key accepted");
}
}
- cache.startBatch();
- try
+ SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
{
- // Start transaction
- for (Map.Entry<? extends K, ? extends V> entry : objs.entrySet())
+
+ @Override
+ public Void run()
{
- putOnly(entry.getKey(), entry.getValue());
+ cache.startBatch();
+ try
+ {
+ // Start transaction
+ for (Map.Entry<? extends K, ? extends V> entry : objs.entrySet())
+ {
+ putOnly(entry.getKey(), entry.getValue());
+ }
+ cache.endBatch(true);
+ // End transaction
+ for (Map.Entry<? extends K, ? extends V> entry : objs.entrySet())
+ {
+ onPut(entry.getKey(), entry.getValue());
+ }
+ }
+ catch (Exception e)
+ {
+ cache.endBatch(false);
+ LOG.warn("An error occurs while executing the putMap method",
e);
+ }
+ return null;
}
- cache.endBatch(true);
- // End transaction
- for (Map.Entry<? extends K, ? extends V> entry : objs.entrySet())
- {
- onPut(entry.getKey(), entry.getValue());
- }
- }
- catch (Exception e)
- {
- cache.endBatch(false);
- LOG.warn("An error occurs while executing the putMap method", e);
- }
+ });
}
/**
* {@inheritDoc}
*/
@SuppressWarnings("unchecked")
- public V remove(Serializable name) throws NullPointerException
+ public V remove(final Serializable name) throws NullPointerException
{
if (name == null)
{
throw new NullPointerException("No null cache key accepted");
}
- V result = cache.remove(name);
+ V result = SecurityHelper.doPrivilegedAction(new PrivilegedAction<V>()
+ {
+
+ @Override
+ public V run()
+ {
+ return cache.remove(name);
+ }
+ });
onRemove((K)name, result);
return result;
}
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/ExoCacheFactoryImpl.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/ExoCacheFactoryImpl.java 2011-12-15
15:26:48 UTC (rev 5330)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/ExoCacheFactoryImpl.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -18,6 +18,7 @@
*/
package org.exoplatform.services.cache.impl.infinispan;
+import org.exoplatform.commons.utils.SecurityHelper;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.configuration.ConfigurationManager;
import org.exoplatform.container.xml.InitParams;
@@ -26,7 +27,10 @@
import org.exoplatform.services.cache.ExoCacheConfig;
import org.exoplatform.services.cache.ExoCacheFactory;
import org.exoplatform.services.cache.ExoCacheInitException;
+import org.exoplatform.services.cache.impl.infinispan.distributed.DistributedExoCache;
import org.exoplatform.services.cache.impl.infinispan.generic.GenericExoCacheCreator;
+import org.exoplatform.services.ispn.DistributedCacheManager;
+import org.exoplatform.services.ispn.Utils;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
import org.infinispan.Cache;
@@ -36,12 +40,11 @@
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.jmx.MBeanServerLookup;
import org.infinispan.manager.DefaultCacheManager;
-import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
-import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
-import java.net.URL;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,8 +67,8 @@
/**
* The logger
*/
- private static final Log LOG =
-
ExoLogger.getLogger("exo.kernel.component.ext.cache.impl.infinispan.v5.ExoCacheFactoryImpl");
+ private static final Log LOG = ExoLogger
+
.getLogger("exo.kernel.component.ext.cache.impl.infinispan.v5.ExoCacheFactoryImpl");
/**
* The initial parameter key that defines the full path of the configuration template
@@ -73,10 +76,15 @@
private static final String CACHE_CONFIG_TEMPLATE_KEY =
"cache.config.template";
/**
+ * The cache manager for the distributed cache
+ */
+ private final DistributedCacheManager distributedCacheManager;
+
+ /**
* The current {@link ExoContainerContext}
*/
private final ExoContainerContext ctx;
-
+
/**
* The configuration manager that allows us to retrieve a configuration file in
several different
* manners
@@ -103,35 +111,48 @@
* The mapping between the cache names and the configuration paths
*/
private final Map<String, String> mappingCacheNameConfig = new
HashMap<String, String>();
-
+
/**
* The mapping between the global configuration and the cache managers
- */
- private final Map<GlobalConfiguration, DefaultCacheManager>
mappingGlobalConfigCacheManager =
- new HashMap<GlobalConfiguration, DefaultCacheManager>();
+ */
+ private final Map<GlobalConfiguration, DefaultCacheManager>
mappingGlobalConfigCacheManager =
+ new HashMap<GlobalConfiguration, DefaultCacheManager>();
/**
* The default creator
*/
private final ExoCacheCreator defaultCreator = new GenericExoCacheCreator();
-
+
private static final MBeanServerLookup MBEAN_SERVER_LOOKUP = new MBeanServerLookup()
{
public MBeanServer getMBeanServer(Properties properties)
{
return ExoContainerContext.getTopContainer().getMBeanServer();
- }
+ }
};
public ExoCacheFactoryImpl(ExoContainerContext ctx, InitParams params,
ConfigurationManager configManager)
throws ExoCacheInitException
{
- this(ctx, getValueParam(params, CACHE_CONFIG_TEMPLATE_KEY), configManager);
+ this(ctx, params, configManager, null);
}
ExoCacheFactoryImpl(ExoContainerContext ctx, String cacheConfigTemplate,
ConfigurationManager configManager)
throws ExoCacheInitException
{
+ this(ctx, cacheConfigTemplate, configManager, null);
+ }
+
+ public ExoCacheFactoryImpl(ExoContainerContext ctx, InitParams params,
ConfigurationManager configManager,
+ DistributedCacheManager dcm) throws ExoCacheInitException
+ {
+ this(ctx, getValueParam(params, CACHE_CONFIG_TEMPLATE_KEY), configManager, dcm);
+ }
+
+ public ExoCacheFactoryImpl(ExoContainerContext ctx, String cacheConfigTemplate,
ConfigurationManager configManager,
+ DistributedCacheManager dcm) throws ExoCacheInitException
+ {
+ this.distributedCacheManager = dcm;
this.ctx = ctx;
this.configManager = configManager;
if (cacheConfigTemplate == null)
@@ -148,42 +169,62 @@
* Initializes the {@link DefaultCacheManager}
* @throws ExoCacheInitException if the cache manager cannot be initialized
*/
- private DefaultCacheManager initCacheManager(String cacheConfigTemplate)
- throws ExoCacheInitException
+ private DefaultCacheManager initCacheManager(final String cacheConfigTemplate) throws
ExoCacheInitException
{
- InputStream is = null;
try
{
- // Read the configuration file of the cache
- is = configManager.getInputStream(cacheConfigTemplate);
+ return SecurityHelper.doPrivilegedExceptionAction(new
PrivilegedExceptionAction<DefaultCacheManager>()
+ {
+ public DefaultCacheManager run() throws Exception
+ {
+ InputStream is = null;
+ try
+ {
+ // Read the configuration file of the cache
+ is = configManager.getInputStream(cacheConfigTemplate);
+ }
+ catch (Exception e)
+ {
+ throw new ExoCacheInitException("The configuration of the
CacheManager cannot be loaded from '"
+ + cacheConfigTemplate + "'", e);
+ }
+ if (is == null)
+ {
+ throw new ExoCacheInitException("The configuration of the
CacheManager cannot be found at '"
+ + cacheConfigTemplate + "'");
+ }
+ DefaultCacheManager cacheManager = null;
+ try
+ {
+ // Create the CacheManager from the input stream
+ cacheManager = new DefaultCacheManager(is, false);
+ }
+ catch (Exception e)
+ {
+ throw new ExoCacheInitException("Cannot initialize the
CacheManager corresponding to the configuration '"
+ + cacheConfigTemplate + "'", e);
+ }
+
+ GlobalConfiguration config = cacheManager.getGlobalConfiguration();
+
+ configureCacheManager(config);
+ cacheManager.start();
+ return cacheManager;
+ }
+ });
}
- catch (Exception e)
+ catch (PrivilegedActionException e)
{
- throw new ExoCacheInitException("The configuration of the CacheManager
cannot be loaded from '"
- + cacheConfigTemplate + "'", e);
- }
- if (is == null)
- {
- throw new ExoCacheInitException("The configuration of the CacheManager
cannot be found at '"
- + cacheConfigTemplate + "'");
- }
- DefaultCacheManager cacheManager = null;
- try
- {
- // Create the CacheManager from the input stream
- cacheManager = new DefaultCacheManager(is, false);
- }
- catch (Exception e)
- {
- throw new ExoCacheInitException("Cannot initialize the CacheManager
corresponding to the configuration '"
- + cacheConfigTemplate + "'", e);
- }
-
- GlobalConfiguration config = cacheManager.getGlobalConfiguration();
-
- configureCacheManager(config);
- cacheManager.start();
- return cacheManager;
+ Throwable cause = e.getCause();
+ if (cause instanceof ExoCacheInitException)
+ {
+ throw (ExoCacheInitException)cause;
+ }
+ else
+ {
+ throw new ExoCacheInitException(e);
+ }
+ }
}
/**
@@ -199,7 +240,7 @@
// Configure the name of the cache manager
config.fluent().globalJmxStatistics().cacheManagerName(config.getCacheManagerName()
+ "_" + ctx.getName()).
// Configure the MBeanServerLookup
- mBeanServerLookup(MBEAN_SERVER_LOOKUP);
+ mBeanServerLookup(MBEAN_SERVER_LOOKUP);
}
/**
@@ -227,75 +268,10 @@
*/
private boolean loadJGroupsConfig(GlobalConfiguration config) throws
ExoCacheInitException
{
- Properties properties = config.getTransportProperties();
- if (properties == null ||
!properties.containsKey(JGroupsTransport.CONFIGURATION_FILE))
- {
- return false;
- }
- InputStream is;
- String jgroupsFileLocation =
properties.getProperty(JGroupsTransport.CONFIGURATION_FILE);
- try
- {
- // Read the jgroups configuration file
- URL url = configManager.getURL(jgroupsFileLocation);
- is = url == null ? null : url.openStream();
- }
- catch (Exception e)
- {
- throw new ExoCacheInitException("The jgroups configuration cannot be loaded
from '" + jgroupsFileLocation
- + "'", e);
- }
- if (is != null)
- {
- try
- {
- // Set the jgroups configuration as XML
- properties.setProperty(JGroupsTransport.CONFIGURATION_XML, readStream(is));
- }
- catch (IOException e)
- {
- throw new ExoCacheInitException("The jgroups configuration cannot be
read from '" + jgroupsFileLocation
- + "'");
- }
- // Remove the property corresponding to the configuration file
- properties.remove(JGroupsTransport.CONFIGURATION_FILE);
- }
- return true;
+ return Utils.loadJGroupsConfig(configManager, config);
}
/**
- * Reads bytes from input stream and builds a string from them
- *
- * @param inputStream
- * @return
- * @throws IOException
- */
- protected String readStream(InputStream inputStream) throws IOException
- {
- StringBuilder out = new StringBuilder(4096);
- byte[] b = new byte[4096];
- try
- {
- for (int length; (length = inputStream.read(b)) != -1;)
- {
- out.append(new String(b, 0, length));
- }
- }
- finally
- {
- try
- {
- inputStream.close();
- }
- catch (Exception e)
- {
- LOG.debug("Cannot close stream", e);
- }
- }
- return out.toString();
- }
-
- /**
* To create a new cache instance according to the given configuration, we follow the
steps below:
*
* We first try to find if a specific location of the cache configuration has been
defined thanks
@@ -303,7 +279,8 @@
* configuration defined in this file otherwise we use the default cache configuration
defined in
* "${CACHE_CONFIG_TEMPLATE_KEY}"
*/
- public ExoCache<Serializable, Object> createCache(ExoCacheConfig config) throws
ExoCacheInitException
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public ExoCache<Serializable, Object> createCache(final ExoCacheConfig config)
throws ExoCacheInitException
{
final String region = config.getName();
final String customConfig = mappingCacheNameConfig.get(region);
@@ -314,29 +291,63 @@
final Configuration conf;
if (customConfig != null)
{
- // A custom configuration has been set
- if (LOG.isInfoEnabled())
- LOG.info("A custom configuration has been set for the cache
'" + region + "'.");
- // Create the CacheManager by loading the configuration
- DefaultCacheManager customCacheManager = new
DefaultCacheManager(configManager.getInputStream(customConfig), false);
- GlobalConfiguration gc = customCacheManager.getGlobalConfiguration();
- // Configure JGroups and JMX since it could affect the state of the Global
Config
- configureCacheManager(gc);
- // Check if a CacheManager with the same GlobalConfiguration exists
- DefaultCacheManager currentCacheManager =
mappingGlobalConfigCacheManager.get(gc);
- if (currentCacheManager == null)
+ try
{
- // No cache manager has been defined so far for this Cache Configuration
- currentCacheManager = customCacheManager;
- // Use a different cache manager name to prevent naming conflict
-
gc.fluent().globalJmxStatistics().cacheManagerName(gc.getCacheManagerName() +
"_" + region + "_" + ctx.getName());
- currentCacheManager.start();
- // We register this new cache manager
- mappingGlobalConfigCacheManager.put(gc, customCacheManager);
+ cacheManager = SecurityHelper.doPrivilegedExceptionAction(new
PrivilegedExceptionAction<DefaultCacheManager>()
+ {
+ public DefaultCacheManager run() throws Exception
+ {
+ // A custom configuration has been set
+ if (LOG.isInfoEnabled())
+ LOG.info("A custom configuration has been set for the cache
'" + region + "'.");
+ // Create the CacheManager by loading the configuration
+ DefaultCacheManager customCacheManager =
+ new
DefaultCacheManager(configManager.getInputStream(customConfig), false);
+ GlobalConfiguration gc =
customCacheManager.getGlobalConfiguration();
+ // Configure JGroups and JMX since it could affect the state of the
Global Config
+ configureCacheManager(gc);
+ // Check if a CacheManager with the same GlobalConfiguration exists
+ DefaultCacheManager currentCacheManager =
mappingGlobalConfigCacheManager.get(gc);
+ if (currentCacheManager == null)
+ {
+ // No cache manager has been defined so far for this Cache
Configuration
+ currentCacheManager = customCacheManager;
+ // Use a different cache manager name to prevent naming conflict
+ gc.fluent().globalJmxStatistics()
+ .cacheManagerName(gc.getCacheManagerName() + "_" +
region + "_" + ctx.getName());
+ currentCacheManager.start();
+ // We register this new cache manager
+ mappingGlobalConfigCacheManager.put(gc, customCacheManager);
+ }
+ return currentCacheManager;
+ }
+ });
}
- conf = currentCacheManager.getDefaultConfiguration().clone();
- cacheManager = currentCacheManager;
+ catch (PrivilegedActionException e)
+ {
+ Throwable cause = e.getCause();
+ if (cause instanceof Exception)
+ {
+ throw (Exception)cause;
+ }
+ else
+ {
+ throw new Exception(e);
+ }
+ }
+ conf = cacheManager.getDefaultConfiguration().clone();
}
+ else if (config.isDistributed())
+ {
+ // We expect a distributed cache
+ if (distributedCacheManager == null)
+ {
+ throw new NullPointerException("The DistributedCacheManager has not
been defined in the configuration,"
+ + " please configure it at root container level if you want to use
a distributed cache.");
+ }
+ return new DistributedExoCache(ctx, config,
+ distributedCacheManager.getCache(DistributedExoCache.CACHE_NAME));
+ }
else
{
cacheManager = this.cacheManager;
@@ -344,7 +355,7 @@
if (LOG.isInfoEnabled())
LOG.info("The configuration template will be used for the the cache
'" + region + "'.");
conf = cacheManager.getDefaultConfiguration().clone();
- if (!config.isDistributed() && !config.isRepicated())
+ if (!config.isRepicated())
{
// The cache is local
conf.fluent().clustering().mode(CacheMode.LOCAL);
@@ -359,10 +370,31 @@
@Override
public Cache<Serializable, Object> call() throws Exception
{
- // Define the configuration
- cacheManager.defineConfiguration(region, conf);
- // create and start the cache
- return cacheManager.getCache(region);
+ try
+ {
+ return SecurityHelper.doPrivilegedExceptionAction(new
PrivilegedExceptionAction<Cache<Serializable, Object>>()
+ {
+ public Cache<Serializable, Object> run() throws Exception
+ {
+ // Define the configuration
+ cacheManager.defineConfiguration(region, conf);
+ // create and start the cache
+ return cacheManager.getCache(region);
+ }
+ });
+ }
+ catch (PrivilegedActionException e)
+ {
+ Throwable cause = e.getCause();
+ if (cause instanceof Exception)
+ {
+ throw (Exception)cause;
+ }
+ else
+ {
+ throw new Exception(e);
+ }
+ }
}
});
}
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/distributed/DistributedExoCache.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,1180 @@
+/*
+ * Copyright (C) 2011 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.cache.impl.infinispan.distributed;
+
+import org.exoplatform.commons.utils.SecurityHelper;
+import org.exoplatform.container.ExoContainer;
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.management.annotations.Managed;
+import org.exoplatform.management.annotations.ManagedDescription;
+import org.exoplatform.management.annotations.ManagedName;
+import org.exoplatform.services.cache.CacheInfo;
+import org.exoplatform.services.cache.CacheListener;
+import org.exoplatform.services.cache.CacheListenerContext;
+import org.exoplatform.services.cache.CachedObjectSelector;
+import org.exoplatform.services.cache.ExoCache;
+import org.exoplatform.services.cache.ExoCacheConfig;
+import org.exoplatform.services.cache.ObjectCacheInfo;
+import org.exoplatform.services.ispn.AbstractMapper;
+import org.exoplatform.services.ispn.DistributedCacheManager;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.infinispan.AdvancedCache;
+import org.infinispan.Cache;
+import org.infinispan.context.Flag;
+import org.infinispan.distexec.mapreduce.Collector;
+import org.infinispan.distexec.mapreduce.MapReduceTask;
+import org.infinispan.distexec.mapreduce.Reducer;
+import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntriesEvicted;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
+import org.infinispan.notifications.cachelistener.event.CacheEntriesEvictedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
+import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
+import org.infinispan.util.concurrent.locks.LockManager;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:nfilotto@exoplatform.com">Nicolas
Filotto</a>
+ * @version $Id$
+ *
+ */
+public class DistributedExoCache<K extends Serializable, V> implements
ExoCache<K, V>
+{
+
+ /**
+ * Logger.
+ */
+ private static final Log LOG = ExoLogger
+
.getLogger("exo.kernel.component.ext.cache.impl.infinispan.v5.DistributedExoCache");
+
+ public static final String CACHE_NAME = "eXoCache";
+
+ private final AtomicInteger hits = new AtomicInteger(0);
+
+ private final AtomicInteger misses = new AtomicInteger(0);
+
+ private String label;
+
+ private String name;
+
+ private final String fullName;
+
+ private boolean distributed;
+
+ private boolean replicated;
+
+ private boolean logEnabled;
+
+ @SuppressWarnings("rawtypes")
+ private static final ConcurrentMap<Cache, ConcurrentMap<String,
List<ListenerContext>>> ALL_LISTENERS =
+ new ConcurrentHashMap<Cache, ConcurrentMap<String,
List<ListenerContext>>>();
+
+ protected final AdvancedCache<CacheKey<K>, V> cache;
+
+ @SuppressWarnings("unchecked")
+ public DistributedExoCache(ExoContainerContext ctx, ExoCacheConfig config, Cache<K,
V> cache)
+ {
+ this.fullName = ctx.getName() + "-" + config.getName();
+ this.cache = (AdvancedCache<CacheKey<K>, V>)cache.getAdvancedCache();
+ setDistributed(config.isDistributed());
+ setLabel(config.getLabel());
+ setName(config.getName());
+ setLogEnabled(config.isLogEnabled());
+ setReplicated(config.isRepicated());
+ }
+
+ AdvancedCache<CacheKey<K>, V> getCache()
+ {
+ return cache;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("rawtypes")
+ public void addCacheListener(CacheListener<? super K, ? super V> listener)
+ {
+ if (listener == null)
+ {
+ throw new NullPointerException();
+ }
+ List<ListenerContext> lListeners = getListeners(fullName);
+ if (lListeners == null)
+ {
+ lListeners = new CopyOnWriteArrayList<ListenerContext>();
+ boolean alreadyAdded = false;
+ ConcurrentMap<String, List<ListenerContext>> listeners =
getOrCreateListeners();
+ if (listeners.isEmpty())
+ {
+ synchronized (listeners)
+ {
+ if (listeners.isEmpty())
+ {
+ // Ensure that the listener is added only once
+ cache.addListener(new CacheEventListener());
+ listeners.put(fullName, lListeners);
+ alreadyAdded = true;
+ }
+ }
+ }
+ if (!alreadyAdded)
+ {
+ List<ListenerContext> oldValue = listeners.putIfAbsent(fullName,
lListeners);
+ if (oldValue != null)
+ {
+ lListeners = oldValue;
+ }
+ }
+ }
+ lListeners.add(new ListenerContext<K, V>(listener, this));
+ }
+
+ @SuppressWarnings("rawtypes")
+ private ConcurrentMap<String, List<ListenerContext>>
getOrCreateListeners()
+ {
+ ConcurrentMap<String, List<ListenerContext>> listeners =
ALL_LISTENERS.get(cache);
+ if (listeners == null)
+ {
+ listeners = new ConcurrentHashMap<String, List<ListenerContext>>();
+ ConcurrentMap<String, List<ListenerContext>> oldValue =
ALL_LISTENERS.putIfAbsent(cache, listeners);
+ if (oldValue != null)
+ {
+ listeners = oldValue;
+ }
+ }
+ return listeners;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private List<ListenerContext> getListeners(String fullName)
+ {
+ ConcurrentMap<String, List<ListenerContext>> listeners =
ALL_LISTENERS.get(cache);
+ return listeners == null ? null : listeners.get(fullName);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void clearCache()
+ {
+ SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
+ {
+
+ @Override
+ public Void run()
+ {
+ MapReduceTask<CacheKey<K>, V, String, CacheKey<K>> task =
new MapReduceTask<CacheKey<K>, V, String, CacheKey<K>>(cache);
+ task.mappedWith(new ClearCacheMapper<K, V>(fullName)).reducedWith(new
ClearCacheReducer<String, V, K>());
+ task.execute();
+ return null;
+ }
+
+ });
+ onClearCache();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("unchecked")
+ public V get(Serializable name)
+ {
+ if (name == null)
+ {
+ return null;
+ }
+ @SuppressWarnings("rawtypes")
+ final CacheKey key = new CacheKey<Serializable>(fullName, name);
+ final V result = SecurityHelper.doPrivilegedAction(new PrivilegedAction<V>()
+ {
+
+ @Override
+ public V run()
+ {
+ return cache.get(key);
+ }
+
+ });
+ if (result == null)
+ {
+ misses.incrementAndGet();
+ }
+ else
+ {
+ hits.incrementAndGet();
+ }
+ onGet(key, result);
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getCacheHit()
+ {
+ return hits.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getCacheMiss()
+ {
+ return misses.get();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public int getCacheSize()
+ {
+ Map<String, Integer> map = SecurityHelper.doPrivilegedAction(new
PrivilegedAction<Map<String, Integer>>()
+ {
+
+ @Override
+ public Map<String, Integer> run()
+ {
+ MapReduceTask<CacheKey<K>, V, String, Integer> task = new
MapReduceTask<CacheKey<K>, V, String, Integer>(cache);
+ task.mappedWith(new GetSizeMapper<K, V>(fullName)).reducedWith(new
GetSizeReducer<String>());
+ return task.execute();
+ }
+
+ });
+ int sum = 0;
+ for (Integer i : map.values())
+ {
+ sum += i;
+ }
+ return sum;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public List<V> getCachedObjects()
+ {
+ Map<String, List<V>> map = SecurityHelper.doPrivilegedAction(new
PrivilegedAction<Map<String, List<V>>>()
+ {
+
+ @Override
+ public Map<String, List<V>> run()
+ {
+ MapReduceTask<CacheKey<K>, V, String, List<V>> task = new
MapReduceTask<CacheKey<K>, V, String, List<V>>(cache);
+ task.mappedWith(new GetCachedObjectsMapper<K,
V>(fullName)).reducedWith(new GetCachedObjectsReducer<String, V>());
+ return task.execute();
+ }
+
+ });
+ List<V> result = new ArrayList<V>();
+ for (List<V> vals : map.values())
+ {
+ result.addAll(vals);
+ }
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getLabel()
+ {
+ return label;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public String getName()
+ {
+ return name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isDistributed()
+ {
+ return distributed;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isLogEnabled()
+ {
+ return logEnabled;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean isReplicated()
+ {
+ return replicated;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void put(final K key, final V value) throws NullPointerException
+ {
+ if (key == null)
+ {
+ throw new NullPointerException("No null cache key accepted");
+ }
+ SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
+ {
+
+ @Override
+ public Void run()
+ {
+ putOnly(key, value);
+ return null;
+ }
+
+ });
+ onPut(key, value);
+ }
+
+ /**
+ * Only puts the data into the cache nothing more
+ */
+ protected void putOnly(K key, V value)
+ {
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(new CacheKey<K>(fullName, key),
value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void putMap(final Map<? extends K, ? extends V> objs) throws
NullPointerException, IllegalArgumentException
+ {
+ if (objs == null)
+ {
+ throw new NullPointerException("No null map accepted");
+ }
+ for (Serializable name : objs.keySet())
+ {
+ if (name == null)
+ {
+ throw new IllegalArgumentException("No null cache key accepted");
+ }
+ }
+ SecurityHelper.doPrivilegedAction(new PrivilegedAction<Void>()
+ {
+
+ @Override
+ public Void run()
+ {
+ cache.startBatch();
+ try
+ {
+ // Start transaction
+ for (Map.Entry<? extends K, ? extends V> entry : objs.entrySet())
+ {
+ putOnly(entry.getKey(), entry.getValue());
+ }
+ cache.endBatch(true);
+ // End transaction
+ for (Map.Entry<? extends K, ? extends V> entry : objs.entrySet())
+ {
+ onPut(entry.getKey(), entry.getValue());
+ }
+ }
+ catch (Exception e)
+ {
+ cache.endBatch(false);
+ LOG.warn("An error occurs while executing the putMap method",
e);
+ }
+ return null;
+ }
+ });
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("unchecked")
+ public V remove(Serializable name) throws NullPointerException
+ {
+ if (name == null)
+ {
+ throw new NullPointerException("No null cache key accepted");
+ }
+ @SuppressWarnings("rawtypes")
+ CacheKey key = new CacheKey<Serializable>(fullName, name);
+ V result = cache.remove(key);
+ onRemove(key, result);
+ return result;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public List<V> removeCachedObjects()
+ {
+ final List<V> list = getCachedObjects();
+ clearCache();
+ return list;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void select(CachedObjectSelector<? super K, ? super V> selector) throws
Exception
+ {
+ if (selector == null)
+ {
+ throw new IllegalArgumentException("No null selector");
+ }
+ Map<K, V> map = SecurityHelper.doPrivilegedAction(new
PrivilegedAction<Map<K, V>>()
+ {
+
+ @Override
+ public Map<K, V> run()
+ {
+ MapReduceTask<CacheKey<K>, V, K, V> task = new
MapReduceTask<CacheKey<K>, V, K, V>(cache);
+ task.mappedWith(new GetEntriesMapper<K, V>(fullName)).reducedWith(new
GetEntriesReducer<K, V>());
+ return task.execute();
+ }
+
+ });
+
+ for (K key : map.keySet())
+ {
+ if (key == null)
+ {
+ continue;
+ }
+ final V value = map.get(key);
+ ObjectCacheInfo<V> info = new ObjectCacheInfo<V>()
+ {
+ public V get()
+ {
+ return value;
+ }
+
+ public long getExpireTime()
+ {
+ // Cannot know: The expire time is managed by Infinispan itself
+ return -1;
+ }
+ };
+ if (selector.select(key, info))
+ {
+ selector.onSelect(this, key, info);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setDistributed(boolean distributed)
+ {
+ this.distributed = distributed;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setLabel(String label)
+ {
+ this.label = label;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setLogEnabled(boolean logEnabled)
+ {
+ this.logEnabled = logEnabled;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setName(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void setReplicated(boolean replicated)
+ {
+ this.replicated = replicated;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ void onExpire(CacheKey<K> key, V obj)
+ {
+ List<ListenerContext> listeners = getListeners(key.getFullName());
+ if (listeners == null || listeners.isEmpty())
+ {
+ return;
+ }
+ for (ListenerContext context : listeners)
+ {
+ try
+ {
+ context.onExpire(key.getKey(), obj);
+ }
+ catch (Exception e)
+ {
+ if (LOG.isWarnEnabled())
+ LOG.warn("Cannot execute the CacheListener properly", e);
+ }
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ void onRemove(CacheKey<K> key, V obj)
+ {
+ List<ListenerContext> listeners = getListeners(key.getFullName());
+ if (listeners == null || listeners.isEmpty())
+ {
+ return;
+ }
+ for (ListenerContext context : listeners)
+ {
+ try
+ {
+ context.onRemove(key, obj);
+ }
+ catch (Exception e)
+ {
+ if (LOG.isWarnEnabled())
+ LOG.warn("Cannot execute the CacheListener properly", e);
+ }
+ }
+ }
+
+ void onPut(CacheKey<K> key, V obj)
+ {
+ onPut(key.getFullName(), key.getKey(), obj);
+ }
+
+ void onPut(K key, V obj)
+ {
+ onPut(fullName, key, obj);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ void onPut(String fullName, K key, V obj)
+ {
+ List<ListenerContext> listeners = getListeners(fullName);
+ if (listeners == null || listeners.isEmpty())
+ {
+ return;
+ }
+ for (ListenerContext context : listeners)
+ {
+ try
+ {
+ context.onPut(key, obj);
+ }
+ catch (Exception e)
+ {
+ if (LOG.isWarnEnabled())
+ LOG.warn("Cannot execute the CacheListener properly", e);
+ }
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ void onGet(CacheKey<K> key, V obj)
+ {
+ List<ListenerContext> listeners = getListeners(key.getFullName());
+ if (listeners == null || listeners.isEmpty())
+ {
+ return;
+ }
+ for (ListenerContext context : listeners)
+ {
+ try
+ {
+ context.onGet(key, obj);
+ }
+ catch (Exception e)
+ {
+ if (LOG.isWarnEnabled())
+ LOG.warn("Cannot execute the CacheListener properly", e);
+ }
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ void onClearCache()
+ {
+ List<ListenerContext> listeners = getListeners(fullName);
+ if (listeners == null || listeners.isEmpty())
+ {
+ return;
+ }
+ for (ListenerContext context : listeners)
+ {
+ try
+ {
+ context.onClearCache();
+ }
+ catch (Exception e)
+ {
+ if (LOG.isWarnEnabled())
+ LOG.warn("Cannot execute the CacheListener properly", e);
+ }
+ }
+ }
+
+ @Listener
+ public class CacheEventListener
+ {
+ /**
+ * Warning Infinispan triggers a <code>CacheEntryEvictedEvent</code>
only at explicit eviction
+ * that is done lazily which is not exactly what we expect, we still use it to be
+ * able to use it with <code>avoidValueReplication</code> set to
<code>true</code>.
+ */
+ @CacheEntriesEvicted
+ public void cacheEntryEvicted(CacheEntriesEvictedEvent<CacheKey<K>, V>
evt)
+ {
+ if (evt.isPre())
+ {
+ for (Map.Entry<CacheKey<K>, V> entry :
evt.getEntries().entrySet())
+ {
+ onExpire(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ @CacheEntryRemoved
+ public void cacheEntryRemoved(CacheEntryRemovedEvent<CacheKey<K>, V>
evt)
+ {
+ if (evt.isPre() && !evt.isOriginLocal())
+ {
+ final CacheKey<K> key = evt.getKey();
+ final V value = evt.getValue();
+ onRemove(key, value);
+ }
+ }
+
+ @CacheEntryModified
+ public void cacheEntryModified(CacheEntryModifiedEvent<CacheKey<K>, V>
evt)
+ {
+ if (!evt.isOriginLocal() && !evt.isPre())
+ {
+ final CacheKey<K> key = evt.getKey();
+ final V value = evt.getValue();
+ onPut(key, value);
+ }
+ }
+ }
+
+ private static class ListenerContext<K extends Serializable, V> implements
CacheListenerContext, CacheInfo
+ {
+
+ /** . */
+ private final ExoCache<K, V> cache;
+
+ /** . */
+ final CacheListener<? super K, ? super V> listener;
+
+ public ListenerContext(CacheListener<? super K, ? super V> listener,
ExoCache<K, V> cache)
+ {
+ this.listener = listener;
+ this.cache = cache;
+ }
+
+ public CacheInfo getCacheInfo()
+ {
+ return this;
+ }
+
+ public String getName()
+ {
+ return cache.getName();
+ }
+
+ public int getMaxSize()
+ {
+ return cache.getMaxSize();
+ }
+
+ public long getLiveTime()
+ {
+ return cache.getLiveTime();
+ }
+
+ public int getSize()
+ {
+ return cache.getCacheSize();
+ }
+
+ void onExpire(K key, V obj) throws Exception
+ {
+ listener.onExpire(this, key, obj);
+ }
+
+ void onRemove(K key, V obj) throws Exception
+ {
+ listener.onRemove(this, key, obj);
+ }
+
+ void onPut(K key, V obj) throws Exception
+ {
+ listener.onPut(this, key, obj);
+ }
+
+ void onGet(K key, V obj) throws Exception
+ {
+ listener.onGet(this, key, obj);
+ }
+
+ void onClearCache() throws Exception
+ {
+ listener.onClearCache(this);
+ }
+ }
+
+ public void setMaxSize(int max)
+ {
+ throw new UnsupportedOperationException("The configuration of the cache cannot
not be modified");
+ }
+
+ public void setLiveTime(long period)
+ {
+ throw new UnsupportedOperationException("The configuration of the cache cannot
not be modified");
+ }
+
+ @ManagedName("MaxEntries")
+ @ManagedDescription("Maximum number of entries in a cache instance. -1 means no
limit.")
+ public int getMaxSize()
+ {
+ return cache.getConfiguration().getEvictionMaxEntries();
+ }
+
+ @ManagedName("Lifespan")
+ @ManagedDescription("Maximum lifespan of a cache entry, after which the entry is
expired cluster-wide."
+ + " -1 means the entries never expire.")
+ public long getLiveTime()
+ {
+ return cache.getConfiguration().getExpirationLifespan();
+ }
+
+ @Managed
+ @ManagedName("MaxIdle")
+ @ManagedDescription("Maximum idle time a cache entry will be maintained in the
cache. "
+ + "If the idle time is exceeded, the entry will be expired cluster-wide. -1
means the entries never expire.")
+ public long getMaxIdle()
+ {
+ return cache.getConfiguration().getExpirationMaxIdle();
+ }
+
+ @Managed
+ @ManagedName("WakeUpInterval")
+ @ManagedDescription("Interval between subsequent eviction runs. If you wish to
disable the periodic eviction "
+ + "process altogether, set wakeupInterval to -1.")
+ public long getWakeUpInterval()
+ {
+ return cache.getConfiguration().getExpirationWakeUpInterval();
+ }
+
+ public static class CacheKey<K> implements Externalizable
+ {
+ private K key;
+
+ private String fullName;
+
+ public CacheKey() {}
+ public CacheKey(String fullName, K key)
+ {
+ this.fullName = fullName;
+ this.key = key;
+ }
+
+ /**
+ * @return the nested key
+ */
+ K getKey()
+ {
+ return key;
+ }
+
+ /**
+ * @return the fullName
+ */
+ String getFullName()
+ {
+ return fullName;
+ }
+
+ /**
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((fullName == null) ? 0 : fullName.hashCode());
+ result = prime * result + ((key == null) ? 0 : key.hashCode());
+ return result;
+ }
+
+ /**
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ @SuppressWarnings("rawtypes")
+ CacheKey other = (CacheKey)obj;
+ if (fullName == null)
+ {
+ if (other.fullName != null)
+ return false;
+ }
+ else if (!fullName.equals(other.fullName))
+ return false;
+ if (key == null)
+ {
+ if (other.key != null)
+ return false;
+ }
+ else if (!key.equals(other.key))
+ return false;
+ return true;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "CacheKey [fullName=" + fullName + ", key=" + key +
"]";
+ }
+
+ /**
+ * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
+ */
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ byte[] buf = fullName.getBytes("UTF-8");
+ out.writeInt(buf.length);
+ out.write(buf);
+ out.writeObject(key);
+ }
+
+ /**
+ * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
+ */
+ @SuppressWarnings("unchecked")
+ public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException
+ {
+ byte[] buf = new byte[in.readInt()];
+ in.readFully(buf);
+ fullName = new String(buf, "UTF-8");
+ key = (K)in.readObject();
+ }
+ }
+
+ private abstract static class AbstractExoCacheMapper<K, V, KOut, VOut> extends
AbstractMapper<CacheKey<K>, V, KOut, VOut> implements Externalizable
+ {
+ /**
+ * The full name of the cache instance
+ */
+ private String fullName;
+
+ public AbstractExoCacheMapper() {}
+
+ public AbstractExoCacheMapper(String fullName)
+ {
+ this.fullName = fullName;
+ }
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = 7962676854308932222L;
+
+
+ /**
+ * @see org.exoplatform.services.ispn.AbstractMapper#isValid(java.lang.Object)
+ */
+ @Override
+ protected boolean isValid(CacheKey<K> key)
+ {
+ return fullName.equals(key.getFullName());
+ }
+
+ /**
+ * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
+ */
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ byte[] buf = fullName.getBytes("UTF-8");
+ out.writeInt(buf.length);
+ out.write(buf);
+ }
+
+ /**
+ * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
+ */
+ public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException
+ {
+ byte[] buf = new byte[in.readInt()];
+ in.readFully(buf);
+ fullName = new String(buf, "UTF-8");
+ }
+ }
+
+ public static class GetSizeMapper<K, V> extends AbstractExoCacheMapper<K, V,
String, Integer>
+ {
+
+ public GetSizeMapper() {}
+
+ public GetSizeMapper(String fullName)
+ {
+ super(fullName);
+ }
+
+
+ /**
+ * @see org.exoplatform.services.ispn.AbstractMapper#_map(java.lang.Object,
java.lang.Object, org.infinispan.distexec.mapreduce.Collector)
+ */
+ @Override
+ protected void _map(CacheKey<K> key, V value, Collector<String,
Integer> collector)
+ {
+ collector.emit("total", Integer.valueOf(1));
+ }
+
+ }
+
+ public static class GetSizeReducer<K> implements Reducer<K, Integer>
+ {
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = -5264142863835473112L;
+
+ /**
+ * @see org.infinispan.distexec.mapreduce.Reducer#reduce(java.lang.Object,
java.util.Iterator)
+ */
+ @Override
+ public Integer reduce(K reducedKey, Iterator<Integer> iter)
+ {
+ int sum = 0;
+ while (iter.hasNext())
+ {
+ Integer i = iter.next();
+ sum += i;
+ }
+ return sum;
+ }
+ }
+
+ public static class GetCachedObjectsMapper<K, V> extends
AbstractExoCacheMapper<K, V, String, List<V>>
+ {
+
+ public GetCachedObjectsMapper() {}
+
+ public GetCachedObjectsMapper(String fullName)
+ {
+ super(fullName);
+ }
+
+
+ /**
+ * @see org.exoplatform.services.ispn.AbstractMapper#_map(java.lang.Object,
java.lang.Object, org.infinispan.distexec.mapreduce.Collector)
+ */
+ @Override
+ protected void _map(CacheKey<K> key, V value, Collector<String,
List<V>> collector)
+ {
+ collector.emit("values", Collections.singletonList(value));
+ }
+
+ }
+
+ public static class GetCachedObjectsReducer<K, V> implements Reducer<K,
List<V>>
+ {
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = 8069024420056440405L;
+
+ /**
+ * @see org.infinispan.distexec.mapreduce.Reducer#reduce(java.lang.Object,
java.util.Iterator)
+ */
+ @Override
+ public List<V> reduce(K reducedKey, Iterator<List<V>> iter)
+ {
+ List<V> values = new ArrayList<V>();
+ while (iter.hasNext())
+ {
+ List<V> vals = iter.next();
+ values.addAll(vals);
+ }
+ return values;
+ }
+ }
+
+ public static class ClearCacheMapper<K, V> extends AbstractExoCacheMapper<K,
V, String, CacheKey<K>>
+ {
+
+ public ClearCacheMapper() {}
+
+ public ClearCacheMapper(String fullName)
+ {
+ super(fullName);
+ }
+
+
+ /**
+ * @see org.exoplatform.services.ispn.AbstractMapper#_map(java.lang.Object,
java.lang.Object, org.infinispan.distexec.mapreduce.Collector)
+ */
+ @Override
+ protected void _map(CacheKey<K> key, V value, Collector<String,
CacheKey<K>> collector)
+ {
+ collector.emit("keys", key);
+ }
+
+ }
+
+ public static class ClearCacheReducer<K, V, KIn> implements Reducer<K,
CacheKey<KIn>>
+ {
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = -8111087186325793256L;
+
+ /**
+ * @see org.infinispan.distexec.mapreduce.Reducer#reduce(java.lang.Object,
java.util.Iterator)
+ */
+ @Override
+ public CacheKey<KIn> reduce(K reducedKey, Iterator<CacheKey<KIn>>
iter)
+ {
+ CacheKey<KIn> firstKey;
+ if (iter == null || !iter.hasNext() || (firstKey = iter.next()) == null)
+ {
+ return null;
+ }
+ ExoContainer container = ExoContainerContext.getTopContainer();
+ if (container == null)
+ {
+ LOG.error("The top container could not be found");
+ return null;
+ }
+ DistributedCacheManager dcm =
(DistributedCacheManager)container.getComponentInstanceOfType(DistributedCacheManager.class);
+ if (dcm == null)
+ {
+ LOG.error("The DistributedCacheManager could not be found at top
container level, please configure it.");
+ return null;
+ }
+ Cache<CacheKey<K>, V> cache = dcm.getCache(CACHE_NAME);
+ final LockManager lm = cache.getAdvancedCache().getLockManager();
+ // Sort the keys to prevent deadlocks
+ Set<CacheKey<KIn>> keys = new TreeSet<CacheKey<KIn>>(new
Comparator<CacheKey<KIn>>()
+ {
+ public int compare(CacheKey<KIn> o1, CacheKey<KIn> o2)
+ {
+ int result = lm.getLockId(o1) - lm.getLockId(o2);
+ return result == 0 ? System.identityHashCode(o1) -
System.identityHashCode(o2) : result;
+ }
+ });
+ keys.add(firstKey);
+ while (iter.hasNext())
+ {
+ keys.add(iter.next());
+ }
+ for (CacheKey<KIn> key : keys)
+ {
+ cache.getAdvancedCache().withFlags(Flag.SKIP_REMOTE_LOOKUP,
Flag.FAIL_SILENTLY).remove(key);
+ }
+ return null;
+ }
+ }
+
+ public static class GetEntriesMapper<K, V> extends AbstractExoCacheMapper<K,
V, K, V>
+ {
+ public GetEntriesMapper() {}
+
+ public GetEntriesMapper(String fullName)
+ {
+ super(fullName);
+ }
+
+
+ /**
+ * @see org.exoplatform.services.ispn.AbstractMapper#_map(java.lang.Object,
java.lang.Object, org.infinispan.distexec.mapreduce.Collector)
+ */
+ @Override
+ protected void _map(CacheKey<K> key, V value, Collector<K, V>
collector)
+ {
+ collector.emit(key.getKey(), value);
+ }
+ }
+
+ public static class GetEntriesReducer<K, V> implements Reducer<K, V>
+ {
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = 5153826700048219537L;
+
+ /**
+ * @see org.infinispan.distexec.mapreduce.Reducer#reduce(java.lang.Object,
java.util.Iterator)
+ */
+ @Override
+ public V reduce(K reducedKey, Iterator<V> iter)
+ {
+ return iter == null || !iter.hasNext() ? null : iter.next();
+ }
+ }
+}
\ No newline at end of file
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/generic/GenericExoCacheCreator.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/generic/GenericExoCacheCreator.java 2011-12-15
15:26:48 UTC (rev 5330)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/cache/impl/infinispan/generic/GenericExoCacheCreator.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -152,11 +152,13 @@
cacheConfig.fluent().expiration().lifespan(period);
}
+ @Managed
public void setMaxIdle(long maxIdle)
{
cacheConfig.fluent().expiration().maxIdle(maxIdle);
}
+ @Managed
public void setWakeUpInterval(long wakeUpInterval)
{
cacheConfig.fluent().expiration().wakeUpInterval(wakeUpInterval);
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/AbstractMapper.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/AbstractMapper.java
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/AbstractMapper.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2011 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.ispn;
+
+import org.infinispan.distexec.mapreduce.Collector;
+import org.infinispan.distexec.mapreduce.Mapper;
+
+/**
+ * The main class of all the mappers.
+ *
+ * @author <a href="mailto:nfilotto@exoplatform.com">Nicolas
Filotto</a>
+ * @version $Id$
+ *
+ */
+public abstract class AbstractMapper<KIn, VIn, KOut, VOut> implements
Mapper<KIn, VIn, KOut, VOut>
+{
+
+ /**
+ * The serial version UID
+ */
+ private static final long serialVersionUID = 7118530772747505976L;
+
+ /**
+ * @see org.infinispan.distexec.mapreduce.Mapper#map(java.lang.Object,
java.lang.Object, org.infinispan.distexec.mapreduce.Collector)
+ */
+ @Override
+ public void map(KIn key, VIn value, Collector<KOut, VOut> collector)
+ {
+ if (isValid(key))
+ {
+ _map(key, value, collector);
+ }
+ }
+
+ /**
+ * This method is in fact an internal mapping
+ *
+ * @see org.infinispan.distexec.mapreduce.Mapper#map(java.lang.Object,
java.lang.Object, org.infinispan.distexec.mapreduce.Collector)
+ */
+ protected abstract void _map(KIn key, VIn value, Collector<KOut, VOut>
collector);
+
+ /**
+ * Indicates if the given key matches with the current context, indeed as the cache
instances are
+ * shared it is needed to check each key to know if it is part of the targeted scope
or not.
+ *
+ * @param key the key to check
+ * @return <code>true</code> if the key matches with the scope,
<code>false</code> otherwise.
+ */
+ protected abstract boolean isValid(KIn key);
+
+}
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/DistributedCacheManager.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,185 @@
+/*
+ * Copyright (C) 2011 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.ispn;
+
+import org.exoplatform.commons.utils.SecurityHelper;
+import org.exoplatform.container.ExoContainer;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.util.TemplateConfigurationHelper;
+import org.exoplatform.container.xml.InitParams;
+import org.exoplatform.container.xml.PropertiesParam;
+import org.exoplatform.container.xml.ValueParam;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.infinispan.Cache;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.picocontainer.Startable;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+
+/**
+ * This class is used to allow to use infinispan in distribution mode with
+ * the ability to launch infinispan instances in standalone mode, in other
+ * words outside an application server. To make it possible we will need to share
+ * the same cache instance whatever the related {@link ExoContainer} because
+ * to be able to launch ispn instances in standalone mode we need to have a static
+ * configuration file.
+ *
+ * @author <a href="mailto:nfilotto@exoplatform.com">Nicolas
Filotto</a>
+ * @version $Id$
+ *
+ */
+public class DistributedCacheManager implements Startable
+{
+ /**
+ * The logger
+ */
+ private static final Log LOG = ExoLogger
+
.getLogger("exo.kernel.component.ext.cache.impl.infinispan.v5.DistributedCacheManager");
+
+ /**
+ * The parameter name corresponding to the infinispan configuration
+ */
+ private static final String CONFIG_FILE_PARAMETER_NAME =
"infinispan-configuration";
+
+ /**
+ * The parameter name corresponding to the parameters to inject
+ * into the infinispan configuration file
+ */
+ private static final String PARAMS_PARAMETER_NAME = "parameters";
+
+ /**
+ * The infinispan cache manager
+ */
+ protected final EmbeddedCacheManager manager;
+
+ /**
+ * Default constructor
+ */
+ public DistributedCacheManager(String configurationFile, Map<String, String>
parameters,
+ ConfigurationManager configManager)
+ {
+ this.manager = init(configurationFile, parameters, configManager);
+ }
+
+ /**
+ * Default constructor
+ */
+ public DistributedCacheManager(InitParams params, ConfigurationManager configManager)
+ {
+ ValueParam vp;
+ final String result;
+ if (params != null && (vp =
params.getValueParam(CONFIG_FILE_PARAMETER_NAME)) != null
+ && (result = vp.getValue()) != null && !result.isEmpty())
+ {
+ PropertiesParam pp = params.getPropertiesParam(PARAMS_PARAMETER_NAME);
+ this.manager = init(result, pp == null ? null : pp.getProperties(),
configManager);
+ }
+ else
+ {
+ throw new IllegalArgumentException("The parameter '" +
CONFIG_FILE_PARAMETER_NAME + "' must be set");
+ }
+ }
+
+ /**
+ * Initializes and created the CacheManager
+ * @param configurationFile the path of the configuration file
+ * @param parameters the parameters to inject into the configuration file
+ * @param configManager the configuration manager used to get the configuration file
+ * @return the CacheManager initialized
+ */
+ private EmbeddedCacheManager init(final String configurationFile, final Map<String,
String> parameters,
+ final ConfigurationManager configManager)
+ {
+ try
+ {
+ if (configurationFile == null || configurationFile.isEmpty())
+ {
+ throw new IllegalArgumentException("The parameter
'configurationFile' must be set");
+ }
+ if (LOG.isDebugEnabled())
+ {
+ LOG.debug("The configuration file of the DistributedCacheManager will be
loaded from " + configurationFile);
+ }
+ final TemplateConfigurationHelper helper =
+ new TemplateConfigurationHelper(new
String[]{"^jgroups-configuration", "^infinispan-.*"},
+ new String[]{"^infinispan-configuration"}, configManager);
+ if (LOG.isDebugEnabled() && parameters != null &&
!parameters.isEmpty())
+ {
+ LOG.debug("The parameters to use while processing the configuration file
are " + parameters);
+ }
+ return SecurityHelper.doPrivilegedIOExceptionAction(new
PrivilegedExceptionAction<EmbeddedCacheManager>()
+ {
+
+ @Override
+ public EmbeddedCacheManager run() throws Exception
+ {
+ EmbeddedCacheManager manager =
+ new DefaultCacheManager(helper.fillTemplate(configurationFile,
parameters), false);
+ Utils.loadJGroupsConfig(configManager, manager.getGlobalConfiguration());
+ manager.start();
+ for (String cacheName : manager.getCacheNames())
+ {
+ manager.getCache(cacheName);
+ }
+ return manager;
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Could not initialize the cache manager
corresponding to the configuration file "
+ + configurationFile, e);
+ }
+ }
+
+ /**
+ * Gives the cache corresponding to the given name if it doesn't exist
+ * a {@link NullPointerException} will be thrown
+ */
+ public <K, V> Cache<K, V> getCache(String cacheName)
+ {
+ Cache<K, V> cache = manager.getCache(cacheName, false);
+ if (cache == null)
+ {
+ throw new NullPointerException("The expected cache named '" +
cacheName
+ + "' has not been defined in the configuration of infinispan as
named cache.");
+ }
+ return cache;
+ }
+
+ /**
+ * @see org.picocontainer.Startable#start()
+ */
+ @Override
+ public void start()
+ {
+ }
+
+ /**
+ * @see org.picocontainer.Startable#stop()
+ */
+ @Override
+ public void stop()
+ {
+ manager.stop();
+ }
+}
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/Utils.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/Utils.java
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/java/org/exoplatform/services/ispn/Utils.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,120 @@
+/*
+ * Copyright (C) 2011 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.ispn;
+
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.container.util.TemplateConfigurationHelper;
+import org.exoplatform.services.cache.ExoCacheInitException;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/**
+ * @author <a href="mailto:nfilotto@exoplatform.com">Nicolas
Filotto</a>
+ * @version $Id$
+ *
+ */
+public class Utils
+{
+
+ /**
+ * The logger
+ */
+ private static final Log LOG = ExoLogger
+ .getLogger("exo.kernel.component.ext.cache.impl.infinispan.v5.Utils");
+
+ private Utils() {}
+
+
+ /**
+ * Load the JGroups configuration file thanks to the {@link ConfigurationManager}
+ * @param config the global configuration from which the JGroups config will be
extracted
+ * @return <code>true</code> if the JGoups config could be loaded
successfully,
+ * <code>false</code> if there were no JGroups config to load
+ * @throws IllegalStateException if the JGroups config could not be loaded
+ */
+ public static boolean loadJGroupsConfig(ConfigurationManager cfm, GlobalConfiguration
config) throws ExoCacheInitException
+ {
+ Properties properties = config.getTransportProperties();
+ if (properties == null ||
!properties.containsKey(JGroupsTransport.CONFIGURATION_FILE))
+ {
+ return false;
+ }
+ String filename = properties.getProperty(JGroupsTransport.CONFIGURATION_FILE);
+ InputStream inputStream = TemplateConfigurationHelper.getInputStream(cfm,
filename);
+
+ // inputStream still remains null, so file was not opened
+ if (inputStream == null)
+ {
+ throw new IllegalStateException("The jgroups configuration cannot be loaded
from '" + filename
+ + "'");
+ }
+ try
+ {
+ // Set the jgroups configuration as XML
+ properties.setProperty(JGroupsTransport.CONFIGURATION_XML,
readStream(inputStream));
+ }
+ catch (IOException e)
+ {
+ throw new IllegalStateException("The jgroups configuration cannot be read
from '" + filename
+ + "'");
+ }
+ // Remove the property corresponding to the configuration file
+ properties.remove(JGroupsTransport.CONFIGURATION_FILE);
+ return true;
+ }
+
+
+ /**
+ * Reads bytes from input stream and builds a string from them
+ *
+ * @param inputStream
+ * @return
+ * @throws IOException
+ */
+ private static String readStream(InputStream inputStream) throws IOException
+ {
+ StringBuilder out = new StringBuilder(4096);
+ byte[] b = new byte[4096];
+ try
+ {
+ for (int length; (length = inputStream.read(b)) != -1;)
+ {
+ out.append(new String(b, 0, length));
+ }
+ }
+ finally
+ {
+ try
+ {
+ inputStream.close();
+ }
+ catch (Exception e)
+ {
+ LOG.debug("Cannot close stream", e);
+ }
+ }
+ return out.toString();
+ }
+}
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml 2011-12-15
15:26:48 UTC (rev 5330)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/main/resources/conf/portal/cache-configuration-template.xml 2011-12-15
16:30:03 UTC (rev 5331)
@@ -19,8 +19,8 @@
02110-1301 USA, or see the FSF site:
http://www.fsf.org.
-->
-<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.0
http://www.infinispan.org/schemas/infinispan-config-5.0.xsd"
- xmlns="urn:infinispan:config:5.0">
+<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.1
http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"
+ xmlns="urn:infinispan:config:5.1">
<global>
<asyncListenerExecutor
factory="org.infinispan.executors.DefaultExecutorFactory">
<properties>
@@ -47,14 +47,14 @@
<globalJmxStatistics jmxDomain="exo" enabled="true"
allowDuplicateDomains="true"/>
<transport
transportClass="org.infinispan.remoting.transport.jgroups.JGroupsTransport"
clusterName="eXoCache-cluster" distributedSyncTimeout="20000">
<properties>
- <property name="configurationFile"
value="flush-udp.xml"/>
+ <property name="configurationFile" value="udp.xml"/>
</properties>
</transport>
<shutdown hookBehavior="DEFAULT"/>
</global>
<default>
<locking isolationLevel="READ_COMMITTED"
lockAcquisitionTimeout="10000" writeSkewCheck="false"
concurrencyLevel="500"/>
- <transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
syncRollbackPhase="false" syncCommitPhase="false"/>
+ <transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
syncRollbackPhase="true" syncCommitPhase="true"/>
<jmxStatistics enabled="true"/>
<invocationBatching enabled="true"/>
<clustering mode="replication">
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/TestAbstractExoCache.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/TestAbstractExoCache.java 2011-12-15
15:26:48 UTC (rev 5330)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/TestAbstractExoCache.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -549,13 +549,13 @@
config.setMaxSize(5);
config.setLiveTime(1);
config.setImplementation("LRU");
- config.setDistributed(true);
+ config.setReplicated(true);
ExoCacheConfig config2 = new ExoCacheConfig();
config2.setName("MyCacheDistributed2");
config2.setMaxSize(5);
config2.setLiveTime(1);
config2.setImplementation("LRU");
- config2.setDistributed(true);
+ config2.setReplicated(true);
AbstractExoCache<Serializable, Object> cache1 =
(AbstractExoCache<Serializable,
Object>)getExoCacheFactoryInstance().createCache(config);
MyCacheListener listener1 = new MyCacheListener();
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/java/org/exoplatform/services/cache/impl/infinispan/distributed/TestDistributedExoCache.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,701 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.cache.impl.infinispan.distributed;
+
+import org.exoplatform.container.ExoContainerContext;
+import org.exoplatform.container.PortalContainer;
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.services.cache.CacheListener;
+import org.exoplatform.services.cache.CacheListenerContext;
+import org.exoplatform.services.cache.CacheService;
+import org.exoplatform.services.cache.CachedObjectSelector;
+import org.exoplatform.services.cache.ExoCache;
+import org.exoplatform.services.cache.ExoCacheConfig;
+import org.exoplatform.services.cache.ExoCacheFactory;
+import org.exoplatform.services.cache.ObjectCacheInfo;
+import org.exoplatform.services.cache.impl.infinispan.ExoCacheFactoryImpl;
+import org.exoplatform.services.ispn.DistributedCacheManager;
+import org.exoplatform.test.BasicTestCase;
+import org.infinispan.distribution.DistributionManager;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author <a href="mailto:nicolas.filotto@exoplatform.com">Nicolas
Filotto</a>
+ * @version $Id$
+ *
+ */
+public class TestDistributedExoCache extends BasicTestCase
+{
+
+ CacheService service;
+
+ DistributedExoCache<Serializable, Object> cache;
+
+ DistributedExoCache<Serializable, Object> cache2;
+
+ public TestDistributedExoCache(String name)
+ {
+ super(name);
+ }
+
+ public void setUp() throws Exception
+ {
+ this.service =
(CacheService)PortalContainer.getInstance().getComponentInstanceOfType(CacheService.class);
+ this.cache = (DistributedExoCache<Serializable,
Object>)service.getCacheInstance("cache-distributed");
+ this.cache2 = (DistributedExoCache<Serializable,
Object>)service.getCacheInstance("cache-distributed2");
+ cache2.put(new MyKey("a"), "a");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ cache.clearCache();
+ cache2.clearCache();
+ }
+
+ public void testPut() throws Exception
+ {
+ cache.put(new MyKey("a"), "a");
+ cache.put(new MyKey("b"), "b");
+ cache.put(new MyKey("c"), "c");
+ assertEquals(3, cache.getCacheSize());
+ cache.put(new MyKey("a"), "c");
+ assertEquals(3, cache.getCacheSize());
+ cache.put(new MyKey("d"), "c");
+ assertEquals(4, cache.getCacheSize());
+ }
+
+ public void testClearCache() throws Exception
+ {
+ cache.put(new MyKey("a"), "a");
+ cache.put(new MyKey("b"), "b");
+ cache.put(new MyKey("c"), "c");
+ assertTrue(cache.getCacheSize() > 0);
+ cache.clearCache();
+ assertTrue(cache.getCacheSize() == 0);
+ }
+
+ public void testGet() throws Exception
+ {
+ cache.put(new MyKey("a"), "a");
+ assertEquals("a", cache.get(new MyKey("a")));
+ cache.put(new MyKey("a"), "c");
+ assertEquals("c", cache.get(new MyKey("a")));
+ cache.remove(new MyKey("a"));
+ assertEquals(null, cache.get(new MyKey("a")));
+ assertEquals(null, cache.get(new MyKey("x")));
+ }
+
+ public void testRemove() throws Exception
+ {
+ cache.put(new MyKey("a"), 1);
+ cache.put(new MyKey("b"), 2);
+ cache.put(new MyKey("c"), 3);
+ assertEquals(3, cache.getCacheSize());
+ assertEquals(1, cache.remove(new MyKey("a")));
+ assertEquals(2, cache.getCacheSize());
+ assertEquals(2, cache.remove(new MyKey("b")));
+ assertEquals(1, cache.getCacheSize());
+ assertEquals(null, cache.remove(new MyKey("x")));
+ assertEquals(1, cache.getCacheSize());
+ }
+
+ public void testPutMap() throws Exception
+ {
+ Map<Serializable, Object> values = new HashMap<Serializable,
Object>();
+ values.put(new MyKey("a"), "a");
+ values.put(new MyKey("b"), "b");
+ assertEquals(0, cache.getCacheSize());
+ cache.putMap(values);
+ assertEquals(2, cache.getCacheSize());
+ values = new HashMap<Serializable, Object>()
+ {
+ private static final long serialVersionUID = 1L;
+
+ public Set<Entry<Serializable, Object>> entrySet()
+ {
+ Set<Entry<Serializable, Object>> set = new
LinkedHashSet<Entry<Serializable, Object>>(super.entrySet());
+ set.add(new Entry<Serializable, Object>()
+ {
+
+ public Object setValue(Object paramV)
+ {
+ return null;
+ }
+
+ public Object getValue()
+ {
+ throw new RuntimeException("An exception");
+ }
+
+ public Serializable getKey()
+ {
+ return "c";
+ }
+ });
+ return set;
+ }
+ };
+ values.put(new MyKey("e"), "e");
+ values.put(new MyKey("d"), "d");
+ cache.putMap(values);
+ assertEquals(2, cache.getCacheSize());
+ }
+
+ public void testGetCachedObjects() throws Exception
+ {
+ cache.put(new MyKey("a"), "a");
+ cache.put(new MyKey("b"), "b");
+ cache.put(new MyKey("c"), "c");
+ cache.put(new MyKey("d"), null);
+ assertEquals(3, cache.getCacheSize());
+ List<Object> values = cache.getCachedObjects();
+ assertEquals(3, values.size());
+ assertTrue(values.contains("a"));
+ assertTrue(values.contains("b"));
+ assertTrue(values.contains("c"));
+ }
+
+ public void testRemoveCachedObjects() throws Exception
+ {
+ cache.put(new MyKey("a"), "a");
+ cache.put(new MyKey("b"), "b");
+ cache.put(new MyKey("c"), "c");
+ cache.put(new MyKey("d"), null);
+ assertEquals(3, cache.getCacheSize());
+ List<Object> values = cache.removeCachedObjects();
+ assertEquals(3, values.size());
+ assertTrue(values.contains("a"));
+ assertTrue(values.contains("b"));
+ assertTrue(values.contains("c"));
+ assertEquals(0, cache.getCacheSize());
+ }
+
+ public void testSelect() throws Exception
+ {
+ cache.put(new MyKey("a"), 1);
+ cache.put(new MyKey("b"), 2);
+ cache.put(new MyKey("c"), 3);
+ final AtomicInteger count = new AtomicInteger();
+ CachedObjectSelector<Serializable, Object> selector = new
CachedObjectSelector<Serializable, Object>()
+ {
+
+ public void onSelect(ExoCache<? extends Serializable, ? extends Object>
cache, Serializable key,
+ ObjectCacheInfo<? extends Object> ocinfo) throws Exception
+ {
+ assertTrue(key.equals(new MyKey("a")) || key.equals(new
MyKey("b")) || key.equals(new MyKey("c")));
+ assertTrue(ocinfo.get().equals(1) || ocinfo.get().equals(2) ||
ocinfo.get().equals(3));
+ count.incrementAndGet();
+ }
+
+ public boolean select(Serializable key, ObjectCacheInfo<? extends Object>
ocinfo)
+ {
+ return true;
+ }
+ };
+ cache.select(selector);
+ assertEquals(3, count.intValue());
+ }
+
+ public void testGetHitsNMisses() throws Exception
+ {
+ int hits = cache.getCacheHit();
+ int misses = cache.getCacheMiss();
+ cache.put(new MyKey("a"), "a");
+ cache.get(new MyKey("a"));
+ cache.remove(new MyKey("a"));
+ cache.get(new MyKey("a"));
+ cache.get(new MyKey("z"));
+ assertEquals(1, cache.getCacheHit() - hits);
+ assertEquals(2, cache.getCacheMiss() - misses);
+ }
+
+ public void testMultiThreading() throws Exception
+ {
+ long time = System.currentTimeMillis();
+ final int totalElement = 100;
+ final int totalTimes = 20;
+ int reader = 20;
+ int writer = 10;
+ int remover = 5;
+ int cleaner = 1;
+ final CountDownLatch startSignalWriter = new CountDownLatch(1);
+ final CountDownLatch startSignalOthers = new CountDownLatch(1);
+ final CountDownLatch doneSignal = new CountDownLatch(reader + writer + remover);
+ final List<Exception> errors = Collections.synchronizedList(new
ArrayList<Exception>());
+ for (int i = 0; i < writer; i++)
+ {
+ final int index = i;
+ Thread thread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ startSignalWriter.await();
+ for (int j = 0; j < totalTimes; j++)
+ {
+ for (int i = 0; i < totalElement; i++)
+ {
+ cache.put(new MyKey("key" + i), "value" +
i);
+ }
+ if (index == 0 && j == 0)
+ {
+ // The cache is full, we can launch the others
+ startSignalOthers.countDown();
+ }
+ sleep(50);
+ }
+ }
+ catch (Exception e)
+ {
+ errors.add(e);
+ }
+ finally
+ {
+ doneSignal.countDown();
+ }
+ }
+ };
+ thread.start();
+ }
+ startSignalWriter.countDown();
+ for (int i = 0; i < reader; i++)
+ {
+ Thread thread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ startSignalOthers.await();
+ for (int j = 0; j < totalTimes; j++)
+ {
+ for (int i = 0; i < totalElement; i++)
+ {
+ cache.get(new MyKey("key" + i));
+ }
+ sleep(50);
+ }
+ }
+ catch (Exception e)
+ {
+ errors.add(e);
+ }
+ finally
+ {
+ doneSignal.countDown();
+ }
+ }
+ };
+ thread.start();
+ }
+ for (int i = 0; i < remover; i++)
+ {
+ Thread thread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ startSignalOthers.await();
+ for (int j = 0; j < totalTimes; j++)
+ {
+ for (int i = 0; i < totalElement; i++)
+ {
+ cache.remove(new MyKey("key" + i));
+ }
+ sleep(50);
+ }
+ }
+ catch (Exception e)
+ {
+ errors.add(e);
+ }
+ finally
+ {
+ doneSignal.countDown();
+ }
+ }
+ };
+ thread.start();
+ }
+ doneSignal.await();
+ for (int i = 0; i < totalElement; i++)
+ {
+ cache.put(new MyKey("key" + i), "value" + i);
+ }
+ assertEquals(totalElement, cache.getCacheSize());
+ final CountDownLatch startSignal = new CountDownLatch(1);
+ final CountDownLatch doneSignal2 = new CountDownLatch(writer + cleaner);
+ for (int i = 0; i < writer; i++)
+ {
+ Thread thread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ startSignal.await();
+ for (int j = 0; j < totalTimes; j++)
+ {
+ for (int i = 0; i < totalElement; i++)
+ {
+ cache.put(new MyKey("key" + i), "value" +
i);
+ }
+ sleep(50);
+ }
+ }
+ catch (Exception e)
+ {
+ errors.add(e);
+ }
+ finally
+ {
+ doneSignal2.countDown();
+ }
+ }
+ };
+ thread.start();
+ }
+ for (int i = 0; i < cleaner; i++)
+ {
+ Thread thread = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ startSignal.await();
+ for (int j = 0; j < totalTimes; j++)
+ {
+ sleep(150);
+ cache.clearCache();
+ }
+ }
+ catch (Exception e)
+ {
+ errors.add(e);
+ }
+ finally
+ {
+ doneSignal2.countDown();
+ }
+ }
+ };
+ thread.start();
+ }
+ cache.clearCache();
+ assertEquals(0, cache.getCacheSize());
+ if (!errors.isEmpty())
+ {
+ for (Exception e : errors)
+ {
+ e.printStackTrace();
+ }
+ throw errors.get(0);
+ }
+ System.out.println("Total Time = " + (System.currentTimeMillis() -
time));
+ }
+
+ public static class MyCacheListener implements CacheListener<Serializable,
Object>
+ {
+
+ public int clearCache;
+
+ public int expire;
+
+ public int get;
+
+ public int put;
+
+ public int remove;
+
+ public void onClearCache(CacheListenerContext context) throws Exception
+ {
+ clearCache++;
+ }
+
+ public void onExpire(CacheListenerContext context, Serializable key, Object obj)
throws Exception
+ {
+ expire++;
+ }
+
+ public void onGet(CacheListenerContext context, Serializable key, Object obj)
throws Exception
+ {
+ get++;
+ }
+
+ public void onPut(CacheListenerContext context, Serializable key, Object obj)
throws Exception
+ {
+ put++;
+ }
+
+ public void onRemove(CacheListenerContext context, Serializable key, Object obj)
throws Exception
+ {
+ remove++;
+ }
+ }
+
+ public static class MyKey implements Serializable
+ {
+ private static final long serialVersionUID = 1L;
+
+ public String value;
+ public MyKey(){}
+ public MyKey(String value)
+ {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object paramObject)
+ {
+ return paramObject instanceof MyKey &&
((MyKey)paramObject).value.endsWith(value);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return value.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return value;
+ }
+ }
+
+ public void testDistributedCache() throws Exception
+ {
+ PortalContainer pc = PortalContainer.getInstance();
+ System.out
+ .println("WARNING: For Linux distributions the following JVM parameter must
be set to true, java.net.preferIPv4Stack = "
+ + System.getProperty("java.net.preferIPv4Stack"));
+ ExoCacheConfig config = new ExoCacheConfig();
+ config.setName("MyCacheDistributed");
+ config.setMaxSize(5);
+ config.setLiveTime(1);
+ config.setImplementation("LRU");
+ config.setDistributed(true);
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("infinispan-num-owners", "1");
+ ConfigurationManager cm =
(ConfigurationManager)pc.getComponentInstanceOfType(ConfigurationManager.class);
+ DistributedCacheManager dcm2 =
+ new
DistributedCacheManager("jar:/conf/portal/distributed-cache-configuration.xml",
params, cm);
+
+ @SuppressWarnings("unchecked")
+ DistributedExoCache<Serializable, Object> cache1 =
+ (DistributedExoCache<Serializable, Object>)((ExoCacheFactory)pc
+ .getComponentInstanceOfType(ExoCacheFactory.class)).createCache(config);
+ DistributionManager dm = cache1.getCache().getDistributionManager();
+ MyCacheListener listener1 = new MyCacheListener();
+ cache1.addCacheListener(listener1);
+ DistributedExoCache<Serializable, Object> cache2 =
+ (DistributedExoCache<Serializable, Object>)new ExoCacheFactoryImpl(
+
(ExoContainerContext)pc.getComponentInstanceOfType(ExoContainerContext.class),
+ "jar:/conf/portal/cache-configuration-template.xml", cm,
dcm2).createCache(config);
+ MyCacheListener listener2 = new MyCacheListener();
+ cache2.addCacheListener(listener2);
+ try
+ {
+ MyKey key;
+ cache1.put(key = new MyKey("a"), "b");
+ assertEquals(1, cache1.getCacheSize());
+ assertEquals("b", cache2.get(new MyKey("a")));
+ assertEquals(1, cache2.getCacheSize());
+
+// int put1 = 1;
+// int put2 = dm.getLocality(key).isLocal() ? 0 : 1;
+//
+// assertEquals(put1, listener1.put);
+// assertEquals(put2, listener2.put);
+
+ assertEquals(0, listener1.get);
+ assertEquals(1, listener2.get);
+
+ MyKey key2;
+ cache2.put(key2 = new MyKey("b"), "c");
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
+ assertEquals("c", cache1.get(new MyKey("b")));
+
+// put1 += dm.getLocality(key2).isLocal() ? 1 : 0;
+// put2++;
+//
+// assertEquals(put1, listener1.put);
+// assertEquals(put2, listener2.put);
+
+ assertEquals(1, listener1.get);
+ assertEquals(1, listener2.get);
+
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
+
+// assertEquals(put1, listener1.put);
+// assertEquals(put2, listener2.put);
+
+ assertEquals(1, listener1.get);
+ assertEquals(1, listener2.get);
+
+ cache2.put(key = new MyKey("a"), "a");
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
+ assertEquals("a", cache1.get(new MyKey("a")));
+
+// put1 += dm.getLocality(key).isLocal() ? 1 : 0;
+// put2++;
+//
+// assertEquals(put1, listener1.put);
+// assertEquals(put2, listener2.put);
+
+ assertEquals(2, listener1.get);
+ assertEquals(1, listener2.get);
+
+ cache2.remove(key = new MyKey("a"));
+ assertEquals(1, cache1.getCacheSize());
+ assertEquals(1, cache2.getCacheSize());
+
+// assertEquals(put1, listener1.put);
+// assertEquals(put2, listener2.put);
+
+ assertEquals(2, listener1.get);
+ assertEquals(1, listener2.get);
+
+// int remove1 = dm.getLocality(key).isLocal() ? 1 : 0;
+// int remove2 = 1;
+//
+// assertEquals(remove1, listener1.remove);
+// assertEquals(remove2, listener2.remove);
+
+ cache1.put(key = new MyKey("c"), "c");
+ cache1.clearCache();
+ assertEquals(0, cache1.getCacheSize());
+ assertNull(cache1.get(new MyKey("b")));
+ assertNull(cache2.get(new MyKey("b")));
+ assertNull(cache2.get(new MyKey("c")));
+ assertEquals(0, cache2.getCacheSize());
+
+// put1++;
+// put2 += dm.getLocality(key).isLocal() ? 0 : 1;
+
+// assertEquals(put1, listener1.put);
+// assertEquals(put2, listener2.put);
+
+ assertEquals(3, listener1.get);
+ assertEquals(3, listener2.get);
+
+// assertEquals(remove1, listener1.remove);
+// assertEquals(remove2, listener2.remove);
+
+ assertEquals(1, listener1.clearCache);
+ assertEquals(0, listener2.clearCache);
+
+ Map<Serializable, Object> values = new HashMap<Serializable,
Object>();
+ values.put(key = new MyKey("a"), "a");
+ values.put(key2 = new MyKey("b"), "b");
+ cache1.putMap(values);
+ assertEquals(2, cache1.getCacheSize());
+ Thread.sleep(40);
+ assertEquals("a", cache2.get(new MyKey("a")));
+ assertEquals("b", cache2.get(new MyKey("b")));
+ assertEquals(2, cache2.getCacheSize());
+
+// put1 += 2;
+// put2 += (dm.getLocality(key).isLocal() ? 0 : 1) +
(dm.getLocality(key2).isLocal() ? 0 : 1);
+//
+// assertEquals(put1, listener1.put);
+// assertEquals(put2, listener2.put);
+
+ assertEquals(3, listener1.get);
+ assertEquals(5, listener2.get);
+
+// assertEquals(remove1, listener1.remove);
+// assertEquals(remove2, listener2.remove);
+
+ assertEquals(1, listener1.clearCache);
+ assertEquals(0, listener2.clearCache);
+
+ values = new HashMap<Serializable, Object>()
+ {
+ private static final long serialVersionUID = 1L;
+
+ public Set<Entry<Serializable, Object>> entrySet()
+ {
+ Set<Entry<Serializable, Object>> set = new
LinkedHashSet<Entry<Serializable, Object>>(super.entrySet());
+ set.add(new Entry<Serializable, Object>()
+ {
+
+ public Object setValue(Object paramV)
+ {
+ return null;
+ }
+
+ public Object getValue()
+ {
+ throw new RuntimeException("An exception");
+ }
+
+ public Serializable getKey()
+ {
+ return "c";
+ }
+ });
+ return set;
+ }
+ };
+ values.put(new MyKey("e"), "e");
+ values.put(new MyKey("d"), "d");
+ cache1.putMap(values);
+ assertEquals(2, cache1.getCacheSize());
+ assertEquals(2, cache2.getCacheSize());
+
+// assertEquals(put1, listener1.put);
+// assertEquals(put2, listener2.put);
+
+ assertEquals(3, listener1.get);
+ assertEquals(5, listener2.get);
+
+// assertEquals(remove1, listener1.remove);
+// assertEquals(remove2, listener2.remove);
+
+ assertEquals(1, listener1.clearCache);
+ assertEquals(0, listener2.clearCache);
+
+ assertEquals(0, listener1.expire);
+ assertEquals(0, listener2.expire);
+
+ }
+ finally
+ {
+ dcm2.stop();
+ }
+ }
+}
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/configuration.xml
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/configuration.xml
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/configuration.xml 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+
+ Copyright (C) 2009 eXo Platform SAS.
+
+ This is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as
+ published by the Free Software Foundation; either version 2.1 of
+ the License, or (at your option) any later version.
+
+ This software is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this software; if not, write to the Free
+ Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+
+-->
+<configuration
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.exoplatform.org/xml/ns/kernel_1_2.xsd
http://www.exoplatform.org/xml/ns/kernel_1_2.xsd"
+
xmlns="http://www.exoplatform.org/xml/ns/kernel_1_2.xsd">
+ <component>
+ <type>org.exoplatform.services.ispn.DistributedCacheManager</type>
+ <init-params>
+ <value-param>
+ <name>infinispan-configuration</name>
+ <value>jar:/conf/portal/distributed-cache-configuration.xml</value>
+ </value-param>
+ <properties-param>
+ <name>parameters</name>
+ <description>The parameters of the configuration</description>
+ <property name="infinispan-num-owners"
value="1"></property>
+ </properties-param>
+ </init-params>
+ </component>
+</configuration>
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml 2011-12-15
15:26:48 UTC (rev 5330)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/cache-configuration-template.xml 2011-12-15
16:30:03 UTC (rev 5331)
@@ -19,8 +19,8 @@
02110-1301 USA, or see the FSF site:
http://www.fsf.org.
-->
-<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.0
http://www.infinispan.org/schemas/infinispan-config-5.0.xsd"
- xmlns="urn:infinispan:config:5.0">
+<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.1
http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"
+ xmlns="urn:infinispan:config:5.1">
<global>
<asyncListenerExecutor
factory="org.infinispan.executors.DefaultExecutorFactory">
<properties>
@@ -47,14 +47,14 @@
<globalJmxStatistics jmxDomain="exo" enabled="true"
allowDuplicateDomains="true" />
<transport
transportClass="org.infinispan.remoting.transport.jgroups.JGroupsTransport"
clusterName="Infinispan-cluster" distributedSyncTimeout="20000">
<properties>
- <property name="configurationFile"
value="flush-udp.xml"/>
+ <property name="configurationFile" value="udp.xml"/>
</properties>
</transport>
<shutdown hookBehavior="DEFAULT"/>
</global>
<default>
<locking isolationLevel="READ_COMMITTED"
lockAcquisitionTimeout="10000" writeSkewCheck="false"
concurrencyLevel="500"/>
- <transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
syncRollbackPhase="false" syncCommitPhase="false"/>
+ <transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
syncRollbackPhase="true" syncCommitPhase="true"/>
<jmxStatistics enabled="true"/>
<clustering mode="replication">
<stateRetrieval timeout="20000"
fetchInMemoryState="false"/>
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml 2011-12-15
15:26:48 UTC (rev 5330)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration-template.xml 2011-12-15
16:30:03 UTC (rev 5331)
@@ -19,8 +19,8 @@
02110-1301 USA, or see the FSF site:
http://www.fsf.org.
-->
-<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.0
http://www.infinispan.org/schemas/infinispan-config-5.0.xsd"
- xmlns="urn:infinispan:config:5.0">
+<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.1
http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"
+ xmlns="urn:infinispan:config:5.1">
<global>
<asyncListenerExecutor
factory="org.infinispan.executors.DefaultExecutorFactory">
<properties>
@@ -47,14 +47,14 @@
<globalJmxStatistics jmxDomain="exo" enabled="true"
allowDuplicateDomains="true"/>
<transport
transportClass="org.infinispan.remoting.transport.jgroups.JGroupsTransport"
clusterName="Infinispan-cluster" distributedSyncTimeout="20000">
<properties>
- <property name="configurationFile"
value="flush-udp.xml"/>
+ <property name="configurationFile" value="udp.xml"/>
</properties>
</transport>
<shutdown hookBehavior="DEFAULT"/>
</global>
<default>
- <locking isolationLevel="READ_COMMITTED"
lockAcquisitionTimeout="20000" writeSkewCheck="false"
concurrencyLevel="500"/>
- <transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
syncRollbackPhase="false" syncCommitPhase="false"/>
+ <locking isolationLevel="READ_COMMITTED"
lockAcquisitionTimeout="20000" writeSkewCheck="false"
concurrencyLevel="500" useLockStriping="true"/>
+ <transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
syncRollbackPhase="true" syncCommitPhase="true"/>
<jmxStatistics enabled="true"/>
<clustering mode="replication">
<stateRetrieval timeout="20000"
fetchInMemoryState="false"/>
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/distributed-cache-configuration.xml 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Copyright (C) 2010 eXo Platform SAS.
+
+ This is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as
+ published by the Free Software Foundation; either version 2.1 of
+ the License, or (at your option) any later version.
+
+ This software is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this software; if not, write to the Free
+ Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+
+-->
+<infinispan
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:infinispan:config:5.1
http://www.infinispan.org/schemas/infinispan-config-5.1.xsd"
+ xmlns="urn:infinispan:config:5.1">
+ <global>
+ <asyncListenerExecutor
factory="org.infinispan.executors.DefaultExecutorFactory">
+ <properties>
+ <property name="maxThreads" value="1"/>
+ <property name="queueSize" value="100000"/>
+ <property name="threadNamePrefix"
value="AsyncListenerThread"/>
+ </properties>
+ </asyncListenerExecutor>
+ <asyncTransportExecutor
factory="org.infinispan.executors.DefaultExecutorFactory">
+ <properties>
+ <property name="threadNamePrefix"
value="AsyncSerializationThread"/>
+ </properties>
+ </asyncTransportExecutor>
+ <evictionScheduledExecutor
factory="org.infinispan.executors.DefaultScheduledExecutorFactory">
+ <properties>
+ <property name="threadNamePrefix"
value="EvictionThread"/>
+ </properties>
+ </evictionScheduledExecutor>
+ <replicationQueueScheduledExecutor
factory="org.infinispan.executors.DefaultScheduledExecutorFactory">
+ <properties>
+ <property name="threadNamePrefix"
value="ReplicationQueueThread"/>
+ </properties>
+ </replicationQueueScheduledExecutor>
+ <globalJmxStatistics jmxDomain="exo" enabled="true"
allowDuplicateDomains="true"/>
+ <transport
transportClass="org.infinispan.remoting.transport.jgroups.JGroupsTransport"
clusterName="Infinispan-cluster" distributedSyncTimeout="20000">
+ <properties>
+ <property name="configurationFile" value="udp.xml"/>
+ </properties>
+ </transport>
+ <shutdown hookBehavior="DEFAULT"/>
+ </global>
+ <namedCache name="eXoCache">
+ <locking isolationLevel="READ_COMMITTED"
lockAcquisitionTimeout="20000" writeSkewCheck="false"
concurrencyLevel="500" useLockStriping="true" />
+ <transaction
transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
syncRollbackPhase="true" syncCommitPhase="true"
eagerLockSingleNode="true"/>
+ <jmxStatistics enabled="true"/>
+ <clustering mode="distribution">
+ <l1 enabled="false"/>
+ <hash numOwners="${infinispan-num-owners}"
rehashRpcTimeout="120000" />
+ <sync/>
+ </clustering>
+ <invocationBatching enabled="true"/>
+ </namedCache>
+</infinispan>
\ No newline at end of file
Modified:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/test-configuration.xml
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/test-configuration.xml 2011-12-15
15:26:48 UTC (rev 5330)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/conf/portal/test-configuration.xml 2011-12-15
16:30:03 UTC (rev 5331)
@@ -37,6 +37,25 @@
</object>
</object-param>
<object-param>
+ <name>cache-distributed</name>
+ <description>The default cache configuration</description>
+ <object type="org.exoplatform.services.cache.ExoCacheConfig">
+ <field
name="name"><string>cache-distributed</string></field>
+ <field name="maxSize"><int>5</int></field>
+ <field
name="liveTime"><long>2</long></field>
+ <field
name="distributed"><boolean>true</boolean></field>
+ </object>
+ </object-param>
+ <object-param>
+ <name>cache-distributed2</name>
+ <description>The default cache configuration</description>
+ <object type="org.exoplatform.services.cache.ExoCacheConfig">
+ <field
name="name"><string>cache-distributed2</string></field>
+ <field name="maxSize"><int>5</int></field>
+ <field
name="liveTime"><long>2</long></field>
+ <field
name="distributed"><boolean>true</boolean></field>
+ </object>
+ </object-param> <object-param>
<name>test-multi-threading</name>
<description>The default cache configuration</description>
<object
type="org.exoplatform.services.cache.impl.infinispan.generic.GenericExoCacheConfig">
@@ -53,7 +72,7 @@
<field
name="name"><string>cacheDistributed</string></field>
<field name="maxSize"><int>5</int></field>
<field
name="liveTime"><long>2</long></field>
- <field
name="distributed"><boolean>true</boolean></field>
+ <field
name="replicated"><boolean>true</boolean></field>
</object>
</object-param>
<object-param>
@@ -148,7 +167,6 @@
<!-- </object-param> -->
</init-params>
</component>
-
<component>
<key>org.exoplatform.services.cache.ExoCacheFactory</key>
<type>org.exoplatform.services.cache.impl.infinispan.ExoCacheFactoryImpl</type>
@@ -158,7 +176,7 @@
<value>jar:/conf/portal/cache-configuration-template.xml</value>
</value-param>
</init-params>
- </component>
+ </component>
<external-component-plugins>
<target-component>org.exoplatform.services.cache.ExoCacheFactory</target-component>
@@ -205,6 +223,9 @@
<string>LRU</string>
</value>
<value>
+ <string>LRU_OLD</string>
+ </value>
+ <value>
<string>UNORDERED</string>
</value>
<value>
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/test.policy
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/test.policy
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/test.policy 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,26 @@
+grant codeBase "@MAVEN_REPO@-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@TEST_CLASSES@-"{
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.commons.test/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.commons/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.container/-"{
+ permission java.security.AllPermission;
+};
+
+grant codeBase "@MAIN_CLASSES@../../../exo.kernel.component.cache/-"{
+ permission java.security.AllPermission;
+};
Added:
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/tsm-excludes.properties
===================================================================
---
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/tsm-excludes.properties
(rev 0)
+++
kernel/trunk/exo.kernel.component.ext.cache.impl.infinispan.v5/src/test/resources/tsm-excludes.properties 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1 @@
+org.exoplatform.services.cache.impl.infinispan.distributed.TestDistributedExoCache.testDistributedCache=stop
\ No newline at end of file
Copied:
kernel/trunk/exo.kernel.container/src/main/java/org/exoplatform/container/util/TemplateConfigurationHelper.java
(from rev 5271,
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/config/TemplateConfigurationHelper.java)
===================================================================
---
kernel/trunk/exo.kernel.container/src/main/java/org/exoplatform/container/util/TemplateConfigurationHelper.java
(rev 0)
+++
kernel/trunk/exo.kernel.container/src/main/java/org/exoplatform/container/util/TemplateConfigurationHelper.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,249 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.container.util;
+
+import org.exoplatform.commons.utils.PrivilegedFileHelper;
+import org.exoplatform.container.configuration.ConfigurationManager;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Pattern;
+
+/**
+ * Builds configuration from template using map of template-variables <--> value.
+ * Class provides extra functionality for filtering parameters by pattern, excluding
+ * unnecessary parameters.
+ *
+ * @author <a href="mailto:nikolazius@gmail.com">Nikolay
Zamosenchuk</a>
+ * @version $Id: TemplateConfigurationHelper.java 34360 2009-07-22 23:58:59Z nzamosenchuk
$
+ *
+ */
+public class TemplateConfigurationHelper
+{
+ // list with include-patterns
+ private List<Pattern> includes = new ArrayList<Pattern>();
+
+ // list with exclude-patterns
+ private List<Pattern> excludes = new ArrayList<Pattern>();
+
+ private ConfigurationManager cfm;
+
+ /**
+ * Creates instance of template configuration helper with given lists of filtering
+ * patterns. Parameter will be included only if it matches any include-pattern and
+ * doesn't match any exclude-pattern. I.e. You can include "extended-*"
and exclude
+ * "extended-type". Please refer to Java regexp documentation. Filtering for
this
+ * example, should be defined as following:
+ * include: "^extended-.*"
+ * exclude: "^extended-type"
+ *
+ * @param includes Array with string representation of include reg-exp patterns
+ * @param excludes Array with string representation of exclude reg-exp patterns
+ * @param ConfigurationManager instance for looking up resources
+ */
+ public TemplateConfigurationHelper(String[] includes, String[] excludes,
ConfigurationManager cfm)
+ {
+ super();
+ this.cfm = cfm;
+ // compile include patterns
+ for (String regex : includes)
+ {
+ this.includes.add(Pattern.compile(regex));
+ }
+ // compile exclude patterns
+ for (String regex : excludes)
+ {
+ this.excludes.add(Pattern.compile(regex));
+ }
+ }
+
+ /**
+ * Reads configuration file from a stream and replaces all the occurrences of
template-variables
+ * (like : "${parameter.name}") with values provided in the map.
+ *
+ * @param inputStream
+ * @param parameters
+ * @return
+ * @throws IOException
+ */
+ public InputStream fillTemplate(InputStream inputStream, Map<String, String>
parameters) throws IOException
+ {
+ if (inputStream == null || parameters == null || parameters.size() == 0)
+ {
+ return inputStream;
+ }
+ // parameters filtering
+ Map<String, String> preparedParams = prepareParameters(parameters);
+ // read stream
+ String configuration = readStream(inputStream);
+ for (Entry<String, String> entry : preparedParams.entrySet())
+ {
+ configuration = configuration.replace(entry.getKey(), entry.getValue());
+ }
+ // create new stream
+ InputStream configurationStream = new
ByteArrayInputStream(configuration.getBytes());
+ return configurationStream;
+ }
+
+ /**
+ * Reads configuration file from a stream and replaces all the occurrences of
template-variables
+ * (like : "${parameter.name}") with values provided in the map.
+ *
+ * @param filename
+ * @param parameters
+ * @return
+ * @throws IOException
+ */
+ public InputStream fillTemplate(String filename, Map<String, String> parameters)
throws IOException
+ {
+ InputStream inputStream = getInputStream(cfm, filename);
+ // inputStream still remains null, so file was not opened
+ if (inputStream == null)
+ {
+ throw new IOException("Can't find or open file:" + filename);
+ }
+ return fillTemplate(inputStream, parameters);
+ }
+
+ /**
+ * Tries first to get the file content using the configuration manager, if it cannot
+ * be found it will then try to get it from the context class loader of the current
thread,
+ * if it cannot be found it will try to get it from the class loader of the current
class and
+ * finally it still cannot be found it will try to use the file name as a file path.
+ * @param cfm the configuration manager from which we want to try to find the file
content
+ * @param filename the name of the file to found
+ * @return the {@link InputStream} corresponding to the file content if it can be
found
+ * <code>null</code> otherwise
+ */
+ public static InputStream getInputStream(ConfigurationManager cfm, String filename)
+ {
+ InputStream inputStream = null;
+ // try to get using configuration manager
+ try
+ {
+ inputStream = cfm.getInputStream(filename);
+ }
+ catch (Exception e)
+ {
+ // will try to use another resolve mechanism
+ }
+
+ // try to get resource by class loader
+ if (inputStream == null)
+ {
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ inputStream = cl == null ? null : cl.getResourceAsStream(filename);
+ }
+
+ // check system class loader
+ if (inputStream == null)
+ {
+ inputStream =
TemplateConfigurationHelper.class.getClassLoader().getResourceAsStream(filename);
+ }
+
+ // try to get as file stream
+ if (inputStream == null)
+ {
+ try
+ {
+ inputStream = PrivilegedFileHelper.fileInputStream(filename);
+ }
+ catch (IOException e)
+ {
+ // Still can't resolve
+ }
+ }
+ return inputStream;
+ }
+
+ /**
+ * Checks if String mathes to any pattern from the list
+ *
+ * @param patterns
+ * @param parameter
+ * @return
+ */
+ private boolean matches(List<Pattern> patterns, String parameter)
+ {
+ for (Pattern pattern : patterns)
+ {
+ if (pattern.matcher(parameter).matches())
+ {
+ // string matched
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Filters the map of parameters, leaving only those than matches filtering regular
expressions.
+ * Also adds "${}" to the parameter key: <br>
+ * I.e. such map provided on input:
+ *
+ * "jbosscache-cache.loader":"org.exoplatform"
+ * "jbosscache-configuration":"/conf/test.xml"
+ * "max-volatile-size":"100Kb"
+ *
+ * the output will be like:
+ *
+ * "${jbosscache-cache.loader}":"org.exoplatform"
+ *
+ * Other will be ignored (depending on includes/excludes lists provided in
constructor).
+ *
+ * @param parameters
+ * @return
+ */
+ protected Map<String, String> prepareParameters(Map<String, String>
parameters)
+ {
+ Map<String, String> map = new HashMap<String, String>();
+ for (Entry<String, String> entry : parameters.entrySet())
+ {
+ if (matches(includes, entry.getKey()) && !matches(excludes,
entry.getKey()))
+ {
+ map.put("${" + entry.getKey() + "}", entry.getValue());
+ }
+ }
+ return map;
+ }
+
+ /**
+ * Reads bytes from input stream and builds a string from them
+ *
+ * @param inputStream
+ * @return
+ * @throws IOException
+ */
+ protected String readStream(InputStream inputStream) throws IOException
+ {
+ StringBuffer out = new StringBuffer();
+ byte[] b = new byte[4096];
+ for (int n; (n = inputStream.read(b)) != -1;)
+ {
+ out.append(new String(b, 0, n));
+ }
+ return out.toString();
+ }
+}
Copied:
kernel/trunk/exo.kernel.container/src/test/java/org/exoplatform/container/util/TestTemplateConfigurationHelper.java
(from rev 5271,
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/config/TestTemplateConfigurationHelper.java)
===================================================================
---
kernel/trunk/exo.kernel.container/src/test/java/org/exoplatform/container/util/TestTemplateConfigurationHelper.java
(rev 0)
+++
kernel/trunk/exo.kernel.container/src/test/java/org/exoplatform/container/util/TestTemplateConfigurationHelper.java 2011-12-15
16:30:03 UTC (rev 5331)
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.container.util;
+
+import junit.framework.TestCase;
+
+import org.exoplatform.container.configuration.ConfigurationManagerImpl;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author <a href="mailto:nikolazius@gmail.com">Nikolay
Zamosenchuk</a>
+ * @version $Id: TestTemplateHelper.java 34360 2009-07-22 23:58:59Z nzamosenchuk $
+ *
+ */
+public class TestTemplateConfigurationHelper extends TestCase
+{
+
+ public void testFilters()
+ {
+ // create helper with predefined include and exclude patterns
+ TemplateConfigurationHelper helper =
+ new TemplateConfigurationHelper(new String[]{"^jbosscache-.*",
"^jgroups-configuration"},
+ new String[]{"^jbosscache-configuration"}, new
ConfigurationManagerImpl());
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put("jbosscache-configuration", "");
+ parameters.put("jbosscache-cache.loader", "");
+ parameters.put("jbosscache-clustername", "");
+ parameters.put("max-volatile-size", "");
+ Map<String, String> preparedParameters =
helper.prepareParameters(parameters);
+ assertEquals(2, preparedParameters.size());
+ // "jbosscache-configuration" and "max-volatile-size" should be
excluded
+
assertFalse(preparedParameters.containsKey("${jbosscache-configuration}"));
+ assertFalse(preparedParameters.containsKey("${max-volatile-size}"));
+
assertTrue(preparedParameters.containsKey("${jbosscache-cache.loader}"));
+ assertTrue(preparedParameters.containsKey("${jbosscache-clustername}"));
+ }
+
+ public void testFilters2()
+ {
+ // create helper with predefined include and exclude patterns
+ TemplateConfigurationHelper helper =
+ new TemplateConfigurationHelper(new String[]{"^jbosscache-.*",
"^jgroups-configuration"},
+ new String[]{"^jbosscache-configuration"}, new
ConfigurationManagerImpl());
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put("jgroups-configuration", "");
+ parameters.put("jbosscache-cache.loader", "");
+ parameters.put("jbosscache-clustername", "");
+ parameters.put("max-volatile-size", "");
+ Map<String, String> preparedParameters =
helper.prepareParameters(parameters);
+ assertEquals(3, preparedParameters.size());
+ // "jbosscache-configuration" and "max-volatile-size" should be
excluded
+ assertFalse(preparedParameters.containsKey("${max-volatile-size}"));
+
assertTrue(preparedParameters.containsKey("${jbosscache-cache.loader}"));
+ assertTrue(preparedParameters.containsKey("${jbosscache-clustername}"));
+ }
+
+ public void testTemplating() throws IOException
+ {
+ TemplateConfigurationHelper helper =
+ new TemplateConfigurationHelper(new String[]{"^jbosscache-.*",
"^jgroups-configuration"},
+ new String[]{"^jbosscache-configuration"}, new
ConfigurationManagerImpl());
+ String template = "configuration in any format, containing
${jbosscache-template-variable} and many others";
+ String expectedConfig = "configuration in any format, containing pretty good
parameter and many others";
+
+ InputStream templateStream = new ByteArrayInputStream(template.getBytes());
+
+ Map<String, String> parameters = new HashMap<String, String>();
+ parameters.put("jbosscache-template-variable", "pretty good
parameter");
+
+ InputStream configStream = helper.fillTemplate(templateStream, parameters);
+ String config = readStream(configStream);
+ assertTrue(expectedConfig.equals(config));
+ }
+
+ private String readStream(InputStream inputStream) throws IOException
+ {
+ StringBuffer out = new StringBuffer();
+ byte[] b = new byte[4096];
+ for (int n; (n = inputStream.read(b)) != -1;)
+ {
+ out.append(new String(b, 0, n));
+ }
+ return out.toString();
+ }
+}
Modified: kernel/trunk/pom.xml
===================================================================
--- kernel/trunk/pom.xml 2011-12-15 15:26:48 UTC (rev 5330)
+++ kernel/trunk/pom.xml 2011-12-15 16:30:03 UTC (rev 5331)
@@ -213,7 +213,7 @@
<dependency>
<groupId>org.infinispan</groupId>
<artifactId>infinispan-core</artifactId>
- <version>5.0.1.FINAL</version>
+ <version>5.1.0.CR1</version>
</dependency>
<dependency>
<groupId>org.jibx</groupId>