Author: nzamosenchuk
Date: 2010-11-02 03:25:50 -0400 (Tue, 02 Nov 2010)
New Revision: 3373
Added:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkIndexerCacheLoader.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkService.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkServiceImpl.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/JBossCacheChunkIndexChangesFilter.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/PersistentIndexChunk.java
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/pom.xml
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/AbstractQueryHandler.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/ChunkIndex.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jbosscache/AbstractWriteOnlyCacheLoader.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/util/io/PrivilegedCacheHelper.java
jcr/branches/1.14-CNK/exo.jcr.component.core/src/test/resources/conf/standalone/cluster/test-jbosscache-indexer.xml
jcr/branches/1.14-CNK/exo.jcr.component.core/src/test/resources/conf/standalone/cluster/test-jcr-config.xml
Log:
EXOJCR-987 : POC commit. Nodes can join and leave the cluster. Some places contain
hardcoded values and workarounds that are acceptable for now.
Modified: jcr/branches/1.14-CNK/exo.jcr.component.core/pom.xml
===================================================================
--- jcr/branches/1.14-CNK/exo.jcr.component.core/pom.xml 2010-11-01 10:50:31 UTC (rev
3372)
+++ jcr/branches/1.14-CNK/exo.jcr.component.core/pom.xml 2010-11-02 07:25:50 UTC (rev
3373)
@@ -30,7 +30,7 @@
<name>eXo JCR :: Component :: Core Service</name>
<description>eXo JCR Service core component</description>
<properties>
-
<jcr.test.configuration.file>/conf/standalone/test-configuration.xml</jcr.test.configuration.file>
+
<jcr.test.configuration.file>/conf/standalone/cluster/test-configuration.xml</jcr.test.configuration.file>
<jbosscache.shareable>true</jbosscache.shareable>
</properties>
<dependencies>
@@ -434,6 +434,9 @@
<exclude>org/exoplatform/services/jcr/**/impl/**/SQLBenchmarkTest.java</exclude>
<exclude>org/exoplatform/services/jcr/**/impl/**/TestLockPerstistentDataManager.java</exclude>
<exclude>org/exoplatform/services/jcr/**/impl/**/TestCleanableFileStreamValueData.java</exclude>
+ <!-- After restore the default changes filter is used, so no
ChunkService is injected into the SearchManager, but
+ ChunkIndex is hardcoded and requires a ChunkService -->
+
<exclude>org/exoplatform/services/jcr/**/TestSVNodeDataOptimization.java</exclude>
</excludes>
</configuration>
</plugin>
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/AbstractQueryHandler.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/AbstractQueryHandler.java 2010-11-01
10:50:31 UTC (rev 3372)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/AbstractQueryHandler.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -18,6 +18,7 @@
import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
import org.exoplatform.services.jcr.datamodel.NodeData;
+import org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService;
import org.exoplatform.services.jcr.impl.core.query.lucene.ChunkIndex;
import org.exoplatform.services.jcr.impl.core.query.lucene.DefaultIndexUpdateMonitor;
import org.exoplatform.services.jcr.impl.core.query.lucene.IndexInfos;
@@ -74,6 +75,12 @@
*/
protected IndexInfos indexInfos;
+ /**
+ * TODO: re-view this part of dependency delivery
+ * Temporary solution to deliver ChunkService to ChunkIndex (former MultiIndex)
+ */
+ protected ChunkService chunkService;
+
private IndexUpdateMonitor indexUpdateMonitor;
public boolean isInitialized()
@@ -82,6 +89,22 @@
}
/**
+ * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#getChunkService()
+ */
+ public ChunkService getChunkService()
+ {
+ return chunkService;
+ }
+
+ /**
+ * @see
org.exoplatform.services.jcr.impl.core.query.QueryHandler#setChunkService(org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService)
+ */
+ public void setChunkService(ChunkService chunkService)
+ {
+ this.chunkService = chunkService;
+ }
+
+ /**
* @see
org.exoplatform.services.jcr.impl.core.query.QueryHandler#setIndexerIoModeHandler(org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler)
*/
public void setIndexerIoModeHandler(IndexerIoModeHandler modeHandler) throws
IOException
@@ -209,6 +232,7 @@
*
* @param idleTime the query handler idle time.
*/
+ @Deprecated
public void setIdleTime(String idleTime)
{
log.warn("Parameter 'idleTime' is not supported anymore. "
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java 2010-11-01
10:50:31 UTC (rev 3372)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -21,6 +21,7 @@
import org.exoplatform.services.jcr.datamodel.NodeData;
import org.exoplatform.services.jcr.impl.core.SessionDataManager;
import org.exoplatform.services.jcr.impl.core.SessionImpl;
+import org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService;
import org.exoplatform.services.jcr.impl.core.query.lucene.ChunkIndex;
import org.exoplatform.services.jcr.impl.core.query.lucene.IndexInfos;
import org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor;
@@ -161,7 +162,7 @@
* @param indexInfos
*/
void setIndexInfos(IndexInfos indexInfos);
-
+
/**
* Returns {@link IndexInfos} instance that was set into QueryHandler.
* @return
@@ -178,4 +179,16 @@
*/
void setIndexUpdateMonitor(IndexUpdateMonitor indexUpdateMonitor);
+ /**
+ * Temporary solution to deliver {@link ChunkService} to {@link ChunkIndex} (former
MultiIndex)
+ * @param chunkService
+ */
+ void setChunkService(ChunkService chunkService);
+
+ /**
+ * Temporary solution to deliver {@link ChunkService} to {@link ChunkIndex} (former
MultiIndex)
+ * @return
+ */
+ ChunkService getChunkService();
+
}
Added:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkIndexerCacheLoader.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkIndexerCacheLoader.java
(rev 0)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkIndexerCacheLoader.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -0,0 +1,288 @@
+/*
+ * Copyright (C) 2003-2009 eXo Platform SAS.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License
+ * as published by the Free Software Foundation; either version 3
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not,
see<http://www.gnu.org/licenses/>.
+ */
+package org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk;
+
+import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
+import org.exoplatform.services.jcr.impl.core.query.QueryHandler;
+import org.exoplatform.services.jcr.impl.core.query.SearchManager;
+import
org.exoplatform.services.jcr.impl.core.query.jbosscache.ChangesFilterListsWrapper;
+import
org.exoplatform.services.jcr.impl.storage.jbosscache.AbstractWriteOnlyCacheLoader;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Modification;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jcr.RepositoryException;
+
+/**
+ * @author <a href="mailto:nikolazius@gmail.com">Nikolay
Zamosenchuk</a>
+ * @version $Id: IndexerCacheLoader.java 34360 2009-07-22 23:58:59Z nzamosenchuk $
+ *
+ */
+public class ChunkIndexerCacheLoader extends AbstractWriteOnlyCacheLoader
+{
+ private static final Log log =
ExoLogger.getLogger("exo.jcr.component.core.IndexerCacheLoader");
+
+ /**
+ * A map of all the indexers that has been registered
+ */
+ private final Map<Fqn<String>, Indexer> indexers = new
HashMap<Fqn<String>, Indexer>();
+
+ /**
+ * Intentionally does nothing at commit time.
+ *
+ * @param tx the transaction being committed; not used
+ * @throws Exception declared by the overridden method; the visible body throws nothing
+ * @see org.jboss.cache.loader.AbstractCacheLoader#commit(java.lang.Object)
+ */
+ @Override
+ public void commit(Object tx) throws Exception
+ {
+ // do nothing. Everything is done on prepare phase.
+ }
+
+   /**
+    * Creates an {@link Indexer} from the given collaborators and stores it in the
+    * {@code indexers} map under an {@link Fqn} built from the workspace id of
+    * {@code searchManager}.
+    *
+    * @param searchManager search manager whose workspace id is used as the registration key
+    * @param parentSearchManager search manager handed to the indexer for the parent change lists
+    * @param handler query handler handed to the indexer together with {@code searchManager}
+    * @param parentHandler query handler handed to the indexer together with {@code parentSearchManager}
+    * @param chunkService chunk service handed to the indexer
+    * @throws RepositoryConfigurationException declared by this method and by the Indexer constructor
+    */
+   public void register(SearchManager searchManager, SearchManager parentSearchManager, QueryHandler handler,
+      QueryHandler parentHandler, ChunkService chunkService) throws RepositoryConfigurationException
+   {
+      final Fqn<String> workspaceFqn = Fqn.fromElements(searchManager.getWsId());
+      final Indexer workspaceIndexer =
+         new Indexer(searchManager, parentSearchManager, handler, parentHandler, chunkService);
+      indexers.put(workspaceFqn, workspaceIndexer);
+
+      if (!log.isDebugEnabled())
+      {
+         return;
+      }
+      log.debug("Register " + searchManager.getWsId() + " " + this + " in " + indexers);
+   }
+
+ /**
+ * Single-attribute writes are ignored by this loader; the method always returns
+ * {@code null}.
+ *
+ * @param name node the attribute was written to; not used
+ * @param key attribute key; not used
+ * @param value attribute value; not used
+ * @return always {@code null}
+ * @see org.jboss.cache.loader.CacheLoader#put(org.jboss.cache.Fqn, java.lang.Object,
+ * java.lang.Object)
+ */
+ public Object put(Fqn name, Object key, Object value) throws Exception
+ {
+ return null;
+ }
+
+   /**
+    * Indexes the change lists written under the given cache node for every chunk that is
+    * currently assigned to the local cluster node.
+    * <p>
+    * Writes to the chunk-list service node ({@link ChunkServiceImpl#CHUNK_LIST}) are skipped.
+    * For any other node the indexer registered under the parent Fqn is looked up, the
+    * per-chunk {@link ChangesFilterListsWrapper}s of the locally assigned chunks are merged
+    * into one wrapper and passed to the indexer. Afterwards the processed chunk attributes are
+    * removed from the cache node, and the node itself is removed once it has no keys left.
+    *
+    * @param fqn cache node that received the data; its parent selects the registered indexer
+    * @param data attributes written to the node, read here as chunk id to
+    *        {@link ChangesFilterListsWrapper}
+    * @throws Exception if the chunk service, the indexer or the cache operations fail
+    * @see org.exoplatform.services.jcr.impl.storage.jbosscache.AbstractWriteOnlyCacheLoader#put(org.jboss.cache.Fqn,
+    *      java.util.Map)
+    */
+   public void put(Fqn fqn, Map<Object, Object> data) throws Exception
+   {
+      // ignoring call cacheRoot.addChild(PARAMETER_ROOT).setResident(true);
+      if (log.isDebugEnabled())
+      {
+         // logged at debug level so that the level matches the guard above
+         log.debug("Received list wrapper, start indexing...");
+      }
+
+      // TODO Review it!!
+      if (fqn.getLastElementAsString().equals(ChunkServiceImpl.CHUNK_LIST))
+      {
+         // skipping all the rest of the code if service data written
+         log.info("skipping fqn " + fqn);
+         return;
+      }
+
+      Indexer indexer = indexers.get(fqn.getParent());
+      if (indexer == null)
+      {
+         log.warn("No indexer could be found for the fqn " + fqn.getParent());
+         if (log.isDebugEnabled())
+         {
+            log.debug("The current content of the map of indexers is " + indexers);
+         }
+         return;
+      }
+
+      ChunkService chunkService = indexer.getChunkService();
+      Set<Integer> processingChunks = null;
+      // tracks whether acquireJob() really happened, so that releaseJob() is never
+      // called without a matching acquire (that would corrupt the job counter)
+      boolean jobAcquired = false;
+      try
+      {
+         // blocks while chunks are being re-assigned; the job is acquired only afterwards
+         processingChunks = chunkService.getAssignedChunks(true);
+         chunkService.acquireJob();
+         jobAcquired = true;
+
+         // combine wrappers. Aggregating them into one wrapper, to pass through
+         // SearchManager and SearchIndex
+         ChangesFilterListsWrapper wrapper =
+            new ChangesFilterListsWrapper(new HashSet<String>(), new HashSet<String>(), new HashSet<String>(),
+               new HashSet<String>());
+         for (Integer chunkId : processingChunks)
+         {
+            ChangesFilterListsWrapper wrapperPart = (ChangesFilterListsWrapper)data.get(chunkId);
+            if (wrapperPart != null)
+            {
+               wrapper.getAddedNodes().addAll(wrapperPart.getAddedNodes());
+               wrapper.getParentAddedNodes().addAll(wrapperPart.getParentAddedNodes());
+               wrapper.getRemovedNodes().addAll(wrapperPart.getRemovedNodes());
+               wrapper.getParentRemovedNodes().addAll(wrapperPart.getParentRemovedNodes());
+            }
+         }
+         // updating index
+         indexer.updateIndex(wrapper.getAddedNodes(), wrapper.getRemovedNodes(), wrapper.getParentAddedNodes(),
+            wrapper.getParentRemovedNodes());
+      }
+      finally
+      {
+         if (jobAcquired)
+         {
+            chunkService.releaseJob();
+         }
+         // removing processed chunk jobs
+         if (processingChunks != null)
+         {
+            for (Integer chunkId : processingChunks)
+            {
+               cache.remove(fqn, chunkId);
+            }
+         }
+         // if no attributes present in JBC node remove it; the node may already be gone,
+         // so guard against a null lookup result
+         if (cache.getNode(fqn) != null && cache.getNode(fqn).getKeys().isEmpty())
+         {
+            cache.removeNode(fqn);
+         }
+      }
+   }
+
+ /**
+ * Batched modifications are ignored by this loader.
+ *
+ * @param modifications the modifications of the transaction; not used
+ * @see org.exoplatform.services.jcr.impl.storage.jbosscache.AbstractWriteOnlyCacheLoader#put(java.util.List)
+ */
+ @Override
+ public void put(List<Modification> modifications) throws Exception
+ {
+ // do nothing. Index is updated on prepare phase.
+ }
+
+ /**
+ * Node removals are ignored by this loader.
+ *
+ * @param arg0 the removed node; not used
+ * @see org.jboss.cache.loader.CacheLoader#remove(org.jboss.cache.Fqn)
+ */
+ public void remove(Fqn arg0) throws Exception
+ {
+ // do nothing
+ }
+
+ /**
+ * Only logs the Fqn whose data was removed; nothing else is done.
+ * NOTE(review): the INFO-level message looks like POC tracing - confirm before release.
+ *
+ * @param arg0 the node whose data was removed
+ */
+ @Override
+ public void removeData(Fqn arg0) throws Exception
+ {
+ log.info("REMOVE DATA: FQN = " + arg0);
+ }
+
+ /**
+ * This class will update the indexes of the related workspace
+ */
+ private static class Indexer
+ {
+
+ private final SearchManager searchManager;
+
+ private final SearchManager parentSearchManager;
+
+ private final QueryHandler handler;
+
+ private final QueryHandler parentHandler;
+
+ private final ChunkService chunkService;
+
+ public Indexer(SearchManager searchManager, SearchManager parentSearchManager,
QueryHandler handler,
+ QueryHandler parentHandler, ChunkService chunkService) throws
RepositoryConfigurationException
+ {
+ this.searchManager = searchManager;
+ this.parentSearchManager = parentSearchManager;
+ this.handler = handler;
+ this.parentHandler = parentHandler;
+ this.chunkService = chunkService;
+ }
+
+ public ChunkService getChunkService()
+ {
+ return chunkService;
+ }
+
+ /**
+ * Flushes lists of added/removed nodes to SearchManagers, starting indexing.
+ *
+ * @param addedNodes
+ * @param removedNodes
+ * @param parentAddedNodes
+ * @param parentRemovedNodes
+ */
+ protected void updateIndex(Set<String> addedNodes, Set<String>
removedNodes, Set<String> parentAddedNodes,
+ Set<String> parentRemovedNodes)
+ {
+ // pass lists to search manager
+ if (searchManager != null && (addedNodes.size() > 0 ||
removedNodes.size() > 0))
+ {
+ try
+ {
+ searchManager.updateIndex(removedNodes, addedNodes);
+ }
+ catch (RepositoryException e)
+ {
+ log.error("Error indexing changes " + e, e);
+ }
+ catch (IOException e)
+ {
+ log.error("Error indexing changes " + e, e);
+ try
+ {
+ handler.logErrorChanges(removedNodes, addedNodes);
+ }
+ catch (IOException ioe)
+ {
+ log.warn("Exception occure when errorLog writed. Error log is not
complete. " + ioe, ioe);
+ }
+ }
+ }
+ // pass lists to parent search manager
+ if (parentSearchManager != null && (parentAddedNodes.size() > 0 ||
parentRemovedNodes.size() > 0))
+ {
+ try
+ {
+ parentSearchManager.updateIndex(parentRemovedNodes, parentAddedNodes);
+ }
+ catch (RepositoryException e)
+ {
+ log.error("Error indexing changes " + e, e);
+ }
+ catch (IOException e)
+ {
+ log.error("Error indexing changes " + e, e);
+ try
+ {
+ parentHandler.logErrorChanges(removedNodes, addedNodes);
+ }
+ catch (IOException ioe)
+ {
+ log.warn("Exception occure when errorLog writed. Error log is not
complete. " + ioe, ioe);
+ }
+ }
+ }
+ }
+ }
+}
Property changes on:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkIndexerCacheLoader.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkService.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkService.java
(rev 0)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkService.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk;
+
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
+
+import java.util.Set;
+
+/**
+ * This service defines all the methods needed to be able to:
+ * <ul>
+ * <li>split the Lucene indexes into several chunks</li>
+ * <li>assign the chunks to a cluster node</li>
+ * <li>assign JCR Node to a chunk</li>
+ * </ul>
+ *
+ * @author Nicolas Filotto
+ * @author <a href="mailto:nikolazius@gmail.com">Nikolay
Zamosenchuk</a>
+ * @version $Id: ChunkService.java 34360 2009-07-22 23:58:59Z nzamosenchuk $
+ *
+ */
+public interface ChunkService
+{
+
+ /**
+ * Returns the configured number of index chunks.
+ * @return number of chunks
+ */
+ int getChunkCount();
+
+ /**
+ * Gives the id of the chunk corresponding to the given node id.
+ * @param nodeId the node id for which we want to find the corresponding chunk id
+ * @return the corresponding chunk index
+ */
+ int getChunkId(String nodeId);
+
+ /**
+ * TODO review this method
+ * Gives the full list of all the ids of chunks managed by the local machine.
+ * @param blocking if true, block the current thread while re-assigning is in progress
+ * @return the list of chunk ids managed by the local machine
+ */
+ Set<Integer> getAssignedChunks(boolean blocking);
+
+ /**
+ * Returns the instance of {@link IndexerIoModeHandler} managed by this
+ * {@link ChunkService}, so the {@link ChunkService} instance is able to control
+ * the indexer I/O mode.
+ * @return instance of IndexerIoModeHandler
+ */
+ IndexerIoModeHandler getModeHandler();
+
+ /**
+ * Must be thread-safe! Notifies the {@link ChunkService} that one more thread is now
+ * working on indexing. This is a job tracking feature, used for releasing and
+ * re-acquiring index chunks when the cluster view changes: the {@link ChunkService}
+ * must know which indexes can be released and re-assigned to another JCR instance.
+ * After the cluster view has changed and no more jobs are running, the chunks are
+ * marked as free.
+ */
+ void acquireJob();
+
+ /**
+ * Must be thread-safe! Notifies the {@link ChunkService} that one thread has stopped
+ * working on indexing. This is a job tracking feature, used for releasing and
+ * re-acquiring index chunks when the cluster view changes: the {@link ChunkService}
+ * must know which indexes can be released and re-assigned to another JCR instance.
+ * After the cluster view has changed and no more jobs are running, the chunks are
+ * marked as free.
+ */
+ void releaseJob();
+
+}
\ No newline at end of file
Property changes on:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkService.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkServiceImpl.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkServiceImpl.java
(rev 0)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkServiceImpl.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -0,0 +1,406 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk;
+
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoMode;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeModified;
+import org.jboss.cache.notifications.annotation.ViewChanged;
+import org.jboss.cache.notifications.event.NodeModifiedEvent;
+import org.jboss.cache.notifications.event.NodeRemovedEvent;
+import org.jboss.cache.notifications.event.ViewChangedEvent;
+import org.jboss.cache.notifications.event.NodeModifiedEvent.ModificationType;
+import org.jgroups.Address;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * ASSIGNING:
+ * 1. {@link ChunkServiceImpl#onClusterViewChanged(ViewChangedEvent)} : notification to
start ASSIGNING process, block {@link ChunkServiceImpl#getAssignedChunks()}
+ * 2. wait until all local threads invokes {@link ChunkServiceImpl#releaseJob()}
+ * 3. cleanup corresponding attributes in cache and the whole JBC node if attributes are
empty (Chunk<-->ClusterNode attributes).
+ * Set IOMode read only
+ * 4. if JBC node was removed by local machine - ({@link
ChunkServiceImpl#onNodeRemoved(NodeRemovedEvent)}) assign chunks, unblock, set IOMode
write
+ * 5. Else, if {@link ChunkServiceImpl#onNodeModified(NodeModifiedEvent)} then read map
and unblock, set IOMode write
+ *
+ * @author <a href="mailto:nikolazius@gmail.com">Nikolay
Zamosenchuk</a>
+ * @version $Id: ChunkServiceImpl.java 34360 2009-07-22 23:58:59Z nzamosenchuk $
+ *
+ */
+@CacheListener
+public class ChunkServiceImpl implements ChunkService
+{
+ public static final String CHUNK_LIST = "$chunks";
+
+ private static final Log log =
ExoLogger.getLogger("exo.jcr.component.core.ChunkService");
+
+ private final IndexerIoModeHandler modeHandler;
+
+ private int chunkNum;
+
+ private final AtomicInteger jobCount = new AtomicInteger(0);
+
+ private final Cache<Serializable, Object> cache;
+
+ private final Fqn rootFqn;
+
+ private ServiceState state;
+
+ private Object reassigninMonitor = new Object();
+
+ private Set<Integer> currentlyAssigned;
+
+ private List<Address> clusterView;
+
+   /**
+    * Creates the service in ASSIGNING state with a READ_ONLY mode handler and then either
+    * performs the initial chunk assignment or adopts an assignment already stored in the cache.
+    * <ul>
+    * <li>If the chunk-list node has no data, {@link #doAssign()} is called.</li>
+    * <li>If it holds exactly {@code chunkNum} entries and the local address is among the
+    * values, the local chunks are read from it and assigning is finished.</li>
+    * <li>Otherwise the service stays in ASSIGNING state.</li>
+    * </ul>
+    *
+    * @param chunkNum number of index chunks
+    * @param cache cache holding the chunk distribution; this instance is added to it as listener
+    * @param rootFqn root under which the {@link #CHUNK_LIST} node is addressed
+    */
+   public ChunkServiceImpl(int chunkNum, Cache<Serializable, Object> cache, Fqn rootFqn)
+   {
+      super();
+      this.chunkNum = chunkNum;
+      this.cache = cache;
+      this.modeHandler = new IndexerIoModeHandler(IndexerIoMode.READ_ONLY);
+      this.rootFqn = rootFqn;
+      this.currentlyAssigned = new HashSet<Integer>();
+      this.state = ServiceState.ASSIGNING;
+      clusterView = new ArrayList<Address>(cache.getMembers());
+      Collections.sort(clusterView);
+      // Register the listener only now: the callbacks read rootFqn, state, modeHandler and
+      // currentlyAssigned, which were still null when registration happened first.
+      this.cache.addCacheListener(this);
+      // first initial start
+      Map<Serializable, Object> data = cache.getData(Fqn.fromElements(rootFqn, CHUNK_LIST));
+      if (data == null || data.isEmpty())
+      {
+         log.info(rootFqn + "\t creating ChunkSErviceImpl, doing init...");
+         doAssign();
+      }
+      else
+      {
+         log.info(rootFqn + "\t creating ChunkSErviceImpl, skipping init...");
+         if (data.size() == chunkNum)
+         {
+            log.info(rootFqn + "\t creating ChunkSErviceImpl, reading:" + data);
+            Address local = cache.getLocalAddress();
+            // if local address already present in data map - then it has been already assigned
+            for (Object address : data.values())
+            {
+               if (local.equals(address))
+               {
+                  readAssigned(data);
+                  doFinishAssign();
+                  break;
+               }
+            }
+         }
+      }
+   }
+
+   /**
+    * Switches the service into ASSIGNING state after a cluster view change. Does nothing
+    * when the service is already in that state.
+    * <p>
+    * On the coordinator, chunk entries owned by members of the previously known view that
+    * are missing from the new one are removed from the chunk-list node. The known view is
+    * then replaced by the new one. If no local indexing job is running, the local chunks
+    * are released immediately; otherwise {@link #releaseJob()} does it later.
+    *
+    * @param members sorted members of the new cluster view
+    */
+   private void doStartAssign(List<Address> members)
+   {
+      // mark index as ASSIGNING
+      if (this.state == ServiceState.ASSIGNING)
+      {
+         return;
+      }
+      this.state = ServiceState.ASSIGNING;
+      log.info(rootFqn.toString() + "\tdoStartAssign. Members = " + members.toString());
+
+      if (((CacheSPI)cache).getRPCManager().isCoordinator())
+      {
+         log.info(rootFqn.toString() + "\tonCLusterViewChanged cleaning suspected member");
+         Fqn chunkListFqn = Fqn.fromElements(rootFqn, CHUNK_LIST);
+         // read once; may be null when the chunk-list node does not exist
+         Map<Serializable, Object> data = cache.getData(chunkListFqn);
+         if (data != null)
+         {
+            for (Address address : clusterView)
+            {
+               if (members.contains(address))
+               {
+                  continue;
+               }
+               // this member is suspected
+               // trying to remove information about chunks used by it
+               for (int i = 0; i < chunkNum; i++)
+               {
+                  Integer chunkId = Integer.valueOf(i);
+                  Object owner = data.get(chunkId);
+                  if (owner != null && owner.equals(address))
+                  {
+                     cache.remove(chunkListFqn, chunkId);
+                  }
+               }
+            }
+         }
+      }
+      // remember the new view, so that the next change is compared against the current
+      // members and not against the view seen at construction time
+      clusterView = new ArrayList<Address>(members);
+
+      // release right away when no local job is running, otherwise releaseJob() does it
+      if (jobCount.get() == 0)
+      {
+         doReleaseChunks();
+      }
+   }
+
+   /**
+    * Distributes all chunks round-robin over the sorted cluster members, records the chunks
+    * that fall to the local address, writes the distribution map into the chunk-list node
+    * and finishes assigning.
+    * NOTE(review): the log text mentions the coordinator, but no coordinator check is made
+    * in this method - confirm the intent.
+    *
+    * @throws UnsupportedOperationException if the cache reports no members
+    */
+   private void doAssign()
+   {
+      List<Address> sortedMembers = new ArrayList<Address>(cache.getMembers());
+      Collections.sort(sortedMembers);
+      log.info(rootFqn.toString() + "\tdoAssign. Members = " + sortedMembers.toString());
+
+      Address self = cache.getLocalAddress();
+
+      int clusterSize = sortedMembers.size();
+      if (clusterSize == 0)
+      {
+         throw new UnsupportedOperationException("List of memebers is empty. Not yet implemented usecase.");
+      }
+
+      log.info(rootFqn.toString() + "\tdoAssign. LocalMachine is coordinator, performing...");
+      // initializing the list of chunks in cache
+      Map<Integer, Address> chunkDistribution = new HashMap<Integer, Address>();
+      for (int chunk = 0; chunk < chunkNum; chunk++)
+      {
+         Address owner = sortedMembers.get(chunk % clusterSize);
+         chunkDistribution.put(chunk, owner);
+         if (owner.equals(self))
+         {
+            currentlyAssigned.add(chunk);
+         }
+      }
+      log.info(rootFqn.toString() + "\tdoAssign. Putting ChunkDistribution = " + chunkDistribution.toString());
+      this.cache.put(Fqn.fromElements(rootFqn, CHUNK_LIST), chunkDistribution);
+      log.info(rootFqn.toString() + "\tdoAssign. Put done ");
+      //this.cache.getNode(Fqn.fromElements(rootFqn, CHUNK_LIST)).setResident(true);
+      doFinishAssign();
+   }
+
+   /**
+    * Fills {@code currentlyAssigned} with the ids of the chunks mapped to the local address
+    * in the given distribution. Nothing is read when {@code currentlyAssigned} already
+    * contains entries.
+    *
+    * @param chunkDistribution chunk id to owning address
+    */
+   private void readAssigned(Map<Serializable, Object> chunkDistribution)
+   {
+      // read in local variable current chunks
+      Address self = cache.getLocalAddress();
+      // TODO
+      if (!currentlyAssigned.isEmpty())
+      {
+         return;
+      }
+      for (int chunk = 0; chunk < chunkNum; chunk++)
+      {
+         Object owner = chunkDistribution.get(chunk);
+         if (owner != null && owner.equals(self))
+         {
+            currentlyAssigned.add(chunk);
+         }
+      }
+   }
+
+ /**
+ * Ends the ASSIGNING phase: sets the state to NORMAL, switches the mode handler to
+ * READ_WRITE and wakes up every thread waiting on the re-assigning monitor.
+ */
+ private void doFinishAssign()
+ {
+ log.info(rootFqn.toString() + "\tdoFinishAssign: currentlyAssigned: " +
currentlyAssigned);
+ // the state is changed before notifyAll(), so woken waiters see NORMAL when they re-check
+ this.state = ServiceState.NORMAL;
+ modeHandler.setMode(IndexerIoMode.READ_WRITE);
+ synchronized (reassigninMonitor)
+ {
+ log.info(rootFqn.toString() + "\tdoFinishAssign. ASSIGNING finished:
Notifying");
+ reassigninMonitor.notifyAll();
+ }
+ }
+
+   /**
+    * Switches the mode handler to READ_ONLY and removes the locally assigned chunks from the
+    * Map&lt;Chunk, ClusterNode&gt; stored in the cache. If the chunk-list node is empty
+    * afterwards, the node is removed and {@link #doAssign()} builds a new distribution.
+    * <p>
+    * Problems are reported through the logger. An interrupt received during the initial
+    * pause does not abort the release; the interrupt status is restored once the method is
+    * done.
+    * NOTE(review): the fixed 4 second pause is a hard-coded POC workaround - confirm what it
+    * waits for.
+    */
+   private void doReleaseChunks()
+   {
+      // Notifies ChunkIndex to release indexes
+      modeHandler.setMode(IndexerIoMode.READ_ONLY);
+      log.info(rootFqn.toString() + "\tdoReleaseChunks. Removing from info cache...");
+      boolean interrupted = false;
+      try
+      {
+         try
+         {
+            Thread.sleep(4000);
+         }
+         catch (InterruptedException e)
+         {
+            // keep releasing, but remember the interrupt so it is not lost
+            interrupted = true;
+            log.warn(rootFqn + "\tdoReleaseChunks. Interrupted while waiting before release", e);
+         }
+
+         // clean up Map<Chunk, ClusterNode>
+         Fqn chunkListFqn = Fqn.fromElements(rootFqn, CHUNK_LIST);
+         for (Integer chunkId : currentlyAssigned)
+         {
+            Object removed = this.cache.remove(chunkListFqn, chunkId);
+            if (removed == null)
+            {
+               log.warn("NULL RETURNED BY REMOVE " + rootFqn + " " + chunkId);
+            }
+         }
+         log.info(rootFqn.toString() + "\tdoReleaseChunks. Removed info ");
+         currentlyAssigned.clear();
+
+         // if this is the last one cluster machine releasing it's chunks - clean the node
+         // containing info about chunks.
+         Node listNode = this.cache.getNode(chunkListFqn);
+         if (listNode != null && listNode.getData().isEmpty())
+         {
+            try
+            {
+               log.info(rootFqn.toString() + "\tdoReleaseChunks. Removing node ");
+               this.cache.removeNode(chunkListFqn);
+               log.info(rootFqn.toString() + "\tdoReleaseChunks. Removed node ");
+               doAssign();
+            }
+            catch (Exception e)
+            {
+               log.error(rootFqn + "\tdoReleaseChunks. Failed to remove the chunk list node and re-assign", e);
+            }
+         }
+         else if (listNode != null)
+         {
+            log.info("not removed!!! WS=" + rootFqn + " " + listNode.getData() + " localaddr="
+               + cache.getLocalAddress());
+         }
+      }
+      finally
+      {
+         if (interrupted)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   /**
+    * Returns a copy of the chunk ids currently assigned to the local machine.
+    * <p>
+    * When {@code blocking} is true and the service is not in NORMAL state, the calling
+    * thread waits on the re-assigning monitor until the state becomes NORMAL. An interrupt
+    * does not end the wait; it is logged and the interrupt status is restored before
+    * returning.
+    *
+    * @param blocking whether to wait while re-assigning is in progress
+    * @return a new set holding the locally assigned chunk ids
+    * @see org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService#getAssignedChunks(boolean)
+    */
+   public Set<Integer> getAssignedChunks(boolean blocking)
+   {
+      // Block until chunk transfer not finished
+      if (state != ServiceState.NORMAL && blocking)
+      {
+         boolean interrupted = false;
+         try
+         {
+            synchronized (reassigninMonitor)
+            {
+               log.info(rootFqn.toString() + "\tgetAssignedChunks. Waitning ASSIGNING finished");
+               while (state != ServiceState.NORMAL)
+               {
+                  try
+                  {
+                     reassigninMonitor.wait();
+                  }
+                  catch (InterruptedException e)
+                  {
+                     // keep waiting for the assignment, but do not lose the interrupt
+                     interrupted = true;
+                     log.error("InterruptedException", e);
+                  }
+               }
+            }
+         }
+         finally
+         {
+            if (interrupted)
+            {
+               Thread.currentThread().interrupt();
+            }
+         }
+         log.info(rootFqn.toString() + "\tgetAssignedChunks. ASSIGNING finished: Working");
+      }
+      return new HashSet<Integer>(currentlyAssigned);
+   }
+
+ /**
+ * @return the chunk number given to the constructor
+ * @see org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService#getChunkCount()
+ */
+ public int getChunkCount()
+ {
+ return chunkNum;
+ }
+
+   /**
+    * Maps a node id onto a chunk id derived from the hash code of the id.
+    * <p>
+    * The remainder is taken before the absolute value, so the result is never negative,
+    * not even for a hash code of {@code Integer.MIN_VALUE} (whose {@code Math.abs} is still
+    * negative). For every other hash code the result equals
+    * {@code Math.abs(hashCode) % chunkNum}, so existing assignments do not move.
+    *
+    * @param nodeId the node id, must not be null
+    * @return a chunk id in the range 0 (inclusive) to {@code chunkNum} (exclusive)
+    * @see org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService#getChunkId(java.lang.String)
+    */
+   public int getChunkId(String nodeId)
+   {
+      return Math.abs(nodeId.hashCode() % chunkNum);
+   }
+
+ /**
+ * Increments the counter of running local indexing jobs.
+ *
+ * @see org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService#acquireJob()
+ */
+ public void acquireJob()
+ {
+ jobCount.incrementAndGet();
+ }
+
+ /**
+ * Decrements the counter of running local indexing jobs. When the counter reaches zero
+ * while the service is in ASSIGNING state, the local chunks are released.
+ *
+ * @see org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService#releaseJob()
+ */
+ public void releaseJob()
+ {
+ int currentJobs = jobCount.decrementAndGet();
+ if (this.state == ServiceState.ASSIGNING && currentJobs == 0)
+ {
+ // all local threads finished their jobs, so indexes can be released
+ doReleaseChunks();
+ }
+ }
+
+ /**
+ * @return the mode handler created in the constructor
+ * @see org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService#getModeHandler()
+ */
+ public IndexerIoModeHandler getModeHandler()
+ {
+ return modeHandler;
+ }
+
+ /**
+ * Listener, receiving the events, when cluster changes
+ * @param e
+ */
+ @ViewChanged
+ public void onClusterViewChanged(ViewChangedEvent e)
+ {
+ List<Address> members = new
ArrayList<Address>(e.getNewView().getMembers());
+ Collections.sort(members);
+ if (!e.isPre())
+ {
+ log.info(rootFqn.toString() + "\tonCLusterViewChanged doing start
assign");
+ // initiate the process of ASSIGNING
+ doStartAssign(members);
+ }
+ }
+
+   /**
+    * Cache listener callback for node modifications. It reacts only to post-events of type
+    * PUT_MAP on the chunk-list node received while the service is in ASSIGNING state: the
+    * local chunks are read from the event data and assigning is finished.
+    *
+    * @param e the node modification event
+    */
+   @NodeModified
+   public void onNodeModified(NodeModifiedEvent e)
+   {
+      if (e.isPre() || this.state != ServiceState.ASSIGNING)
+      {
+         return;
+      }
+      if (!e.getFqn().equals(Fqn.fromElements(rootFqn, CHUNK_LIST)))
+      {
+         return;
+      }
+      if (e.getModificationType() != ModificationType.PUT_MAP)
+      {
+         return;
+      }
+      // re-assigning is about to finish. Map<Chunk, ClusterNode> already filled
+      readAssigned(e.getData());
+      doFinishAssign();
+   }
+
+ /**
+ * State of the service: NORMAL, or ASSIGNING while chunks are being (re-)assigned.
+ */
+ private enum ServiceState {
+ NORMAL, ASSIGNING;
+ }
+
+}
Property changes on:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/ChunkServiceImpl.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Added:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/JBossCacheChunkIndexChangesFilter.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/JBossCacheChunkIndexChangesFilter.java
(rev 0)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/JBossCacheChunkIndexChangesFilter.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -0,0 +1,252 @@
+/*
+ * Copyright (C) 2009 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk;
+
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.services.jcr.config.QueryHandlerEntry;
+import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
+import org.exoplatform.services.jcr.impl.core.query.IndexerChangesFilter;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
+import org.exoplatform.services.jcr.impl.core.query.IndexingTree;
+import org.exoplatform.services.jcr.impl.core.query.QueryHandler;
+import org.exoplatform.services.jcr.impl.core.query.SearchManager;
+import
org.exoplatform.services.jcr.impl.core.query.jbosscache.ChangesFilterListsWrapper;
+import org.exoplatform.services.jcr.impl.util.io.PrivilegedCacheHelper;
+import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory;
+import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory.CacheType;
+import org.exoplatform.services.jcr.util.IdGenerator;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jcr.RepositoryException;
+
+/**
+ * @author <a href="mailto:Sergey.Kabashnyuk@exoplatform.org">Sergey
Kabashnyuk</a>
+ * @version $Id: exo-jboss-codetemplates.xml 34360 2009-07-22 23:58:59Z ksm $
+ *
+ */
+public class JBossCacheChunkIndexChangesFilter extends IndexerChangesFilter
+{
+ /**
+ * Logger instance for this class
+ */
+ private final Log log =
ExoLogger.getLogger("exo.jcr.component.core.JBossCacheIndexChangesFilter");
+
+ public static final String PARAM_JBOSSCACHE_CONFIGURATION =
"jbosscache-configuration";
+
+ public static final String PARAM_JBOSSCACHE_PUSHSTATE =
"jbosscache-sscl-push.state.enabled";
+
+ public static final String PARAM_JBOSSCACHE_PUSHSTATE_TIMEOUT =
"jbosscache-sscl-push.state.timeout";
+
+ /**
+ * Indicate whether the JBoss Cache instance used can be shared with other caches
+ */
+ public static final String PARAM_JBOSSCACHE_SHAREABLE =
"jbosscache-shareable";
+
+ public static final Boolean PARAM_JBOSSCACHE_SHAREABLE_DEFAULT = Boolean.FALSE;
+
+ private final Cache<Serializable, Object> cache;
+
+ private final Fqn<String> rootFqn;
+
+ public static final String LISTWRAPPER = "$lists".intern();
+
+ private final ChunkService chunkService;
+
+ /**
+ * @param searchManager
+ * @param config
+ * @param indexingTree
+ * @throws RepositoryConfigurationException
+ */
+ public JBossCacheChunkIndexChangesFilter(SearchManager searchManager, SearchManager
parentSearchManager,
+ QueryHandlerEntry config, IndexingTree indexingTree, IndexingTree
parentIndexingTree, QueryHandler handler,
+ QueryHandler parentHandler, ConfigurationManager cfm) throws IOException,
RepositoryException,
+ RepositoryConfigurationException
+ {
+ super(searchManager, parentSearchManager, config, indexingTree, parentIndexingTree,
handler, parentHandler, cfm);
+ // create cache using custom factory
+ ExoJBossCacheFactory<Serializable, Object> factory = new
ExoJBossCacheFactory<Serializable, Object>(cfm);
+ Cache<Serializable, Object> initCache = factory.createCache(config);
+
+ // initialize IndexerCacheLoader
+ ChunkIndexerCacheLoader indexerCacheLoader = new ChunkIndexerCacheLoader();
+
+ // create CacheLoaderConfig
+ IndividualCacheLoaderConfig individualCacheLoaderConfig = new
IndividualCacheLoaderConfig();
+ // set CacheLoader
+ individualCacheLoaderConfig.setCacheLoader(indexerCacheLoader);
+ // set parameters
+ individualCacheLoaderConfig.setFetchPersistentState(false);
+ individualCacheLoaderConfig.setAsync(false);
+ individualCacheLoaderConfig.setIgnoreModifications(false);
+ individualCacheLoaderConfig.setPurgeOnStartup(false);
+ // create CacheLoaderConfig
+ CacheLoaderConfig cacheLoaderConfig = new CacheLoaderConfig();
+ cacheLoaderConfig.setShared(false);
+ cacheLoaderConfig.setPassivation(false);
+ cacheLoaderConfig.addIndividualCacheLoaderConfig(individualCacheLoaderConfig);
+ // insert CacheLoaderConfig
+ initCache.getConfiguration().setCacheLoaderConfig(cacheLoaderConfig);
+ this.rootFqn = Fqn.fromElements(searchManager.getWsId());
+ this.cache =
+ ExoJBossCacheFactory.getUniqueInstance(CacheType.INDEX_CACHE, rootFqn,
initCache, config.getParameterBoolean(
+ PARAM_JBOSSCACHE_SHAREABLE, PARAM_JBOSSCACHE_SHAREABLE_DEFAULT));
+
+ PrivilegedCacheHelper.create(cache);
+ PrivilegedCacheHelper.start(cache);
+
+ this.chunkService = new ChunkServiceImpl(4, cache, rootFqn);
+
+ indexerCacheLoader =
(ChunkIndexerCacheLoader)((CacheSPI)cache).getCacheLoaderManager().getCacheLoader();
+ indexerCacheLoader.register(searchManager, parentSearchManager, handler,
parentHandler, this.chunkService);
+ // TODO Dummy IOModeHandler
+ IndexerIoModeHandler modeHandler = chunkService.getModeHandler(); //new
IndexerIoModeHandler(IndexerIoMode.READ_WRITE);
+ handler.setIndexerIoModeHandler(modeHandler);
+ parentHandler.setIndexerIoModeHandler(modeHandler);
+
+ // TODO: review this injection
+ handler.setChunkService(chunkService);
+ parentHandler.setChunkService(chunkService);
+
+ if (!parentHandler.isInitialized())
+ {
+ parentHandler.init();
+ }
+ if (!handler.isInitialized())
+ {
+ handler.init();
+ }
+ }
+
+ /**
+ * @see
org.exoplatform.services.jcr.impl.core.query.IndexerChangesFilter#doUpdateIndex(java.util.Set,
java.util.Set, java.util.Set, java.util.Set)
+ */
+ @Override
+ protected void doUpdateIndex(Set<String> removedNodes, Set<String>
addedNodes, Set<String> parentRemovedNodes,
+ Set<String> parentAddedNodes)
+ {
+ String id = IdGenerator.generate();
+ try
+ {
+ // this map contains ChangesFilterListsWrapper in its values. Key is the index of
Chunk,
+ // starting from 0.
+ Map<Serializable, Object> map = new HashMap<Serializable,
Object>();
+
+ // removedNodes
+ for (String uuid : removedNodes)
+ {
+ Integer chunkId = chunkService.getChunkId(uuid);
+ ChangesFilterListsWrapper list =
(ChangesFilterListsWrapper)map.get(chunkId);
+ if (list == null)
+ {
+ list =
+ new ChangesFilterListsWrapper(new HashSet<String>(), new
HashSet<String>(), new HashSet<String>(),
+ new HashSet<String>());
+ map.put(chunkId, list);
+ }
+ list.getRemovedNodes().add(uuid);
+ }
+
+ // addedNodes
+ for (String uuid : addedNodes)
+ {
+ Integer chunkId = chunkService.getChunkId(uuid);
+ ChangesFilterListsWrapper list =
(ChangesFilterListsWrapper)map.get(chunkId);
+ if (list == null)
+ {
+ list =
+ new ChangesFilterListsWrapper(new HashSet<String>(), new
HashSet<String>(), new HashSet<String>(),
+ new HashSet<String>());
+ map.put(chunkId, list);
+ }
+ list.getAddedNodes().add(uuid);
+ }
+
+ // parentRemovedNodes
+ for (String uuid : parentRemovedNodes)
+ {
+ Integer chunkId = chunkService.getChunkId(uuid);
+ ChangesFilterListsWrapper list =
(ChangesFilterListsWrapper)map.get(chunkId);
+ if (list == null)
+ {
+ list =
+ new ChangesFilterListsWrapper(new HashSet<String>(), new
HashSet<String>(), new HashSet<String>(),
+ new HashSet<String>());
+ map.put(chunkId, list);
+ }
+ list.getParentRemovedNodes().add(uuid);
+ }
+
+ // parentAddedNodes
+ for (String uuid : parentAddedNodes)
+ {
+ Integer chunkId = chunkService.getChunkId(uuid);
+ ChangesFilterListsWrapper list =
(ChangesFilterListsWrapper)map.get(chunkId);
+ if (list == null)
+ {
+ list =
+ new ChangesFilterListsWrapper(new HashSet<String>(), new
HashSet<String>(), new HashSet<String>(),
+ new HashSet<String>());
+ map.put(chunkId, list);
+ }
+ list.getParentAddedNodes().add(uuid);
+ }
+
+ PrivilegedCacheHelper.put(cache, Fqn.fromRelativeElements(rootFqn, id +
"MAP"), map);
+ }
+ catch (CacheException e)
+ {
+ log.error(e.getLocalizedMessage(), e);
+ logErrorChanges(handler, removedNodes, addedNodes);
+ logErrorChanges(parentHandler, parentRemovedNodes, parentAddedNodes);
+ }
+ }
+
+ /**
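+ * Passes the added and removed node sets to the handler's logErrorChanges; an IOException is caught and logged as a warning.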
+ * @param logHandler
+ * @param removedNodes
+ * @param addedNodes
+ */
+ private void logErrorChanges(QueryHandler logHandler, Set<String> removedNodes,
Set<String> addedNodes)
+ {
+ try
+ {
+ logHandler.logErrorChanges(addedNodes, removedNodes);
+ }
+ catch (IOException ioe)
+ {
+ log.warn("Exception occure when errorLog writed. Error log is not complete.
" + ioe, ioe);
+ }
+ }
+}
Property changes on:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/chunk/JBossCacheChunkIndexChangesFilter.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/ChunkIndex.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/ChunkIndex.java 2010-11-01
10:50:31 UTC (rev 3372)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/ChunkIndex.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -22,8 +22,11 @@
import org.exoplatform.services.jcr.dataflow.ItemDataConsumer;
import org.exoplatform.services.jcr.datamodel.ItemData;
import org.exoplatform.services.jcr.datamodel.NodeData;
-import org.exoplatform.services.jcr.impl.core.query.ChunkService;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoMode;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeListener;
import org.exoplatform.services.jcr.impl.core.query.IndexingTree;
+import org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.ChunkService;
import org.exoplatform.services.jcr.impl.core.query.lucene.directory.DirectoryManager;
import org.exoplatform.services.jcr.impl.util.SecurityHelper;
import org.slf4j.Logger;
@@ -47,7 +50,7 @@
/**
* TODO REWRITE JAVADOC!!!
*/
-public class ChunkIndex
+public class ChunkIndex implements IndexerIoModeListener
{
/**
@@ -116,6 +119,8 @@
private final ChunkService chunkService;
+ private final IndexerIoModeHandler modeHandler;
+
/**
* Creates a new MultiIndex.
*
@@ -127,7 +132,8 @@
* @throws IOException
* if an error occurs
*/
- ChunkIndex(SearchIndex handler, IndexingTree indexingTree) throws IOException
+ ChunkIndex(SearchIndex handler, IndexingTree indexingTree, IndexerIoModeHandler
modeHandler,
+ ChunkService chunkService) throws IOException
{
this.directoryManager = handler.getDirectoryManager();
// this method is run in privileged mode internally
@@ -137,8 +143,26 @@
this.indexingTree = indexingTree;
this.nsMappings = handler.getNamespaceMappings();
// TODO remove this stub
- this.chunkService = new ChunkServiceImpl();
+ this.chunkService = chunkService;
+ if (chunkService == null)
+ {
+ System.out.println("\n\n\n");
+ System.out.println("null");
+ System.out.println("\n\n\n");
+ try
+ {
+ throw new Exception();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ this.modeHandler = modeHandler;
+ this.modeHandler.addIndexerIoModeListener(this);
+
// this method is run in privileged mode internally
IndexingQueueStore store = new IndexingQueueStore(indexDir);
@@ -147,7 +171,7 @@
// open persistent indexes
- Set<Integer> assignedChunks = chunkService.getAssignedChunks();
+ Set<Integer> assignedChunks = chunkService.getAssignedChunks(false);
for (int i = 0; i < chunkService.getChunkCount(); i++)
{
@@ -166,7 +190,7 @@
PersistentIndexChunk index =
new PersistentIndexChunk(i, handler.getTextAnalyzer(),
handler.getSimilarity(), cache, indexingQueue,
directoryManager, !assignedChunks.contains(Integer.valueOf(i)));
-
+
index.setMaxFieldLength(handler.getMaxFieldLength());
index.setUseCompoundFile(handler.getUseCompoundFile());
index.setTermInfosIndexDivisor(handler.getTermInfosIndexDivisor());
@@ -519,6 +543,7 @@
*/
public synchronized void update(final Collection<String> remove, final
Collection<Document> add) throws IOException
{
+ log.info("Indexing: added " + add.size() + " and removed " +
remove.size());
SecurityHelper.doPriviledgedIOExceptionAction(new
PrivilegedExceptionAction<Object>()
{
public Object run() throws Exception
@@ -563,10 +588,14 @@
}
}
}
- // TODO for owning indexes only
+ // commit owning indexes only
+ Set<Integer> chunkIds = chunkService.getAssignedChunks(false);
for (PersistentIndexChunk idx : indexes)
{
- idx.commit();
+ if (chunkIds.contains(Integer.valueOf(idx.getId())))
+ {
+ idx.commit();
+ }
}
}
finally
@@ -579,7 +608,7 @@
}
/**
- * Checks the indexing queue for finished text extrator jobs and updates the
+ * Checks the indexing queue for finished text extractor jobs and updates the
* index accordingly if there are any new ones.
*
* @param transactionPresent
@@ -745,4 +774,38 @@
}
});
}
+
+ private synchronized void doReleaseChunks()
+ {
+ for (PersistentIndexChunk idx : indexes)
+ {
+ idx.setReadOnly(true);
+ }
+ }
+
+ private synchronized void doAcquireChunks()
+ {
+ Set<Integer> chunkIds = chunkService.getAssignedChunks(false);
+ for (Integer id : chunkIds)
+ {
+ indexes.get(id).setReadOnly(false);
+ }
+ }
+
+ /**
+ * @see
org.exoplatform.services.jcr.impl.core.query.IndexerIoModeListener#onChangeMode(org.exoplatform.services.jcr.impl.core.query.IndexerIoMode)
+ */
+ public void onChangeMode(IndexerIoMode mode)
+ {
+ switch (mode)
+ {
+ case READ_ONLY :
+ doReleaseChunks();
+ break;
+ case READ_WRITE :
+ doAcquireChunks();
+ break;
+ }
+
+ }
}
\ No newline at end of file
Added:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/PersistentIndexChunk.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/PersistentIndexChunk.java
(rev 0)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/PersistentIndexChunk.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.exoplatform.services.jcr.impl.core.query.lucene;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Similarity;
+import org.exoplatform.services.jcr.impl.core.query.lucene.directory.DirectoryManager;
+
+import java.io.IOException;
+
+/**
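+ * A Lucene index chunk identified by an integer id. It extends {@link AbstractIndex} and
+ * rejects modifying operations with an IOException while the chunk is marked read-only,
+ * i.e. when it is not owned by the current instance of ChunkIndex. Its directory is
+ * obtained from the DirectoryManager under the name "chunk_" + id.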
+ */
+class PersistentIndexChunk extends AbstractIndex
+{
+
+ // TODO review checks. Some more methods should check if index is read only.
+
+ /** The Id of this persistent index */
+ private final int id;
+
+ private boolean readOnly;
+
+ /**
+ * Creates a new <code>PersistentIndexChunk</code>.
+ *
+ * @param id the id of this index chunk; its directory is named "chunk_" + id.
+ * @param analyzer the analyzer for text tokenizing.
+ * @param similarity the similarity implementation.
+ * @param cache the document number cache
+ * @param indexingQueue the indexing queue.
+ * @param directoryManager the directory manager.
+ * @param readOnly if current index is not owned by local machine
+ * @throws IOException if an error occurs while opening / creating the
+ * index.
+ */
+ PersistentIndexChunk(int id, Analyzer analyzer, Similarity similarity, DocNumberCache
cache,
+ IndexingQueue indexingQueue, final DirectoryManager directoryManager, boolean
readOnly) throws IOException
+ {
+ super(analyzer, similarity, directoryManager.getDirectory("chunk_" + id),
cache, indexingQueue);
+ this.id = id;
+ this.readOnly = readOnly;
+ }
+
+ @Override
+ void addDocuments(Document[] docs) throws IOException
+ {
+ if (readOnly)
+ {
+ throw new IOException("Index is not owned by current instance of ChunkIndex
and can't be modified!");
+ }
+ super.addDocuments(docs);
+ }
+
+ @Override
+ protected void commit() throws IOException
+ {
+ if (readOnly)
+ {
+ throw new IOException("Index is not owned by current instance of ChunkIndex
and can't be modified!");
+ }
+ super.commit();
+ }
+
+ @Override
+ protected synchronized void commit(boolean optimize) throws IOException
+ {
+ if (readOnly)
+ {
+ throw new IOException("Index is not owned by current instance of ChunkIndex
and can't be modified!");
+ }
+ super.commit(optimize);
+ }
+
+ @Override
+ protected synchronized IndexWriter getIndexWriter() throws IOException
+ {
+ if (readOnly)
+ {
+ throw new IOException("Index is not owned by current instance of ChunkIndex
and can't be modified!");
+ }
+ return super.getIndexWriter();
+ }
+
+ @Override
+ int removeDocument(Term idTerm) throws IOException
+ {
+ if (readOnly)
+ {
+ throw new IOException("Index is not owned by current instance of ChunkIndex
and can't be modified!");
+ }
+ return super.removeDocument(idTerm);
+ }
+
+ /**
+ * Returns the id of this index chunk.
+ * @return the id of this index chunk.
+ */
+ int getId()
+ {
+ return id;
+ }
+
+ public void setReadOnly(boolean readOnly)
+ {
+ this.readOnly = readOnly;
+ if (readOnly)
+ {
+ releaseWriterAndReaders();
+ }
+ }
+}
Property changes on:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/PersistentIndexChunk.java
___________________________________________________________________
Name: svn:mime-type
+ text/plain
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2010-11-01
10:50:31 UTC (rev 3372)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -570,7 +570,7 @@
indexingConfig = createIndexingConfiguration(nsMappings);
analyzer.setIndexingConfig(indexingConfig);
- index = new ChunkIndex(this, context.getIndexingTree());
+ index = new ChunkIndex(this, context.getIndexingTree(), modeHandler,
chunkService);
// if RW mode, create initial index and start check
if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
{
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jbosscache/AbstractWriteOnlyCacheLoader.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jbosscache/AbstractWriteOnlyCacheLoader.java 2010-11-01
10:50:31 UTC (rev 3372)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jbosscache/AbstractWriteOnlyCacheLoader.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -74,6 +74,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void prepare(Object tx, List<Modification> modifications, boolean
onePhase) throws Exception
{
throw new WriteOnlyCacheLoaderException("The method 'prepare(Object tx,
List<Modification> modifications, boolean onePhase)' should not be
called.");
@@ -84,7 +85,8 @@
*/
public Object remove(Fqn arg0, Object arg1) throws Exception
{
- throw new WriteOnlyCacheLoaderException("The method 'remove(Fqn arg0,
Object arg1)' should not be called.");
+// throw new WriteOnlyCacheLoaderException("The method 'remove(Fqn arg0,
Object arg1)' should not be called.");
+ return null;
}
/**
@@ -106,6 +108,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public abstract void put(List<Modification> modifications) throws Exception ;
}
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/util/io/PrivilegedCacheHelper.java
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/util/io/PrivilegedCacheHelper.java 2010-11-01
10:50:31 UTC (rev 3372)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/util/io/PrivilegedCacheHelper.java 2010-11-02
07:25:50 UTC (rev 3373)
@@ -27,6 +27,7 @@
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
+import java.util.Map;
/**
* @author <a href="anatoliy.bazko(a)exoplatform.org">Anatoliy
Bazko</a>
@@ -163,4 +164,42 @@
}
}
}
+
+ /**
+ * Puts the given data map into the cache under the given Fqn in privileged mode.
+ *
+ * @param cache the cache to put the data into
+ */
+ public static void put(final Cache<Serializable, Object> cache, final Fqn fqn,
final Map<Serializable, Object> data)
+ throws CacheException
+ {
+ PrivilegedExceptionAction<Object> action = new
PrivilegedExceptionAction<Object>()
+ {
+ public Object run() throws Exception
+ {
+ cache.put(fqn, data);
+ return null;
+ }
+ };
+ try
+ {
+ AccessController.doPrivileged(action);
+ }
+ catch (PrivilegedActionException pae)
+ {
+ Throwable cause = pae.getCause();
+ if (cause instanceof CacheException)
+ {
+ throw (CacheException)cause;
+ }
+ else if (cause instanceof RuntimeException)
+ {
+ throw (RuntimeException)cause;
+ }
+ else
+ {
+ throw new RuntimeException(cause);
+ }
+ }
+ }
}
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/test/resources/conf/standalone/cluster/test-jbosscache-indexer.xml
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/test/resources/conf/standalone/cluster/test-jbosscache-indexer.xml 2010-11-01
10:50:31 UTC (rev 3372)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/test/resources/conf/standalone/cluster/test-jbosscache-indexer.xml 2010-11-02
07:25:50 UTC (rev 3373)
@@ -4,18 +4,18 @@
<locking useLockStriping="false" concurrencyLevel="500"
lockParentForChildInsertRemove="false"
lockAcquisitionTimeout="20000" />
<!-- Configure the TransactionManager -->
- <transaction
transactionManagerLookupClass="org.jboss.cache.transaction.JBossStandaloneJTAManagerLookup"
/>
+ <!-- transaction
transactionManagerLookupClass="org.jboss.cache.transaction.JBossStandaloneJTAManagerLookup"
/-->
<clustering mode="replication"
clusterName="${jbosscache-cluster-name}">
- <stateRetrieval timeout="20000" fetchInMemoryState="false"
/>
- <sync />
+ <stateRetrieval timeout="20000" fetchInMemoryState="true"
/>
+ <sync replTimeout = "30000"/>
</clustering>
<!-- Eviction configuration -->
- <eviction wakeUpInterval="5000">
+ <!--eviction wakeUpInterval="5000">
<default algorithmClass="org.jboss.cache.eviction.FIFOAlgorithm"
eventQueueSize="1000000">
<property name="maxNodes" value="10000" />
<property name="minTimeToLive" value="60000" />
</default>
- </eviction>
+ </eviction-->
</jbosscache>
Modified:
jcr/branches/1.14-CNK/exo.jcr.component.core/src/test/resources/conf/standalone/cluster/test-jcr-config.xml
===================================================================
---
jcr/branches/1.14-CNK/exo.jcr.component.core/src/test/resources/conf/standalone/cluster/test-jcr-config.xml 2010-11-01
10:50:31 UTC (rev 3372)
+++
jcr/branches/1.14-CNK/exo.jcr.component.core/src/test/resources/conf/standalone/cluster/test-jcr-config.xml 2010-11-02
07:25:50 UTC (rev 3373)
@@ -59,7 +59,7 @@
<properties>
<property name="index-dir"
value="target/temp/index/db1/ws" />
<property name="changesfilter-class"
-
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.JBossCacheIndexChangesFilter"
/>
+
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.JBossCacheChunkIndexChangesFilter"
/>
<property name="jbosscache-configuration"
value="test-jbosscache-indexer.xml" />
<property name="jgroups-configuration"
value="udp-mux.xml" />
<property name="jgroups-multiplexer-stack"
value="false" />
@@ -123,7 +123,7 @@
<properties>
<property name="index-dir"
value="target/temp/index/db1/ws1" />
<property name="changesfilter-class"
-
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.JBossCacheIndexChangesFilter"
/>
+
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.JBossCacheChunkIndexChangesFilter"
/>
<property name="jbosscache-configuration"
value="test-jbosscache-indexer.xml" />
<property name="jgroups-configuration"
value="udp-mux.xml" />
<property name="jgroups-multiplexer-stack"
value="false" />
@@ -197,7 +197,7 @@
<properties>
<property name="index-dir"
value="target/temp/index/db1/ws2" />
<property name="changesfilter-class"
-
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.JBossCacheIndexChangesFilter"
/>
+
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.JBossCacheChunkIndexChangesFilter"
/>
<property name="jbosscache-configuration"
value="test-jbosscache-indexer.xml" />
<property name="jgroups-configuration"
value="udp-mux.xml" />
<property name="jgroups-multiplexer-stack"
value="false" />
@@ -285,7 +285,7 @@
<properties>
<property name="index-dir"
value="target/temp/index/db1/ws3" />
<property name="changesfilter-class"
-
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.JBossCacheIndexChangesFilter"
/>
+
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.JBossCacheChunkIndexChangesFilter"
/>
<property name="jbosscache-configuration"
value="test-jbosscache-indexer.xml" />
<property name="jgroups-configuration"
value="udp-mux.xml" />
<property name="jgroups-multiplexer-stack"
value="false" />
@@ -378,7 +378,7 @@
<properties>
<property name="index-dir"
value="target/temp/index/db1tck/ws" />
<property name="changesfilter-class"
-
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.JBossCacheIndexChangesFilter"
/>
+
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.JBossCacheChunkIndexChangesFilter"
/>
<property name="jbosscache-configuration"
value="test-jbosscache-indexer.xml" />
<property name="jgroups-configuration"
value="udp-mux.xml" />
<property name="jgroups-multiplexer-stack"
value="false" />
@@ -459,7 +459,7 @@
<properties>
<property name="index-dir"
value="target/temp/index/db1tck/ws1" />
<property name="changesfilter-class"
-
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.JBossCacheIndexChangesFilter"
/>
+
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.JBossCacheChunkIndexChangesFilter"
/>
<property name="jbosscache-configuration"
value="test-jbosscache-indexer.xml" />
<property name="jgroups-configuration"
value="udp-mux.xml" />
<property name="jgroups-multiplexer-stack"
value="false" />
@@ -538,7 +538,7 @@
<properties>
<property name="index-dir"
value="target/temp/index/db1tck/ws2" />
<property name="changesfilter-class"
-
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.JBossCacheIndexChangesFilter"
/>
+
value="org.exoplatform.services.jcr.impl.core.query.jbosscache.chunk.JBossCacheChunkIndexChangesFilter"
/>
<property name="jbosscache-configuration"
value="test-jbosscache-indexer.xml" />
<property name="jgroups-configuration"
value="udp-mux.xml" />
<property name="jgroups-multiplexer-stack"
value="false" />