Author: tolusha
Date: 2011-02-07 12:56:22 -0500 (Mon, 07 Feb 2011)
New Revision: 3935
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
Log:
EXOJCR-1184: Improve the re-indexing mechanism to take advantage of multi-cores
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-02-07
16:24:25 UTC (rev 3934)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-02-07
17:56:22 UTC (rev 3935)
@@ -49,9 +49,16 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jcr.ItemNotFoundException;
import javax.jcr.RepositoryException;
@@ -220,6 +227,11 @@
private boolean reindexing = false;
/**
+ * Flag indicating whether the index is stopped.
+ */
+ private volatile boolean stopped;
+
+ /**
* The index format version of this multi index.
*/
private final IndexFormatVersion version;
@@ -333,6 +345,15 @@
setReadWrite();
}
this.indexNames.setMultiIndex(this);
+ // Add a hook that will stop the threads if they are still running
+ Runtime.getRuntime().addShutdownHook(new Thread()
+ {
+ @Override
+ public void run()
+ {
+ stopped = true;
+ }
+ });
}
/**
@@ -400,7 +421,6 @@
reindexing = true;
try
{
- long count = 0;
// traverse and index workspace
executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
@@ -408,14 +428,12 @@
// check if we have deal with JDBC indexing mechanism
Indexable indexableComponent =
(Indexable)handler.getContext().getContainer().getComponent(Indexable.class);
- if (indexableComponent == null)
- {
- count = createIndex(indexingTree.getIndexingRoot(), stateMgr, count);
- }
- else
- {
- count = createIndex(indexableComponent, indexingTree.getIndexingRoot(),
count);
- }
+ // long count =
+ // indexableComponent == null ?
createIndex(indexingTree.getIndexingRoot(), stateMgr) : createIndex(
+ // indexableComponent, indexingTree.getIndexingRoot());
+
+ long count = createIndex(indexingTree.getIndexingRoot(), stateMgr);
+
executeAndLog(new Commit(getTransactionId()));
log.info("Created initial index for {} nodes", new Long(count));
releaseMultiReader();
@@ -1016,6 +1034,7 @@
}
}
}
+ this.stopped = true;
}
/**
@@ -1393,65 +1412,118 @@
}
}
+   /**
+    * Indexes the given node and all of its descendants and waits until the
+    * indexing is over.
+    *
+    * @param node
+    *          the node to start the indexing from.
+    * @param stateMgr
+    *          the shared item state manager.
+    * @return the value returned by the synchronous launch of the indexing, i.e.
+    *         the total amount of nodes that have been indexed.
+    * @throws IOException
+    *           if an error occurs while writing to the index.
+    * @throws RepositoryException
+    *           if any other error occurs
+    */
+   private long createIndex(NodeData node, ItemDataConsumer stateMgr) throws IOException, RepositoryException
+   {
+      // "false" means synchronous: the call blocks until all the indexing threads are done
+      return new MultithreadedIndexing(node, stateMgr).launch(false);
+   }
+
/**
* Recursively creates an index starting with the NodeState
* <code>node</code>.
*
+ * @param tasks
+ * the queue of existing indexing tasks
* @param node
* the current NodeState.
- * @param path
- * the path of the current node.
* @param stateMgr
- * the shared item state manager.
+ * the shared item state manager.
* @param count
* the number of nodes already indexed.
- * @return the number of nodes indexed so far.
* @throws IOException
* if an error occurs while writing to the index.
* @throws ItemStateException
* if an node state cannot be found.
* @throws RepositoryException
* if any other error occurs
+ * @throws InterruptedException
+ * if the task has been interrupted
*/
- private long createIndex(NodeData node, ItemDataConsumer stateMgr, long count) throws
IOException,
- RepositoryException
+ private void createIndex(final Queue<Callable<Void>> tasks, final NodeData
node, final ItemDataConsumer stateMgr,
+ final AtomicLong count) throws IOException, RepositoryException,
InterruptedException
{
- // NodeId id = node.getNodeId();
+ if (stopped) // give up as soon as the index has been flagged as stopped
+ {
+ throw new InterruptedException();
+ }
if (indexingTree.isExcluded(node))
{
- return count;
+ return;
}
- executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
- if (++count % 100 == 0)
+ executeAndLog(new AddNode(getTransactionId(), node.getIdentifier(), true));
+ if (count.incrementAndGet() % 1000 == 0)
{
+ log.info("indexing... {} ({})", node.getQPath().getAsString(), new
Long(count.get()));
+ }
- log.info("indexing... {} ({})", node.getQPath().getAsString(), new
Long(count));
- }
- if (count % 10 == 0)
+ synchronized (this) // the queue check and the volatile commit are done by one thread at a time
{
- checkIndexingQueue(true);
+ if (count.get() % 10 == 0)
+ {
+ checkIndexingQueue(true);
+ }
+ checkVolatileCommit();
}
- checkVolatileCommit();
+
List<NodeData> children = stateMgr.getChildNodesData(node);
- for (NodeData nodeData : children)
+ for (final NodeData nodeData : children)
{
- NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
- if (childState == null)
+ Callable<Void> task = new Callable<Void>()
{
- handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
- new ItemNotFoundException("Child not found "), handler,
nodeData.getQPath(), node, nodeData);
- }
-
- if (nodeData != null)
+ public Void call() throws Exception
+ {
+ createIndex(tasks, node, stateMgr, count, nodeData);
+ return null;
+ }
+ };
+ if (!tasks.offer(task))
{
- count = createIndex(nodeData, stateMgr, count);
+ // The queue did not accept the task, so the child is indexed by the current thread
+ createIndex(tasks, node, stateMgr, count, nodeData);
}
}
+ }
- return count;
+   /**
+    * Checks that a child node can be loaded from the state manager, reports it
+    * to the workspace inconsistency handler if it cannot, and then indexes the
+    * child and its descendants.
+    *
+    * @param tasks
+    *          the queue of existing indexing tasks
+    * @param node
+    *          the parent of the node to index.
+    * @param stateMgr
+    *          the shared item state manager.
+    * @param count
+    *          the number of nodes already indexed.
+    * @param nodeData
+    *          the child node to index.
+    * @throws IOException
+    *           if an error occurs while writing to the index.
+    * @throws RepositoryException
+    *           if any other error occurs
+    * @throws InterruptedException
+    *           if the task has been interrupted
+    */
+   private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node,
+      final ItemDataConsumer stateMgr, final AtomicLong count, final NodeData nodeData)
+      throws RepositoryException, IOException, InterruptedException
+   {
+      final NodeData loadedChild = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
+      final boolean childIsMissing = loadedChild == null;
+      if (childIsMissing)
+      {
+         // The parent lists a child that the state manager cannot load
+         final ItemNotFoundException cause = new ItemNotFoundException("Child not found ");
+         handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(cause, handler,
+            nodeData.getQPath(), node, nodeData);
+      }
+
+      // The child is indexed from the data listed by its parent, whatever the outcome of the check
+      createIndex(tasks, nodeData, stateMgr, count);
    }
