Author: nzamosenchuk
Date: 2011-01-14 03:39:54 -0500 (Fri, 14 Jan 2011)
New Revision: 3809
Modified:
jcr/branches/1.14-IMPR/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerSingletonStoreCacheLoader.java
jcr/branches/1.14-IMPR/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
Log:
EXOJCR-1144 : first, partially hardcoded implementation of prototype. Docs are indexed
into the each volatile. Volatile reseted by one of the following conditions:
1. Volatile index goes out of configured memory. To avoid OoM exception. Required for huge
updates.
2. When coordinator notifies that persisted update finished. It means, that volatile flush
was performed on server side.
Modified:
jcr/branches/1.14-IMPR/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerSingletonStoreCacheLoader.java
===================================================================
---
jcr/branches/1.14-IMPR/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerSingletonStoreCacheLoader.java 2011-01-13
15:27:10 UTC (rev 3808)
+++
jcr/branches/1.14-IMPR/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/IndexerSingletonStoreCacheLoader.java 2011-01-14
08:39:54 UTC (rev 3809)
@@ -62,7 +62,9 @@
final boolean debugEnabled = log.isDebugEnabled();
if (debugEnabled)
+ {
log.debug("start pushing in-memory state to cache cacheLoader
collection");
+ }
// merging all lists stored in memory
Collection<NodeSPI> children = cache.getRoot().getChildren();
@@ -87,13 +89,13 @@
{
// get search manager lists
addedNodes.addAll(listsWrapper.getChanges().getAddIds());
- removedNodes.addAll(listsWrapper.getChanges().getRemove());
+ removedNodes.addAll(listsWrapper.getChanges().getRemove());
}
if (listsWrapper.getParentChanges() != null)
{
// parent search manager lists
parentAddedNodes.addAll(listsWrapper.getParentChanges().getAddIds());
-
parentRemovedNodes.addAll(listsWrapper.getParentChanges().getRemove());
+
parentRemovedNodes.addAll(listsWrapper.getParentChanges().getRemove());
}
}
else
@@ -103,14 +105,14 @@
removedNodes.addAll(listsWrapper.getRemovedNodes());
// parent search manager lists
parentAddedNodes.addAll(listsWrapper.getParentAddedNodes());
- parentRemovedNodes.addAll(listsWrapper.getParentAddedNodes());
+ parentRemovedNodes.addAll(listsWrapper.getParentAddedNodes());
}
}
}
//TODO: recover logic is here, lists are: removedNodes and addedNodes
String id = IdGenerator.generate();
String id = IdGenerator.generate();
- cache.put(Fqn.fromRelativeElements(wsChildren.getFqn(), id),
JBossCacheIndexChangesFilter.LISTWRAPPER, new ChangesFilterListsWrapper(addedNodes,
- removedNodes, parentAddedNodes, parentRemovedNodes));
+ cache.put(Fqn.fromRelativeElements(wsChildren.getFqn(), id),
JBossCacheIndexChangesFilter.LISTWRAPPER,
+ new ChangesFilterListsWrapper(addedNodes, removedNodes,
parentAddedNodes, parentRemovedNodes));
// Once we put the merged changes into the cache we can remove other
changes from the cache
for (NodeSPI aChildren : changes)
{
@@ -120,12 +122,22 @@
}
}
if (debugEnabled)
+ {
log.debug("in-memory state passed to cache cacheLoader
successfully");
+ }
return null;
}
};
}
+ @Override
+ public Object put(Fqn name, Object key, Object value) throws Exception
+ {
+ // delegating call to underlying cache loader, skipping SingletonStore cache
loader.
+ // Later SingletonStore should completely be removed
+ return getCacheLoader().put(name, key, value);
+ }
+
/**
* Sets/changes indexer mode
*
Modified:
jcr/branches/1.14-IMPR/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
---
jcr/branches/1.14-IMPR/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-01-13
15:27:10 UTC (rev 3808)
+++
jcr/branches/1.14-IMPR/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-01-14
08:39:54 UTC (rev 3809)
@@ -49,6 +49,7 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jcr.ItemNotFoundException;
import javax.jcr.RepositoryException;
@@ -440,6 +441,25 @@
*/
synchronized void update(final Collection remove, final Collection add) throws
IOException
{
+ if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
+ {
+ doUpdateRW(remove, add);
+ }
+ else
+ {
+ doUpdateRO(remove, add);
+ }
+ }
+
+ /**
+ * For investigation purposes only
+ *
+ * @param remove
+ * @param add
+ * @throws IOException
+ */
+ private void doUpdateRO(final Collection remove, final Collection add) throws
IOException
+ {
SecurityHelper.doPriviledgedIOExceptionAction(new
PrivilegedExceptionAction<Object>()
{
public Object run() throws Exception
@@ -458,6 +478,70 @@
}
}
+ try
+ {
+ long transactionId = nextTransactionId++;
+ new Start(transactionId).execute(MultiIndex.this);
+
+ for (Iterator it = remove.iterator(); it.hasNext();)
+ {
+ new DeleteNode(transactionId,
(String)it.next()).execute(MultiIndex.this);
+ }
+ for (Iterator it = add.iterator(); it.hasNext();)
+ {
+ Document doc = (Document)it.next();
+ if (doc != null)
+ {
+ new AddNode(transactionId, doc).execute(MultiIndex.this);
+ // reset volatile index if needed
+ if (volatileIndex.getRamSizeInBytes() >=
handler.getMaxVolatileIndexSize())
+ {
+ // not to get out of memory
+ resetVolatileIndex();
+ }
+ }
+ }
+ new Commit(transactionId).execute(MultiIndex.this);
+ }
+ finally
+ {
+ synchronized (updateMonitor)
+ {
+ releaseMultiReader();
+ }
+ }
+ return null;
+ }
+ });
+ }
+
+ /**
+ * For investigation purposes only
+ *
+ * @param remove
+ * @param add
+ * @throws IOException
+ */
+ private void doUpdateRW(final Collection remove, final Collection add) throws
IOException
+ {
+ SecurityHelper.doPriviledgedIOExceptionAction(new
PrivilegedExceptionAction<Object>()
+ {
+ public Object run() throws Exception
+ {
+ // make sure a reader is available during long updates
+ if (add.size() > handler.getBufferSize())
+ {
+ try
+ {
+ getIndexReader().release();
+ }
+ catch (IOException e)
+ {
+ // do not fail if an exception is thrown here
+ log.warn("unable to prepare index reader " + "for
queries during update", e);
+ }
+ }
+
synchronized (updateMonitor)
{
//updateInProgress = true;
@@ -2318,7 +2402,8 @@
// if the document cannot be deleted from the volatile index
// delete it from one of the persistent indexes.
int num = index.volatileIndex.removeDocument(idTerm);
- if (num == 0)
+ // Skip modifying persisted indexes in case of RO.
+ if (num == 0 && index.modeHandler.getMode() ==
IndexerIoMode.READ_WRITE)
{
for (int i = index.indexes.size() - 1; i >= 0; i--)
{
@@ -2625,7 +2710,12 @@
{
synchronized (updateMonitor)
{
+ resetVolatileIndex();
updateMonitor.notifyAll();
+ // Coordinator set cluster-wide updateInProgress only in case of
persistent flush, which
+ // invokes volatile reset. So if RO cluster node received this
notification, it means that
+ // coordinator flushed volatile.
+
releaseMultiReader();
}
}