Author: nzamosenchuk
Date: 2011-02-16 05:02:16 -0500 (Wed, 16 Feb 2011)
New Revision: 3980
Added:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/LocalIndexCacheLoader.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/LocalIndexChangesFilter.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/ChangesHolder.java
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/impl/core/query/lucene/TestChangesHolder.java
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexerChangesFilter.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/ChangesFilterListsWrapper.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerCacheLoader.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerSingletonStoreCacheLoader.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SingletonTokenStream.java
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java
Log:
EXOJCR-1080 : Merging work done in IMPR branch back to trunk. This contains:
a) Each cluster node has its own volatile index
b) Lucene Documents are replicated via cluster instead of UUIDs
c) Set of classes for LocalIndexes on each cluster node (LocalIndexChangesFilter,
LocalIndexCacheLoader).
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexerChangesFilter.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexerChangesFilter.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexerChangesFilter.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -109,9 +109,6 @@
*/
public void onSaveItems(ItemStateChangesLog itemStates)
{
-
- long time = System.currentTimeMillis();
-
// nodes that need to be removed from the index.
final Set<String> removedNodes = new HashSet<String>();
// nodes that need to be added to the index.
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -21,6 +21,7 @@
import org.exoplatform.services.jcr.datamodel.NodeData;
import org.exoplatform.services.jcr.impl.core.SessionDataManager;
import org.exoplatform.services.jcr.impl.core.SessionImpl;
+import org.exoplatform.services.jcr.impl.core.query.lucene.ChangesHolder;
import org.exoplatform.services.jcr.impl.core.query.lucene.IndexInfos;
import org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitor;
import org.exoplatform.services.jcr.impl.core.query.lucene.MultiIndex;
@@ -79,6 +80,23 @@
void updateNodes(Iterator<String> remove, Iterator<NodeData> add) throws
RepositoryException, IOException;
/**
+ * Extracts all the changes and returns them as a {@link ChangesHolder} instance
+ * @param remove Iterator of <code>NodeIds</code> of nodes to delete
+ * @param add Iterator of <code>NodeState</code> instances to add to
the
+ * index.
+ * @return a {@link ChangesHolder} instance that contains all the changes
+ */
+ ChangesHolder getChanges(Iterator<String> remove, Iterator<NodeData>
add);
+
+ /**
+ * Applies the given changes to the index in an atomic operation
+ * @param changes the changes to apply
+ * @throws RepositoryException if an error occurs while indexing a node.
+ * @throws IOException if an error occurs while updating the index.
+ */
+ void apply(ChangesHolder changes) throws RepositoryException, IOException;
+
+ /**
* Closes this <code>QueryHandler</code> and frees resources attached
* to this handler.
*/
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -54,6 +54,7 @@
import org.exoplatform.services.jcr.impl.core.NamespaceRegistryImpl;
import org.exoplatform.services.jcr.impl.core.SessionDataManager;
import org.exoplatform.services.jcr.impl.core.SessionImpl;
+import org.exoplatform.services.jcr.impl.core.query.lucene.ChangesHolder;
import org.exoplatform.services.jcr.impl.core.query.lucene.FieldNames;
import org.exoplatform.services.jcr.impl.core.query.lucene.LuceneVirtualTableResolver;
import org.exoplatform.services.jcr.impl.core.query.lucene.QueryHits;
@@ -600,6 +601,23 @@
public void updateIndex(final Set<String> removedNodes, final Set<String>
addedNodes) throws RepositoryException,
IOException
{
+ final ChangesHolder changes = getChanges(removedNodes, addedNodes);
+ apply(changes);
+ }
+
+ public void apply(ChangesHolder changes) throws RepositoryException, IOException
+ {
+ if (handler != null && changes != null &&
(!changes.getAdd().isEmpty() || !changes.getRemove().isEmpty()))
+ {
+ handler.apply(changes);
+ }
+ }
+
+ /**
+ * Extracts all the changes and returns them as a {@link ChangesHolder} instance
+ */
+ public ChangesHolder getChanges(final Set<String> removedNodes, final
Set<String> addedNodes)
+ {
if (handler != null)
{
Iterator<NodeData> addedStates = new Iterator<NodeData>()
@@ -680,12 +698,12 @@
if (removedNodes.size() > 0 || addedNodes.size() > 0)
{
- handler.updateNodes(removedIds, addedStates);
+ return handler.getChanges(removedIds, addedStates);
}
}
-
+ return null;
}
-
+
protected QueryHandlerContext createQueryHandlerContext(QueryHandler parentHandler)
throws RepositoryConfigurationException
{
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/ChangesFilterListsWrapper.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/ChangesFilterListsWrapper.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/ChangesFilterListsWrapper.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -18,6 +18,8 @@
*/
package org.exoplatform.services.jcr.impl.core.query.jbosscache;
+import org.exoplatform.services.jcr.impl.core.query.lucene.ChangesHolder;
+
import java.io.Serializable;
import java.util.Set;
@@ -40,6 +42,10 @@
private Set<String> parentRemovedNodes;
+ private ChangesHolder changes;
+
+ private ChangesHolder parentChanges;
+
/**
* Creates ChangesFilterListsWrapper data class, containing given lists.
*
@@ -56,7 +62,31 @@
this.parentAddedNodes = parentAddedNodes;
this.parentRemovedNodes = parentRemovedNodes;
}
+
+ /**
+ * Creates ChangesFilterListsWrapper data class, containing given lists.
+ */
+ public ChangesFilterListsWrapper(ChangesHolder changes, ChangesHolder parentChanges)
+ {
+ this.changes = changes;
+ this.parentChanges = parentChanges;
+ }
+ public boolean withChanges()
+ {
+ return changes != null || parentChanges != null;
+ }
+
+ public ChangesHolder getChanges()
+ {
+ return changes;
+ }
+
+ public ChangesHolder getParentChanges()
+ {
+ return parentChanges;
+ }
+
public Set<String> getAddedNodes()
{
return addedNodes;
@@ -76,15 +106,4 @@
{
return parentRemovedNodes;
}
-
- public String dump()
- {
- StringBuffer buffer = new StringBuffer();
- buffer.append("\n");
-
buffer.append("Added=").append(addedNodes.toString()).append("\n");
-
buffer.append("Removed=").append(removedNodes.toString()).append("\n");
-
buffer.append("ParentAdded=").append(parentAddedNodes.toString()).append("\n");
- buffer.append("ParentRemoved=").append(parentRemovedNodes.toString());
- return buffer.toString();
- }
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerCacheLoader.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerCacheLoader.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerCacheLoader.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -21,6 +21,7 @@
import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
import org.exoplatform.services.jcr.impl.core.query.QueryHandler;
import org.exoplatform.services.jcr.impl.core.query.SearchManager;
+import org.exoplatform.services.jcr.impl.core.query.lucene.ChangesHolder;
import
org.exoplatform.services.jcr.impl.storage.jbosscache.AbstractWriteOnlyCacheLoader;
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
@@ -30,6 +31,7 @@
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,7 +52,7 @@
*/
private final Map<Fqn<String>, Indexer> indexers = new
HashMap<Fqn<String>, Indexer>();
- private volatile IndexerIoModeHandler modeHandler;
+ protected volatile IndexerIoModeHandler modeHandler;
/**
* @see org.jboss.cache.loader.AbstractCacheLoader#commit(java.lang.Object)
@@ -60,7 +62,7 @@
{
// do nothing. Everything is done on prepare phase.
}
-
+
/**
* This method will register a new Indexer according to the given parameters.
*
@@ -75,7 +77,10 @@
{
indexers.put(Fqn.fromElements(searchManager.getWsId()), new Indexer(searchManager,
parentSearchManager, handler,
parentHandler));
- if (log.isDebugEnabled()) log.debug("Register " + searchManager.getWsId()
+ " " + this + " in " + indexers);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Register " + searchManager.getWsId() + " " + this
+ " in " + indexers);
+ }
}
/**
@@ -97,18 +102,28 @@
if (indexer == null)
{
log.warn("No indexer could be found for the fqn " +
name.getParent());
- if (log.isDebugEnabled()) log.debug("The current content of the map
of indexers is " + indexers);
+ if (log.isDebugEnabled())
+ {
+ log.debug("The current content of the map of indexers is " +
indexers);
+ }
}
+ else if (wrapper.withChanges())
+ {
+ indexer.updateIndex(wrapper.getChanges(), wrapper.getParentChanges());
+ }
else
{
- indexer.updateIndex(wrapper.getAddedNodes(), wrapper.getRemovedNodes(),
wrapper.getParentAddedNodes(), wrapper
- .getParentRemovedNodes());
+ indexer.updateIndex(wrapper.getAddedNodes(), wrapper.getRemovedNodes(),
wrapper.getParentAddedNodes(),
+ wrapper.getParentRemovedNodes());
}
}
finally
{
- // remove the data from the cache
- cache.removeNode(name);
+ if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
+ {
+ // remove the data from the cache
+ cache.removeNode(name);
+ }
}
}
return null;
@@ -190,7 +205,7 @@
private final QueryHandler handler;
private final QueryHandler parentHandler;
-
+
public Indexer(SearchManager searchManager, SearchManager parentSearchManager,
QueryHandler handler,
QueryHandler parentHandler) throws RepositoryConfigurationException
{
@@ -198,7 +213,8 @@
this.parentSearchManager = parentSearchManager;
this.handler = handler;
this.parentHandler = parentHandler;
- }
+ }
+
/**
* Flushes lists of added/removed nodes to SearchManagers, starting indexing.
*
@@ -250,7 +266,7 @@
log.error("Error indexing changes " + e, e);
try
{
- parentHandler.logErrorChanges(removedNodes, addedNodes);
+ parentHandler.logErrorChanges(parentRemovedNodes, parentAddedNodes);
}
catch (IOException ioe)
{
@@ -258,6 +274,63 @@
}
}
}
- }
+ }
+
+ /**
+ * Flushes lists of added/removed nodes to SearchManagers, starting indexing.
+ */
+ protected void updateIndex(ChangesHolder changes, ChangesHolder parentChanges)
+ {
+ // pass lists to search manager
+ if (searchManager != null && changes != null)
+ {
+ try
+ {
+ searchManager.apply(changes);
+ }
+ catch (RepositoryException e)
+ {
+ log.error("Error indexing changes " + e, e);
+ }
+ catch (IOException e)
+ {
+ log.error("Error indexing changes " + e, e);
+ try
+ {
+ handler.logErrorChanges(new HashSet<String>(changes.getRemove()),
new HashSet<String>(changes
+ .getAddIds()));
+ }
+ catch (IOException ioe)
+ {
+ log.warn("Exception occure when errorLog writed. Error log is not
complete. " + ioe, ioe);
+ }
+ }
+ }
+ // pass lists to parent search manager
+ if (parentSearchManager != null && parentChanges != null)
+ {
+ try
+ {
+ parentSearchManager.apply(parentChanges);
+ }
+ catch (RepositoryException e)
+ {
+ log.error("Error indexing changes " + e, e);
+ }
+ catch (IOException e)
+ {
+ log.error("Error indexing changes " + e, e);
+ try
+ {
+ parentHandler.logErrorChanges(new
HashSet<String>(parentChanges.getRemove()), new HashSet<String>(
+ parentChanges.getAddIds()));
+ }
+ catch (IOException ioe)
+ {
+ log.warn("Exception occure when errorLog writed. Error log is not
complete. " + ioe, ioe);
+ }
+ }
+ }
+ }
}
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerSingletonStoreCacheLoader.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerSingletonStoreCacheLoader.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerSingletonStoreCacheLoader.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -79,28 +79,46 @@
{
Fqn<?> fqn = aChildren.getFqn();
Object value = cache.get(fqn,
JBossCacheIndexChangesFilter.LISTWRAPPER);
- if (value != null && value instanceof
ChangesFilterListsWrapper)
+ if (value instanceof ChangesFilterListsWrapper)
{
// get wrapper object
ChangesFilterListsWrapper listsWrapper =
(ChangesFilterListsWrapper)value;
- // get search manager lists
- addedNodes.addAll(listsWrapper.getAddedNodes());
- removedNodes.addAll(listsWrapper.getRemovedNodes());
- // parent search manager lists
- parentAddedNodes.addAll(listsWrapper.getParentAddedNodes());
- parentRemovedNodes.addAll(listsWrapper.getParentAddedNodes());
- }
+ if (listsWrapper.withChanges())
+ {
+ if (listsWrapper.getChanges() != null)
+ {
+ // get search manager lists
+ addedNodes.addAll(listsWrapper.getChanges().getAddIds());
+ removedNodes.addAll(listsWrapper.getChanges().getRemove());
+ }
+ if (listsWrapper.getParentChanges() != null)
+ {
+ // parent search manager lists
+
parentAddedNodes.addAll(listsWrapper.getParentChanges().getAddIds());
+
parentRemovedNodes.addAll(listsWrapper.getParentChanges().getRemove());
+ }
+ }
+ else
+ {
+ // get search manager lists
+ addedNodes.addAll(listsWrapper.getAddedNodes());
+ removedNodes.addAll(listsWrapper.getRemovedNodes());
+ // parent search manager lists
+ parentAddedNodes.addAll(listsWrapper.getParentAddedNodes());
+ parentRemovedNodes.addAll(listsWrapper.getParentRemovedNodes());
+ }
+ }
}
String id = IdGenerator.generate();
- cache.put(Fqn.fromRelativeElements(wsChildren.getFqn(), id),
JBossCacheIndexChangesFilter.LISTWRAPPER, new ChangesFilterListsWrapper(addedNodes,
- removedNodes, parentAddedNodes, parentRemovedNodes));
+ cache.put(Fqn.fromRelativeElements(wsChildren.getFqn(), id),
JBossCacheIndexChangesFilter.LISTWRAPPER,
+ new ChangesFilterListsWrapper(addedNodes, removedNodes,
parentAddedNodes, parentRemovedNodes));
// Once we put the merged changes into the cache we can remove other
changes from the cache
- for (NodeSPI aChildren : children)
- {
+ for (NodeSPI aChildren : children)
+ {
// Remove the node from the cache and do it asynchronously
-
cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
- cache.removeNode(aChildren.getFqn());
- }
+
cache.getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
+ cache.removeNode(aChildren.getFqn());
+ }
}
if (debugEnabled)
{
@@ -111,6 +129,15 @@
};
}
+ @Override
+ public Object put(Fqn name, Object key, Object value) throws Exception
+ {
+ // delegating call to underlying cache loader, skipping SingletonStore cache
loader.
+ // this is used to deliver lists to non-coordinator nodes, since they do the
indexing into
+ // volatile index
+ return getCacheLoader().put(name, key, value);
+ }
+
/**
* Sets/changes indexer mode
*
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -27,6 +27,7 @@
import org.exoplatform.services.jcr.impl.core.query.IndexingTree;
import org.exoplatform.services.jcr.impl.core.query.QueryHandler;
import org.exoplatform.services.jcr.impl.core.query.SearchManager;
+import org.exoplatform.services.jcr.impl.core.query.lucene.ChangesHolder;
import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory;
import org.exoplatform.services.jcr.jbosscache.PrivilegedJBossCacheHelper;
import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory.CacheType;
@@ -189,11 +190,18 @@
protected void doUpdateIndex(Set<String> removedNodes, Set<String>
addedNodes, Set<String> parentRemovedNodes,
Set<String> parentAddedNodes)
{
+ ChangesHolder changes = searchManager.getChanges(removedNodes, addedNodes);
+ ChangesHolder parentChanges = parentSearchManager.getChanges(parentRemovedNodes,
parentAddedNodes);
+
+ if (changes == null && parentChanges == null)
+ {
+ return;
+ }
String id = IdGenerator.generate();
try
{
PrivilegedJBossCacheHelper.put(cache, Fqn.fromRelativeElements(rootFqn, id),
LISTWRAPPER,
- new ChangesFilterListsWrapper(addedNodes, removedNodes, parentAddedNodes,
parentRemovedNodes));
+ new ChangesFilterListsWrapper(changes, parentChanges));
}
catch (CacheException e)
{
Added:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/LocalIndexCacheLoader.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/LocalIndexCacheLoader.java
(rev 0)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/LocalIndexCacheLoader.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2011 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.jcr.impl.core.query.jbosscache;
+
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoMode;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
+
+/**
+ * This cache loader replaces Indexer IO Mode handling with constant ReadWrite state.
+ * This is required for indexing in cluster, when each instance has it's own index
stack,
+ * having local FileSystem with LuceneDirectories.
+ *
+ * @author <a href="mailto:nikolazius@gmail.com">Nikolay
Zamosenchuk</a>
+ * @version $Id: LocalIndexCacheLoader.java 34360 2009-07-22 23:58:59Z nzamosenchuk $
+ *
+ */
+public class LocalIndexCacheLoader extends IndexerCacheLoader
+{
+ public LocalIndexCacheLoader()
+ {
+ super();
+ modeHandler = new IndexerIoModeHandler(IndexerIoMode.READ_WRITE); // initialize
mode handler
+ }
+
+ @Override
+ IndexerIoModeHandler getModeHandler()
+ {
+ return modeHandler;
+ }
+
+ @Override
+ void setMode(IndexerIoMode ioMode)
+ {
+ // can't set RO on this cache loader
+ if (ioMode == IndexerIoMode.READ_ONLY)
+ {
+ throw new UnsupportedOperationException(
+ "Can't set ReadOnly on this type of CacheLoader. It is designed to
provide local index for each cluster instance. Make sure you are using Index
properly.");
+ }
+ }
+
+}
Added:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/LocalIndexChangesFilter.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/LocalIndexChangesFilter.java
(rev 0)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/LocalIndexChangesFilter.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -0,0 +1,196 @@
+/*
+ * Copyright (C) 2009 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.jcr.impl.core.query.jbosscache;
+
+import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.services.jcr.config.QueryHandlerEntry;
+import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
+import org.exoplatform.services.jcr.impl.core.query.IndexerChangesFilter;
+import org.exoplatform.services.jcr.impl.core.query.IndexerIoModeHandler;
+import org.exoplatform.services.jcr.impl.core.query.IndexingTree;
+import org.exoplatform.services.jcr.impl.core.query.QueryHandler;
+import org.exoplatform.services.jcr.impl.core.query.SearchManager;
+import org.exoplatform.services.jcr.impl.core.query.lucene.ChangesHolder;
+import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory;
+import org.exoplatform.services.jcr.jbosscache.PrivilegedJBossCacheHelper;
+import org.exoplatform.services.jcr.jbosscache.ExoJBossCacheFactory.CacheType;
+import org.exoplatform.services.jcr.util.IdGenerator;
+import org.exoplatform.services.log.ExoLogger;
+import org.exoplatform.services.log.Log;
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+
+import javax.jcr.RepositoryException;
+
+/**
+ * This type of ChangeFilter offers an ability for each cluster instance to have own
+ * local index (stack of indexes, from persistent to volatile). It uses JBossCache for
+ * Lucene Documents and UUIDs delivery. Each node works in ReadWrite mode, so manages
+ * its own volatile, merger, local list of persisted indexes and stand-alone
+ * UpdateInProgressMonitor implementation.
+ * This implementation is similar to JBossCacheIndexChangesFilter but it doesn't use
+ * SingletonStoreCacheLoader tier and cluster-aware implementations of IndexInfos
+ * and UpdateInProgressMonitor.
+ *
+ * @author <a href="mailto:Sergey.Kabashnyuk@exoplatform.org">Sergey
Kabashnyuk</a>
+ * @version $Id: exo-jboss-codetemplates.xml 34360 2009-07-22 23:58:59Z ksm $
+ *
+ */
+public class LocalIndexChangesFilter extends IndexerChangesFilter
+{
+ /**
+ * Logger instance for this class
+ */
+ private final Log log =
ExoLogger.getLogger("exo.jcr.component.core.JBossCacheIndexChangesFilter");
+
+ public static final String PARAM_JBOSSCACHE_CONFIGURATION =
"jbosscache-configuration";
+
+ public static final String PARAM_JBOSSCACHE_PUSHSTATE =
"jbosscache-sscl-push.state.enabled";
+
+ public static final String PARAM_JBOSSCACHE_PUSHSTATE_TIMEOUT =
"jbosscache-sscl-push.state.timeout";
+
+ /**
+ * Indicate whether the JBoss Cache instance used can be shared with other caches
+ */
+ public static final String PARAM_JBOSSCACHE_SHAREABLE =
"jbosscache-shareable";
+
+ public static final Boolean PARAM_JBOSSCACHE_SHAREABLE_DEFAULT = Boolean.FALSE;
+
+ private final Cache<Serializable, Object> cache;
+
+ private final Fqn<String> rootFqn;
+
+ public static final String LISTWRAPPER = "$lists".intern();
+
+ /**
+ * @param searchManager
+ * @param config
+ * @param indexingTree
+ * @throws RepositoryConfigurationException
+ */
+ public LocalIndexChangesFilter(SearchManager searchManager, SearchManager
parentSearchManager,
+ QueryHandlerEntry config, IndexingTree indexingTree, IndexingTree
parentIndexingTree, QueryHandler handler,
+ QueryHandler parentHandler, ConfigurationManager cfm) throws IOException,
RepositoryException,
+ RepositoryConfigurationException
+ {
+ super(searchManager, parentSearchManager, config, indexingTree, parentIndexingTree,
handler, parentHandler, cfm);
+ // create cache using custom factory
+ ExoJBossCacheFactory<Serializable, Object> factory = new
ExoJBossCacheFactory<Serializable, Object>(cfm);
+ Cache<Serializable, Object> initCache = factory.createCache(config);
+
+ // initialize IndexerCacheLoader
+ IndexerCacheLoader indexerCacheLoader = new LocalIndexCacheLoader();
+
+ // create CacheLoaderConfig
+ IndividualCacheLoaderConfig individualCacheLoaderConfig = new
IndividualCacheLoaderConfig();
+ // set CacheLoader
+ individualCacheLoaderConfig.setCacheLoader(indexerCacheLoader);
+ // set parameters
+ individualCacheLoaderConfig.setFetchPersistentState(false);
+ individualCacheLoaderConfig.setAsync(false);
+ individualCacheLoaderConfig.setIgnoreModifications(false);
+ individualCacheLoaderConfig.setPurgeOnStartup(false);
+ // create CacheLoaderConfig
+ CacheLoaderConfig cacheLoaderConfig = new CacheLoaderConfig();
+ cacheLoaderConfig.setShared(false);
+ cacheLoaderConfig.setPassivation(false);
+ cacheLoaderConfig.addIndividualCacheLoaderConfig(individualCacheLoaderConfig);
+ // insert CacheLoaderConfig
+ initCache.getConfiguration().setCacheLoaderConfig(cacheLoaderConfig);
+ this.rootFqn = Fqn.fromElements(searchManager.getWsId());
+ this.cache =
+ ExoJBossCacheFactory.getUniqueInstance(CacheType.INDEX_CACHE, rootFqn,
initCache, config.getParameterBoolean(
+ PARAM_JBOSSCACHE_SHAREABLE, PARAM_JBOSSCACHE_SHAREABLE_DEFAULT));
+
+ PrivilegedJBossCacheHelper.create(cache);
+ PrivilegedJBossCacheHelper.start(cache);
+
+ indexerCacheLoader =
(IndexerCacheLoader)((CacheSPI)cache).getCacheLoaderManager().getCacheLoader();
+
+ indexerCacheLoader.register(searchManager, parentSearchManager, handler,
parentHandler);
+ IndexerIoModeHandler modeHandler = indexerCacheLoader.getModeHandler();
+ handler.setIndexerIoModeHandler(modeHandler);
+ parentHandler.setIndexerIoModeHandler(modeHandler);
+
+ // using default updateMonitor and default
+ if (!parentHandler.isInitialized())
+ {
+ parentHandler.init();
+ }
+ if (!handler.isInitialized())
+ {
+ handler.init();
+ }
+ }
+
+ /**
+ * @see
org.exoplatform.services.jcr.impl.core.query.IndexerChangesFilter#doUpdateIndex(java.util.Set,
java.util.Set, java.util.Set, java.util.Set)
+ */
+ @Override
+ protected void doUpdateIndex(Set<String> removedNodes, Set<String>
addedNodes, Set<String> parentRemovedNodes,
+ Set<String> parentAddedNodes)
+ {
+
+ ChangesHolder changes = searchManager.getChanges(removedNodes, addedNodes);
+ ChangesHolder parentChanges = parentSearchManager.getChanges(parentRemovedNodes,
parentAddedNodes);
+
+ if (changes == null && parentChanges == null)
+ {
+ return;
+ }
+ String id = IdGenerator.generate();
+ try
+ {
+ PrivilegedJBossCacheHelper.put(cache, Fqn.fromRelativeElements(rootFqn, id),
LISTWRAPPER,
+ new ChangesFilterListsWrapper(changes, parentChanges));
+ }
+ catch (CacheException e)
+ {
+ log.error(e.getLocalizedMessage(), e);
+ logErrorChanges(handler, removedNodes, addedNodes);
+ logErrorChanges(parentHandler, parentRemovedNodes, parentAddedNodes);
+ }
+ }
+
+ /**
+ * Log errors
+ * @param logHandler
+ * @param removedNodes
+ * @param addedNodes
+ */
+ private void logErrorChanges(QueryHandler logHandler, Set<String> removedNodes,
Set<String> addedNodes)
+ {
+ try
+ {
+ logHandler.logErrorChanges(addedNodes, removedNodes);
+ }
+ catch (IOException ioe)
+ {
+ log.warn("Exception occure when errorLog writed. Error log is not complete.
" + ioe, ioe);
+ }
+ }
+}
Added:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/ChangesHolder.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/ChangesHolder.java
(rev 0)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/ChangesHolder.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -0,0 +1,399 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.jcr.impl.core.query.lucene;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Fieldable;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+
/**
 * This class is used to serialize and deserialize the changes to apply to the lucene index.
 * <p>
 * The wire format is written by hand (via {@link Externalizable}) instead of default java
 * serialization in order to keep the payload compact: the removed ids are written as UTF
 * strings and each lucene {@link Document} is flattened into its fields, with all the
 * boolean attributes of a field packed into a single int of flags.
 *
 * @author <a href="mailto:nicolas.filotto@exoplatform.com">Nicolas Filotto</a>
 * @version $Id$
 */
public class ChangesHolder implements Externalizable
{

   // Bits used to pack the boolean attributes of a lucene Field into one int of flags.
   private static final int STORED_FLAG = 1;

   private static final int COMPRESSED_FLAG = 1 << 1;

   private static final int INDEXED_FLAG = 1 << 2;

   private static final int TOKENIZED_FLAG = 1 << 3;

   private static final int OMIT_NORMS_FLAG = 1 << 4;

   // Written on serialization but never checked on deserialization: binary values
   // cannot be serialized by writeValue anyway (it throws), so this flag is informational.
   private static final int BINARY_FLAG = 1 << 5;

   private static final int STORE_TERM_VECTOR_FLAG = 1 << 6;

   private static final int STORE_POSITION_WITH_TERM_VECTOR_FLAG = 1 << 7;

   private static final int STORE_OFFSET_WITH_TERM_VECTOR_FLAG = 1 << 8;

   private static final int LAZY_FLAG = 1 << 9;

   private static final int OMIT_TF_FLAG = 1 << 10;

   // Indicates that the field has a non-default boost and that the boost value
   // immediately follows the flags in the stream (see writeField/readField).
   private static final int BOOST_FLAG = 1 << 11;

   /**
    * List of doc ids to remove from the index
    */
   private List<String> remove;

   /**
    * Collection of Lucene Documents to add to the index
    */
   private Collection<Document> add;

   /**
    * Default constructor used during the deserializing phase
    */
   public ChangesHolder()
   {
   }

   /**
    * @param remove Collection of doc ids to remove from the index
    * @param add Lucene Documents to add to the index
    */
   public ChangesHolder(Collection<String> remove, Collection<Document> add)
   {
      // The id list is copied; the documents collection is kept by reference.
      this.remove = new ArrayList<String>(remove);
      this.add = add;
   }

   /**
    * @return the collection of doc id to remove
    */
   public Collection<String> getRemove()
   {
      return remove;
   }

   /**
    * @return the collection of lucene document to add
    */
   public Collection<Document> getAdd()
   {
      return add;
   }

   /**
    * @return the collection of id of lucene document to add
    */
   public Collection<String> getAddIds()
   {
      Collection<String> ids = new LinkedList<String>();
      for (Document doc : add)
      {
         ids.add(getDocId(doc));
      }
      return ids;
   }

   /**
    * @param doc the lucene document whose id is requested
    * @return the id of the given lucene doc, i.e. the value of its UUID field
    */
   public String getDocId(Document doc)
   {
      return doc.get(FieldNames.UUID);
   }

   /**
    * {@inheritDoc}
    *
    * Reads the stream in the exact order produced by {@link #writeExternal(ObjectOutput)}:
    * first the count of removed ids followed by the ids, then a sequence of documents,
    * each preceded by a boolean "has next" marker.
    */
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
   {
      int length = in.readInt();
      this.remove = new ArrayList<String>(length);
      for (int i = 0; i < length; i++)
      {
         remove.add(in.readUTF());
      }
      this.add = new LinkedList<Document>();
      // Each document is preceded by "true"; "false" terminates the sequence.
      while (in.readBoolean())
      {
         Document doc = new Document();
         doc.setBoost(in.readFloat());
         int l = in.readInt();
         for (int i = 0; i < l; i++)
         {
            doc.add(readField(in, doc));
         }
         add.add(doc);
      }
   }

   /**
    * Deserialize the field from the given {@link ObjectInput}
    * @param in the stream from which we deserialize the Field
    * @param doc the document the field belongs to (currently unused by this method)
    * @return the deserialized field
    * @throws IOException
    * @throws ClassNotFoundException
    */
   private static Field readField(ObjectInput in, Document doc) throws IOException, ClassNotFoundException
   {
      String name = in.readUTF();
      int flags = in.readInt();
      // The boost is only present in the stream when BOOST_FLAG is set; otherwise default 1.0
      float boost = (flags & BOOST_FLAG) > 0 ? in.readFloat() : 1.0f;
      Object value = in.readObject();
      Field field;
      if (value instanceof TokenStream)
      {
         field = new Field(name, (TokenStream)value, getTermVectorParameter(flags));
      }
      else
      {
         // The value is a String
         field = new Field(name, (String)value, getStoreParameter(flags), getIndexParameter(flags), getTermVectorParameter(flags));
      }
      field.setBoost(boost);
      field.setOmitNorms((flags & OMIT_NORMS_FLAG) > 0);
      field.setOmitTf((flags & OMIT_TF_FLAG) > 0);
      return field;
   }

   /**
    * Returns the index parameter extracted from the flags.
    *
    * @param flags the flags of the Lucene field.
    * @return the index parameter corresponding to the given flags.
    */
   private static Field.Index getIndexParameter(int flags)
   {
      if ((flags & INDEXED_FLAG) == 0)
      {
         return Field.Index.NO;
      }
      else if ((flags & TOKENIZED_FLAG) > 0)
      {
         return Field.Index.ANALYZED;
      }
      else
      {
         return Field.Index.NOT_ANALYZED;
      }
   }

   /**
    * Returns the store parameter extracted from the flags.
    *
    * @param flags the flags of the Lucene field.
    * @return the store parameter corresponding to the given flags.
    */
   private static Field.Store getStoreParameter(int flags)
   {
      // COMPRESS implies stored, so it is checked before STORED_FLAG
      if ((flags & COMPRESSED_FLAG) > 0)
      {
         return Field.Store.COMPRESS;
      }
      else if ((flags & STORED_FLAG) > 0)
      {
         return Field.Store.YES;
      }
      else
      {
         return Field.Store.NO;
      }
   }

   /**
    * Returns the term vector parameter extracted from the flags.
    *
    * @param flags the flags of the Lucene field.
    * @return the term vector parameter corresponding to the given flags.
    */
   private static Field.TermVector getTermVectorParameter(int flags)
   {
      if (((flags & STORE_POSITION_WITH_TERM_VECTOR_FLAG) > 0)
         && ((flags & STORE_OFFSET_WITH_TERM_VECTOR_FLAG) > 0))
      {
         return Field.TermVector.WITH_POSITIONS_OFFSETS;
      }
      else if ((flags & STORE_POSITION_WITH_TERM_VECTOR_FLAG) > 0)
      {
         return Field.TermVector.WITH_POSITIONS;
      }
      else if ((flags & STORE_OFFSET_WITH_TERM_VECTOR_FLAG) > 0)
      {
         return Field.TermVector.WITH_OFFSETS;
      }
      else if ((flags & STORE_TERM_VECTOR_FLAG) > 0)
      {
         return Field.TermVector.YES;
      }
      else
      {
         return Field.TermVector.NO;
      }
   }

   /**
    * {@inheritDoc}
    *
    * Writes the removed ids first (count + UTF strings), then each document preceded
    * by a boolean "true" marker, and a final "false" to terminate the stream.
    */
   @SuppressWarnings("unchecked")
   public void writeExternal(ObjectOutput out) throws IOException
   {
      int length = remove.size();
      out.writeInt(length);
      for (int i = 0; i < length; i++)
      {
         out.writeUTF(remove.get(i));
      }
      out.flush();
      for (Document doc : add)
      {
         // Indicate that there is a doc to come
         out.writeBoolean(true);
         // boost
         out.writeFloat(doc.getBoost());
         // NOTE(review): presumably getFields() returns a raw List in this lucene
         // version, hence the unchecked suppression on this method — confirm
         List<Fieldable> fields = doc.getFields();
         int l = fields.size();
         out.writeInt(l);
         for (int i = 0; i < l; i++)
         {
            writeField(out, fields.get(i));
         }
         out.flush();
      }
      // There is no doc anymore
      out.writeBoolean(false);
   }

   /**
    * Serialize the Field into the given {@link ObjectOutput}
    * @param out the stream in which we serialize the Field
    * @param field the Field instance to serialize
    * @throws IOException if the Field could not be serialized
    */
   private static void writeField(ObjectOutput out, Fieldable field) throws IOException
   {
      // Name
      out.writeUTF(field.name());
      // Flags
      writeFlags(out, field);
      // The boost is only written when non-default; readField relies on BOOST_FLAG
      // (set by writeFlags under the same condition) to know whether to read it back
      if (field.getBoost() != 1.0f)
      {
         // Boost
         out.writeFloat(field.getBoost());
      }
      // Value
      writeValue(out, field);
   }

   /**
    * Serialize the value into the given {@link ObjectOutput}.
    * Only String and TokenStream values are supported; a field carrying only a
    * Reader value (or a binary value) causes a RuntimeException.
    * @param out the stream in which we serialize the value
    * @param field the field from which we extract the value
    * @throws IOException if the value could not be serialized
    */
   private static void writeValue(ObjectOutput out, Fieldable field) throws IOException
   {
      Object o = field.stringValue();
      if (o != null)
      {
         // Use writeObject instead of writeUTF because the value could contain unsupported
         // characters
         out.writeObject(o);
         return;
      }
      o = field.tokenStreamValue();
      if (o != null)
      {
         out.writeObject(o);
         return;
      }
      o = field.readerValue();
      throw new RuntimeException("Unsupported value " + o);
   }

   /**
    * Serialize the flags into the given {@link ObjectOutput}
    * @param out the stream in which we serialize the flags
    * @param field the field from which we extract the flags
    * @throws IOException if the flags could not be serialized
    */
   private static void writeFlags(ObjectOutput out, Fieldable field) throws IOException
   {
      int flags = 0;
      if (field.isStored())
      {
         flags |= STORED_FLAG;
      }
      if (field.isCompressed())
      {
         flags |= COMPRESSED_FLAG;
      }
      if (field.isIndexed())
      {
         flags |= INDEXED_FLAG;
      }
      if (field.isTokenized())
      {
         flags |= TOKENIZED_FLAG;
      }
      if (field.getOmitNorms())
      {
         flags |= OMIT_NORMS_FLAG;
      }
      if (field.isBinary())
      {
         flags |= BINARY_FLAG;
      }
      if (field.isTermVectorStored())
      {
         flags |= STORE_TERM_VECTOR_FLAG;
      }
      if (field.isStorePositionWithTermVector())
      {
         flags |= STORE_POSITION_WITH_TERM_VECTOR_FLAG;
      }
      if (field.isStoreOffsetWithTermVector())
      {
         flags |= STORE_OFFSET_WITH_TERM_VECTOR_FLAG;
      }
      if (field.isLazy())
      {
         flags |= LAZY_FLAG;
      }
      if (field.getOmitTf())
      {
         flags |= OMIT_TF_FLAG;
      }
      if (field.getBoost() != 1.0f)
      {
         flags |= BOOST_FLAG;
      }
      out.writeInt(flags);
   }
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -19,6 +19,7 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
import org.apache.lucene.store.Directory;
import org.exoplatform.commons.utils.SecurityHelper;
import org.exoplatform.services.jcr.dataflow.ItemDataConsumer;
@@ -489,6 +490,25 @@
*/
synchronized void update(final Collection remove, final Collection add) throws
IOException
{
+ if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
+ {
+ doUpdateRW(remove, add);
+ }
+ else
+ {
+ doUpdateRO(remove, add);
+ }
+ }
+
+ /**
+ * For investigation purposes only
+ *
+ * @param remove
+ * @param add
+ * @throws IOException
+ */
+ private void doUpdateRO(final Collection remove, final Collection add) throws
IOException
+ {
SecurityHelper.doPrivilegedIOExceptionAction(new
PrivilegedExceptionAction<Object>()
{
public Object run() throws Exception
@@ -506,7 +526,123 @@
log.warn("unable to prepare index reader " + "for
queries during update", e);
}
}
+ ReadOnlyIndexReader lastIndexReader = null;
+ try
+ {
+ for (Iterator it = remove.iterator(); it.hasNext();)
+ {
+ Term idTerm = new Term(FieldNames.UUID, (String)it.next());
+ volatileIndex.removeDocument(idTerm);
+ }
+ // try to avoid getting index reader for each doc
+ int lastIndexReaderId = indexes.size() - 1;
+ // check, index list can be empty
+ lastIndexReader =
+ (lastIndexReaderId >= 0) ?
((PersistentIndex)indexes.get(lastIndexReaderId)).getReadOnlyIndexReader()
+ : null;
+ for (Iterator it = add.iterator(); it.hasNext();)
+ {
+ Document doc = (Document)it.next();
+ if (doc != null)
+ {
+ // check if this item should be placed in own volatile index
+ // usually it must be indexed, but exception if it exists in
persisted index
+ boolean addDoc = true;
+
+ // make this check safe if something goes wrong
+ String uuid = doc.get(FieldNames.UUID);
+ // if remove contains uuid, node should be re-indexed
+ // if not, than should be checked if node present in the last
persisted index
+ if (!remove.contains(uuid))
+ {
+ // if index list changed, get the reader on the latest index
+ // or if index reader is not current
+ if (lastIndexReaderId != indexes.size() - 1
+ || (lastIndexReader != null &&
!lastIndexReader.isCurrent()))
+ {
+ // safe release reader
+ if (lastIndexReader != null)
+ {
+ lastIndexReader.release();
+ }
+ lastIndexReaderId = indexes.size() - 1;
+ lastIndexReader =
((PersistentIndex)indexes.get(lastIndexReaderId)).getReadOnlyIndexReader();
+ }
+ // if indexReader exists (it is possible that no persisted
indexes exists on start)
+ if (lastIndexReader != null)
+ {
+ try
+ {
+ // reader from resisted index should be
+ TermDocs termDocs = lastIndexReader.termDocs(new
Term(FieldNames.UUID, uuid));
+ // node should be indexed if not found in persistent index
+ addDoc = termDocs == null;
+ }
+ catch (Exception e)
+ {
+ log.debug("Some exception occured, during index
check");
+ }
+ }
+ }
+
+ if (addDoc)
+ {
+ volatileIndex.addDocuments(new Document[]{doc});
+ // reset volatile index if needed
+ if (volatileIndex.getRamSizeInBytes() >=
handler.getMaxVolatileIndexSize())
+ {
+ // to avoid out of memory
+ resetVolatileIndex();
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ // don't forget to release a reader anyway
+ if (lastIndexReader != null)
+ {
+ lastIndexReader.release();
+ }
+ synchronized (updateMonitor)
+ {
+ releaseMultiReader();
+ }
+ }
+ return null;
+ }
+ });
+ }
+
+   /**
+    * Applies the given changes directly to the lucene index, assuming this cluster
+    * node is the coordinator (READ_WRITE mode) and therefore owns the index files.
+    *
+    * @param remove collection of document ids (String) to remove from the index
+    * @param add collection of lucene Documents to add to the index
+    * @throws IOException if the index cannot be updated
+    */
+ private void doUpdateRW(final Collection remove, final Collection add) throws
IOException
+ {
+ SecurityHelper.doPrivilegedIOExceptionAction(new
PrivilegedExceptionAction<Object>()
+ {
+ public Object run() throws Exception
+ {
+ // make sure a reader is available during long updates
+ if (add.size() > handler.getBufferSize())
+ {
+ try
+ {
+ getIndexReader().release();
+ }
+ catch (IOException e)
+ {
+ // do not fail if an exception is thrown here
+ log.warn("unable to prepare index reader " + "for
queries during update", e);
+ }
+ }
+
synchronized (updateMonitor)
{
//updateInProgress = true;
@@ -2636,7 +2772,7 @@
// if the document cannot be deleted from the volatile index
// delete it from one of the persistent indexes.
int num = index.volatileIndex.removeDocument(idTerm);
- if (num == 0)
+ if (num == 0 && index.modeHandler.getMode() ==
IndexerIoMode.READ_WRITE)
{
for (int i = index.indexes.size() - 1; i >= 0; i--)
{
@@ -2946,6 +3082,10 @@
{
synchronized (updateMonitor)
{
+ // Coordinator set cluster-wide updateInProgress only in case of
persistent flush, which
+ // invokes volatile reset. So if RO cluster node received this
notification, it means that
+ // coordinator flushed volatile.
+ resetVolatileIndex();
updateMonitor.notifyAll();
releaseMultiReader();
}
@@ -3178,4 +3318,4 @@
}
}
}
-}
+}
\ No newline at end of file
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -738,11 +738,28 @@
IOException
{
checkOpen();
+ apply(getChanges(remove, add));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void apply(ChangesHolder changes) throws RepositoryException, IOException
+ {
+ checkOpen();
+ index.update(changes.getRemove(), changes.getAdd());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public ChangesHolder getChanges(Iterator<String> remove,
Iterator<NodeData> add)
+ {
final Map<String, NodeData> aggregateRoots = new HashMap<String,
NodeData>();
final Set<String> removedNodeIds = new HashSet<String>();
final Set<String> addedNodeIds = new HashSet<String>();
- index.update(IteratorUtils.toList(new TransformIterator(remove, new Transformer()
+ Collection<String> docIdsToRemove = IteratorUtils.toList(new
TransformIterator(remove, new Transformer()
{
public Object transform(Object input)
{
@@ -750,7 +767,8 @@
removedNodeIds.add(uuid);
return uuid;
}
- })), IteratorUtils.toList(new TransformIterator(add, new Transformer()
+ }));
+ Collection<Document> docsToAdd = IteratorUtils.toList(new
TransformIterator(add, new Transformer()
{
public Object transform(Object input)
{
@@ -775,7 +793,7 @@
}
return doc;
}
- })));
+ }));
// remove any aggregateRoot nodes that are new
// and therefore already up-to-date
@@ -805,8 +823,14 @@
}
});
modified.addAll(aggregateRoots.values());
- index.update(aggregateRoots.keySet(), modified);
+ docIdsToRemove.addAll(aggregateRoots.keySet());
+ docsToAdd.addAll(modified);
}
+ if (docIdsToRemove.isEmpty() && docsToAdd.isEmpty())
+ {
+ return null;
+ }
+ return new ChangesHolder(docIdsToRemove, docsToAdd);
}
/**
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SingletonTokenStream.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SingletonTokenStream.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SingletonTokenStream.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -16,64 +16,78 @@
*/
package org.exoplatform.services.jcr.impl.core.query.lucene;
-import java.io.IOException;
-
import org.apache.lucene.analysis.Token;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.Payload;
+import java.io.IOException;
+import java.io.Serializable;
+
/**
* <code>SingletonTokenStream</code> implements a token stream that wraps a
* single value with a given property type. The property type is stored as a
* payload on the single returned token.
*/
-public final class SingletonTokenStream extends TokenStream {
+public final class SingletonTokenStream extends TokenStream implements Serializable
+{
- /**
- * The string value of the token.
- */
- private String value;
+ /**
+ * The string value of the token.
+ */
+ private String value;
- /**
- * The payload of the token.
- */
- private final Payload payload;
+ /**
+ * The payload of the token.
+ */
+ private Payload payload;
- /**
- * Creates a new SingleTokenStream with the given value and a property
- * <code>type</code>.
- *
- * @param value the string value that will be returned with the token.
- * @param type the JCR property type.
- */
- public SingletonTokenStream(String value, int type) {
- this.value = value;
- this.payload = new Payload(new PropertyMetaData(type).toByteArray());
- }
+ /**
+ * for serialization
+ */
+ public SingletonTokenStream()
+ {
+ // TODO Auto-generated constructor stub
+ }
- /**
- * Creates a new SingleTokenStream with the given token.
- *
- * @param t the token.
- */
- public SingletonTokenStream(Token t) {
- this.value = t.term();
- this.payload = t.getPayload();
- }
+ /**
+ * Creates a new SingleTokenStream with the given value and a property
+ * <code>type</code>.
+ *
+ * @param value the string value that will be returned with the token.
+ * @param type the JCR property type.
+ */
+ public SingletonTokenStream(String value, int type)
+ {
+ this.value = value;
+ this.payload = new Payload(new PropertyMetaData(type).toByteArray());
+ }
- /**
- * {@inheritDoc}
- */
- public Token next(Token reusableToken) throws IOException {
- if (value == null) {
- return null;
- }
- reusableToken.clear();
- reusableToken.setTermBuffer(value);
- reusableToken.setPayload(payload);
- reusableToken.setStartOffset(0);
- reusableToken.setEndOffset(value.length());
- value = null;
- return reusableToken;
- }
+ /**
+ * Creates a new SingleTokenStream with the given token.
+ *
+ * @param t the token.
+ */
+ public SingletonTokenStream(Token t)
+ {
+ this.value = t.term();
+ this.payload = t.getPayload();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public Token next(Token reusableToken) throws IOException
+ {
+ if (value == null)
+ {
+ return null;
+ }
+ reusableToken.clear();
+ reusableToken.setTermBuffer(value);
+ reusableToken.setPayload(payload);
+ reusableToken.setStartOffset(0);
+ reusableToken.setEndOffset(value.length());
+ value = null;
+ return reusableToken;
+ }
}
Modified:
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java 2011-02-16
09:00:20 UTC (rev 3979)
+++
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -23,9 +23,11 @@
import org.exoplatform.services.jcr.impl.core.query.AbstractQueryHandler;
import org.exoplatform.services.jcr.impl.core.query.ExecutableQuery;
import org.exoplatform.services.jcr.impl.core.query.QueryHandlerContext;
+import org.exoplatform.services.jcr.impl.core.query.lucene.ChangesHolder;
import org.exoplatform.services.jcr.impl.core.query.lucene.QueryHits;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Set;
import javax.jcr.RepositoryException;
@@ -92,4 +94,16 @@
// TODO Auto-generated method stub
return null;
}
+
+ public void apply(ChangesHolder changes) throws RepositoryException, IOException
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public ChangesHolder getChanges(Iterator<String> remove,
Iterator<NodeData> add)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
Added:
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/impl/core/query/lucene/TestChangesHolder.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/impl/core/query/lucene/TestChangesHolder.java
(rev 0)
+++
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/impl/core/query/lucene/TestChangesHolder.java 2011-02-16
10:02:16 UTC (rev 3980)
@@ -0,0 +1,241 @@
+/*
+ * Copyright (C) 2010 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.exoplatform.services.jcr.impl.core.query.lucene;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.Field.TermVector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
/**
 * Micro-benchmark style unit test that verifies the custom {@link ChangesHolder}
 * externalization round-trips documents and ids correctly, and prints timing/size
 * comparisons against default java serialization.
 *
 * @author <a href="mailto:nicolas.filotto@exoplatform.com">Nicolas Filotto</a>
 * @version $Id$
 */
public class TestChangesHolder extends TestCase
{

   /**
    * Serializes/deserializes a ChangesHolder carrying only documents (empty remove
    * list), checks the round-tripped documents, and compares the time and payload
    * size with native java serialization of the same document collection.
    */
   public void testSerNDeserializeDocs() throws Exception
   {
      System.out.println("### testSerNDeserializeDocs ###");
      Collection<Document> add = new ArrayList<Document>(3);
      Document doc = new Document();
      doc.setBoost(2.0f);
      // A field exercising most attributes: compressed store, analyzed without norms,
      // term vectors with positions and offsets, non-default boost, omitTf
      Field fieldFull = new Field("full", "full-value", Store.COMPRESS, Index.ANALYZED_NO_NORMS, TermVector.WITH_POSITIONS_OFFSETS);
      fieldFull.setBoost(2.0f);
      fieldFull.setOmitTf(true);
      doc.add(fieldFull);
      // A field with all attributes at their minimum
      Field fieldEmpty = new Field("empty", "empty-value", Store.NO, Index.NOT_ANALYZED, TermVector.NO);
      doc.add(fieldEmpty);
      add.add(doc);
      doc = new Document();
      doc.add(fieldFull);
      add.add(doc);
      doc = new Document();
      doc.add(fieldEmpty);
      add.add(doc);

      ByteArrayOutputStream baos = null;

      int total = 100000;
      long start;
      Collection<String> remove = Collections.emptyList();
      Collection<Document> addResult = null;
      start = System.currentTimeMillis();
      for (int i = 0; i < total; i++)
      {
         baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);
         oos.writeObject(new ChangesHolder(remove, add));
         oos.close();
      }
      System.out.println("Custom serialization: total time = " + (System.currentTimeMillis() - start) + ", size = " + baos.size());

      start = System.currentTimeMillis();
      for (int i = 0; i < total; i++)
      {
         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
         addResult = ((ChangesHolder)ois.readObject()).getAdd();
         ois.close();
      }
      System.out.println("Custom deserialization: total time = " + (System.currentTimeMillis() - start));
      checkDocs(addResult);
      start = System.currentTimeMillis();
      for (int i = 0; i < total; i++)
      {
         baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);
         oos.writeObject(add);
         oos.close();
      }
      System.out.println("Native serialization: total time = " + (System.currentTimeMillis() - start) + ", size = " + baos.size());
      start = System.currentTimeMillis();
      for (int i = 0; i < total; i++)
      {
         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
         addResult = (Collection<Document>)ois.readObject();
         ois.close();
      }
      System.out.println("Native deserialization: total time = " + (System.currentTimeMillis() - start));
      checkDocs(addResult);
   }

   /**
    * Asserts that the deserialized collection matches the three documents built in
    * {@link #testSerNDeserializeDocs()}: doc boosts, field counts and field attributes.
    */
   private void checkDocs(Collection<Document> addResult)
   {
      assertNotNull(addResult);
      assertEquals(3, addResult.size());
      Iterator<Document> it = addResult.iterator();
      Document doc = it.next();
      assertEquals(2.0f, doc.getBoost());
      List<Field> fields = doc.getFields();
      assertNotNull(fields);
      assertEquals(2, fields.size());
      checkFieldFull(fields.get(0));
      checkFieldEmpty(fields.get(1));
      doc = it.next();
      // documents 2 and 3 were created without an explicit boost, so default 1.0
      assertEquals(1.0f, doc.getBoost());
      fields = doc.getFields();
      assertNotNull(fields);
      assertEquals(1, fields.size());
      checkFieldFull(fields.get(0));
      doc = it.next();
      assertEquals(1.0f, doc.getBoost());
      fields = doc.getFields();
      assertNotNull(fields);
      assertEquals(1, fields.size());
      checkFieldEmpty(fields.get(0));
   }

   /**
    * Asserts every attribute of the "full" field survived the round trip.
    */
   private void checkFieldFull(Field field)
   {
      assertEquals("full", field.name());
      assertEquals("full-value", field.stringValue());
      assertTrue(field.isStored());
      assertTrue(field.isCompressed());
      assertTrue(field.isIndexed());
      assertTrue(field.isTokenized());
      assertTrue(field.getOmitNorms());
      assertTrue(field.isTermVectorStored());
      assertTrue(field.isStoreOffsetWithTermVector());
      assertTrue(field.isStorePositionWithTermVector());
      assertTrue(field.getOmitTf());
      assertFalse(field.isBinary());
      assertFalse(field.isLazy());
      assertEquals(2.0f, field.getBoost());
      assertEquals(0, field.getBinaryLength());
      assertEquals(0, field.getBinaryOffset());
   }

   /**
    * Asserts every attribute of the "empty" (minimal) field survived the round trip.
    */
   private void checkFieldEmpty(Field field)
   {
      assertEquals("empty", field.name());
      assertEquals("empty-value", field.stringValue());
      assertFalse(field.isStored());
      assertFalse(field.isCompressed());
      assertTrue(field.isIndexed());
      assertFalse(field.isTokenized());
      assertFalse(field.getOmitNorms());
      assertFalse(field.isTermVectorStored());
      assertFalse(field.isStoreOffsetWithTermVector());
      assertFalse(field.isStorePositionWithTermVector());
      assertFalse(field.getOmitTf());
      assertFalse(field.isBinary());
      assertFalse(field.isLazy());
      assertEquals(1.0f, field.getBoost());
      assertEquals(0, field.getBinaryLength());
      assertEquals(0, field.getBinaryOffset());
   }

   /**
    * Serializes/deserializes a ChangesHolder carrying only removed ids (empty add
    * list), checks the round-tripped ids, and compares the time and payload size
    * with native java serialization of the same id collection.
    */
   public void testSerNDeserializeIds() throws Exception
   {
      System.out.println("### testSerNDeserializeIds ###");
      Collection<String> remove = new ArrayList<String>(3);
      remove.add(UUID.randomUUID().toString());
      remove.add(UUID.randomUUID().toString());
      remove.add(UUID.randomUUID().toString());
      ByteArrayOutputStream baos = null;

      int total = 100000;
      long start;
      Collection<Document> add = Collections.emptyList();
      Collection<String> addResult = null;
      start = System.currentTimeMillis();
      for (int i = 0; i < total; i++)
      {
         baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);
         oos.writeObject(new ChangesHolder(remove, add));
         oos.close();
      }
      System.out.println("Custom serialization: total time = " + (System.currentTimeMillis() - start) + ", size = " + baos.size());

      start = System.currentTimeMillis();
      for (int i = 0; i < total; i++)
      {
         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
         addResult = ((ChangesHolder)ois.readObject()).getRemove();
         ois.close();
      }
      System.out.println("Custom deserialization: total time = " + (System.currentTimeMillis() - start));
      checkIds(remove, addResult);
      start = System.currentTimeMillis();
      for (int i = 0; i < total; i++)
      {
         baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);
         oos.writeObject(remove);
         oos.close();
      }
      System.out.println("Native serialization: total time = " + (System.currentTimeMillis() - start) + ", size = " + baos.size());
      start = System.currentTimeMillis();
      for (int i = 0; i < total; i++)
      {
         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
         addResult = (Collection<String>)ois.readObject();
         ois.close();
      }
      System.out.println("Native deserialization: total time = " + (System.currentTimeMillis() - start));
      checkIds(remove, addResult);
   }

   /**
    * Asserts the deserialized ids are equal, element by element and in order,
    * to the original remove collection.
    */
   private void checkIds(Collection<String> remove, Collection<String> addResult)
   {
      assertNotNull(addResult);
      assertEquals(remove.size(), addResult.size());
      Iterator<String> it1 = remove.iterator();
      Iterator<String> it2 = addResult.iterator();
      while (it1.hasNext())
      {
         assertEquals(it1.next(), it2.next());
      }
   }
}