/**
@@ -1474,8 +1546,8 @@
* @throws RepositoryException
* if any other error occurs
*/
- private long createIndex(Indexable indexableComponent, NodeData rootNode, long count)
- throws IOException, RepositoryException
+ private long createIndex(Indexable indexableComponent, NodeData rootNode, long count)
throws IOException,
+ RepositoryException
{
NodeDataIndexingIterator iterator =
indexableComponent.getNodeDataIndexingIterator(handler.getReindexingPageSize());
@@ -2017,8 +2089,8 @@
/**
* The maximum length of a AddNode String.
*/
- private static final int ENTRY_LENGTH =
- Long.toString(Long.MAX_VALUE).length() + Action.ADD_NODE.length() +
Constants.UUID_FORMATTED_LENGTH + 2;
+ private static final int ENTRY_LENGTH = Long.toString(Long.MAX_VALUE).length() +
Action.ADD_NODE.length()
+ + Constants.UUID_FORMATTED_LENGTH + 2;
/**
* The uuid of the node to add.
@@ -2031,6 +2103,8 @@
*/
private Document doc;
+ /**
+ * Indicates whether the document has to be added to the volatile index
+ * inside a block synchronized on the index.
+ */
+ private boolean synch;
+
/**
* The node to add.
*/
@@ -2046,8 +2120,22 @@
*/
AddNode(long transactionId, String uuid)
{
+ this(transactionId, uuid, false);
+ }
+
+ /**
+ * Creates a new AddNode action.
+ *
+ * @param transactionId
+ * the id of the transaction that executes this action.
+ * @param uuid
+ * the uuid of the node to add.
+ * @param synch
+ * <code>true</code> if the document has to be added to the volatile
+ * index inside a block synchronized on the index, <code>false</code>
+ * to add it without that synchronization.
+ */
+ AddNode(long transactionId, String uuid, boolean synch)
+ {
super(transactionId, Action.TYPE_ADD_NODE);
this.uuid = uuid;
+ this.synch = synch;
}
/**
@@ -2126,9 +2214,20 @@
log.debug(e.getMessage());
}
}
+
if (doc != null)
{
- index.volatileIndex.addDocuments(new Document[]{doc});
+ if (synch)
+ {
+ synchronized (index)
+ {
+ index.volatileIndex.addDocuments(new Document[]{doc});
+ }
+ }
+ else
+ {
+ index.volatileIndex.addDocuments(new Document[]{doc});
+ }
}
}
@@ -2385,8 +2484,8 @@
/**
* The maximum length of a DeleteNode String.
*/
- private static final int ENTRY_LENGTH =
- Long.toString(Long.MAX_VALUE).length() + Action.DELETE_NODE.length() +
Constants.UUID_FORMATTED_LENGTH + 2;
+ private static final int ENTRY_LENGTH = Long.toString(Long.MAX_VALUE).length() +
Action.DELETE_NODE.length()
+ + Constants.UUID_FORMATTED_LENGTH + 2;
/**
* The uuid of the node to remove.
@@ -2631,7 +2730,7 @@
{
// try to stop merger in safe way
merger.dispose();
-
+
flushTask.cancel();
FLUSH_TIMER.purge();
this.redoLog = null;
@@ -2769,4 +2868,203 @@
}
}
}
+
+   /**
+    * This class is used to index a node and its descendant nodes with several threads.
+    * <p>
+    * One worker thread per available processor is started. The workers share a bounded
+    * queue of indexing tasks that is seeded with the task indexing the start node; a
+    * running task may offer new tasks to the queue. A worker terminates when a failure
+    * has been recorded, when it is interrupted, or when the queue is empty and no task
+    * is being executed anymore.
+    */
+   private class MultithreadedIndexing
+   {
+      /**
+       * Holds the first failure met by an indexing thread, <code>null</code> if no
+       * failure has occurred
+       */
+      private final AtomicReference<Exception> exception = new AtomicReference<Exception>();
+
+      /**
+       * The total amount of threads used for the indexing
+       */
+      private final int nThreads = Runtime.getRuntime().availableProcessors();
+
+      /**
+       * The {@link CountDownLatch} used to notify that the indexing is over. It is
+       * counted down once by each indexing thread when the thread terminates.
+       */
+      private final CountDownLatch endSignal = new CountDownLatch(nThreads);
+
+      /**
+       * The total amount of threads currently executing a task. Its monitor is also
+       * used to make the idle threads wait for new tasks.
+       */
+      private final AtomicInteger runningThreads = new AtomicInteger();
+
+      /**
+       * The total amount of nodes already indexed
+       */
+      private final AtomicLong count = new AtomicLong();
+
+      /**
+       * The queue of indexing tasks left to do, bounded to one slot per thread
+       */
+      private final Queue<Callable<Void>> tasks = new LinkedBlockingQueue<Callable<Void>>(nThreads)
+      {
+         private static final long serialVersionUID = 1L;
+
+         /**
+          * Takes the next task if any and, in the same critical section, registers
+          * the caller as a running thread.
+          */
+         @Override
+         public Callable<Void> poll()
+         {
+            synchronized (runningThreads)
+            {
+               Callable<Void> task = super.poll();
+               if (task != null)
+               {
+                  runningThreads.incrementAndGet();
+               }
+               return task;
+            }
+         }
+
+         /**
+          * Adds the task if there is a free slot and wakes up the idle threads.
+          */
+         @Override
+         public boolean offer(Callable<Void> o)
+         {
+            if (!super.offer(o))
+            {
+               return false;
+            }
+            synchronized (runningThreads)
+            {
+               runningThreads.notifyAll();
+            }
+            return true;
+         }
+      };
+
+      /**
+       * The task that all the indexing threads have to execute
+       */
+      private final Runnable indexingTask = new Runnable()
+      {
+         public void run()
+         {
+            try
+            {
+               while (exception.get() == null)
+               {
+                  Callable<Void> task;
+                  while (exception.get() == null && (task = tasks.poll()) != null)
+                  {
+                     try
+                     {
+                        task.call();
+                     }
+                     catch (InterruptedException e)
+                     {
+                        // Restore the flag and stop this worker: looping again would only
+                        // make every following wait() fail immediately (busy loop)
+                        Thread.currentThread().interrupt();
+                        return;
+                     }
+                     catch (Exception e)
+                     {
+                        // Keep the first failure, the following ones are usually side effects
+                        exception.compareAndSet(null, e);
+                     }
+                     catch (Error e)
+                     {
+                        // Make the failure visible to launch() before the thread dies
+                        exception.compareAndSet(null, new RuntimeException("Fatal error while indexing", e));
+                        throw e;
+                     }
+                     finally
+                     {
+                        synchronized (runningThreads)
+                        {
+                           runningThreads.decrementAndGet();
+                           runningThreads.notifyAll();
+                        }
+                     }
+                  }
+                  synchronized (runningThreads)
+                  {
+                     if (exception.get() != null)
+                     {
+                        break;
+                     }
+                     if (!tasks.isEmpty())
+                     {
+                        // A task has been offered since the last poll, go and take it
+                        continue;
+                     }
+                     if (runningThreads.get() <= 0)
+                     {
+                        // Nothing queued and nobody able to queue anything: we are done
+                        break;
+                     }
+                     try
+                     {
+                        runningThreads.wait();
+                     }
+                     catch (InterruptedException e)
+                     {
+                        Thread.currentThread().interrupt();
+                        return;
+                     }
+                  }
+               }
+            }
+            finally
+            {
+               // Always release the latch, otherwise launch(false) could wait forever
+               endSignal.countDown();
+            }
+         }
+      };
+
+      /**
+       * Default constructor. Queues the task that indexes the given node.
+       *
+       * @param node
+       *          the node to start the indexing from.
+       * @param stateMgr
+       *          the shared item state manager.
+       */
+      public MultithreadedIndexing(final NodeData node, final ItemDataConsumer stateMgr)
+      {
+         tasks.offer(new Callable<Void>()
+         {
+            public Void call() throws Exception
+            {
+               createIndex(tasks, node, stateMgr, count);
+               return null;
+            }
+         });
+      }
+
+      /**
+       * Launches the indexing
+       *
+       * @param asynchronous
+       *          indicates whether or not the current thread needs to wait until the
+       *          end of the indexing
+       * @return the total amount of nodes that have been indexed. <code>-1</code> in
+       *         case of an asynchronous indexing or if the current thread has been
+       *         interrupted while waiting for the end of the indexing
+       * @throws IOException
+       *           if an error occurs while writing to the index.
+       * @throws RepositoryException
+       *           if any other error occurs
+       */
+      public long launch(boolean asynchronous) throws IOException, RepositoryException
+      {
+         startThreads();
+         if (asynchronous)
+         {
+            return -1L;
+         }
+         try
+         {
+            endSignal.await();
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+            return -1L;
+         }
+         Exception failure = exception.get();
+         if (failure == null)
+         {
+            return count.get();
+         }
+         if (failure instanceof IOException)
+         {
+            throw (IOException)failure;
+         }
+         if (failure instanceof RepositoryException)
+         {
+            throw (RepositoryException)failure;
+         }
+         throw new RuntimeException("Error while indexing", failure);
+      }
+
+      /**
+       * Starts all the indexing threads
+       */
+      private void startThreads()
+      {
+         for (int i = 0; i < nThreads; i++)
+         {
+            (new Thread(indexingTask, "Indexing Thread #" + (i + 1))).start();
+         }
+      }
+   }
}