[exo-jcr-commits] exo-jcr SVN: r4424 - in jcr/branches/1.12.x: patch/1.12.8-GA and 2 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue May 24 07:00:53 EDT 2011
Author: paristote
Date: 2011-05-24 07:00:53 -0400 (Tue, 24 May 2011)
New Revision: 4424
Added:
jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/
jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/readme.txt
Removed:
jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/
Modified:
jcr/branches/1.12.x/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
Log:
JCR-1572
What is the problem to fix?
* Tested EPP 5.0.1 with 19,000 users created using CRaSH and its advanced `addusers` command, which you can find here: https://github.com/exoplatform/gatein-stuff. The JCR_SITEM table contains 4.3 million rows and JCR_SVALUE contains 3.2 million rows. In this case, re-indexing my content, which contains 1,216,000 nodes, takes 3 h 40 min on my dual-core laptop. This patch proposes a multi-threaded re-indexing mechanism in order to take advantage of multi-core processors, which are now common; with the patch, re-indexing takes 2 h 20 min on the same laptop, even though the main bottleneck is the database.
Please review the patch and, if it is OK, ask QA to load a large table and re-index it with and without the patch to measure the gain in their environment.
How is the problem fixed?
* Use several threads for the JCR indexing operation.
Modified: jcr/branches/1.12.x/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
--- jcr/branches/1.12.x/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-05-24 10:44:25 UTC (rev 4423)
+++ jcr/branches/1.12.x/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-05-24 11:00:53 UTC (rev 4424)
@@ -43,9 +43,16 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jcr.ItemNotFoundException;
import javax.jcr.RepositoryException;
@@ -214,6 +221,11 @@
private boolean reindexing = false;
/**
+ * Flag indicating whether the index is stopped.
+ */
+ private volatile boolean stopped;
+
+ /**
* The index format version of this multi index.
*/
private final IndexFormatVersion version;
@@ -314,6 +326,14 @@
setReadWrite();
}
this.indexNames.setMultiIndex(this);
+ // Add a hook that will stop the threads if they are still running
+ Runtime.getRuntime().addShutdownHook(new Thread()
+ {
+ public void run()
+ {
+ stopped = true;
+ }
+ });
}
/**
@@ -374,11 +394,10 @@
reindexing = true;
try
{
- long count = 0;
// traverse and index workspace
executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
// NodeData rootState = (NodeData) stateMgr.getItemData(rootId);
- count = createIndex(indexingTree.getIndexingRoot(), stateMgr, count);
+ long count = createIndex(indexingTree.getIndexingRoot(), stateMgr);
executeAndLog(new Commit(getTransactionId()));
log.info("Created initial index for {} nodes", new Long(count));
releaseMultiReader();
@@ -958,6 +977,7 @@
}
}
}
+ this.stopped = true;
}
/**
@@ -1304,44 +1324,118 @@
* @throws RepositoryException
* if any other error occurs
*/
- private long createIndex(NodeData node, ItemDataConsumer stateMgr, long count) throws IOException,
+ private long createIndex(NodeData node, ItemDataConsumer stateMgr) throws IOException,
RepositoryException
{
+ // Indexes synchronously: launch(false) blocks until every indexing thread has finished and
+ // returns the number of indexed nodes.
+ // NOTE(review): launch(false) returns -1 if this thread is interrupted while waiting, and that
+ // value is then returned here as the node count.
+ MultithreadedIndexing indexing = new MultithreadedIndexing(node, stateMgr);
+ return indexing.launch(false);
+ }
+
+ /**
+ * Adds <code>node</code> to the index and schedules the indexing of its children.
+ * Each child is wrapped in a {@link Callable} and offered to <code>tasks</code>;
+ * when <code>tasks.offer</code> rejects it, the calling thread indexes that child inline.
+ *
+ * @param tasks
+ * the queue of pending indexing tasks shared by the indexing threads
+ * @param node
+ * the current NodeState.
+ * @param stateMgr
+ * the shared item state manager.
+ * @param count
+ * shared counter of the nodes added to the index so far; incremented once per added node.
+ * @throws IOException
+ * if an error occurs while writing to the index.
+ * @throws RepositoryException
+ * if any other error occurs
+ * @throws InterruptedException
+ * if the <code>stopped</code> flag is set when this method is entered
+ */
+ private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node, final ItemDataConsumer stateMgr, final AtomicLong count) throws IOException,
+ RepositoryException, InterruptedException
+ {
+ // Abort once `stopped` has been set (for instance by the JVM shutdown hook).
+ if (stopped)
+ {
+ throw new InterruptedException();
+ }
// NodeId id = node.getNodeId();
if (indexingTree.isExcluded(node))
{
- return count;
+ return;
}
- executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
- if (++count % 100 == 0)
+ // `true` makes AddNode add the document to index.volatileIndex inside synchronized (index),
+ // because several indexing threads may reach this call concurrently.
+ executeAndLog(new AddNode(getTransactionId(), node.getIdentifier(), true));
+ // NOTE(review): count.get() is re-read in the log call below, so under concurrency the logged
+ // value may differ from the one returned by incrementAndGet(); logging that value would be exact.
+ if (count.incrementAndGet() % 1000 == 0)
{
-
- log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count));
+ log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count.get()));
}
- if (count % 10 == 0)
+ // Serializes the indexing-queue check and the volatile-commit check across indexing threads.
+ // NOTE(review): count.get() is read here after other threads may have incremented it, so
+ // multiples of 10 can be skipped and checkIndexingQueue(true) may run less often than
+ // once per 10 nodes.
+ synchronized (this)
{
- checkIndexingQueue(true);
+ if (count.get() % 10 == 0)
+ {
+ checkIndexingQueue(true);
+ }
+ checkVolatileCommit();
}
- checkVolatileCommit();
List<NodeData> children = stateMgr.getChildNodesData(node);
- for (NodeData nodeData : children)
+ for (final NodeData nodeData : children)
{
-
- NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
- if (childState == null)
+ // Each child becomes a task that any indexing thread can take from the shared queue.
+ Callable<Void> task = new Callable<Void>()
{
- handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
- new ItemNotFoundException("Child not found "), handler, nodeData.getQPath(), node, nodeData);
- }
+ public Void call() throws Exception
+ {
+ createIndex(tasks, node, stateMgr, count, nodeData);
+ return null;
+ }
- if (nodeData != null)
+ };
+ if (!tasks.offer(task))
{
- count = createIndex(nodeData, stateMgr, count);
+ // The queue rejected the task (e.g. it is full), so index this child in the current thread
+ createIndex(tasks, node, stateMgr, count, nodeData);
}
}
+ }
+
+ /**
+ * Indexes one child of <code>node</code> and, recursively, its descendants.
+ * The child is first re-read from <code>stateMgr</code> by identifier; if it cannot be
+ * found, the workspace inconsistency handler is notified.
+ *
+ * @param tasks
+ * the queue of pending indexing tasks shared by the indexing threads
+ * @param node
+ * the parent NodeState of <code>nodeData</code>.
+ * @param stateMgr
+ * the shared item state manager.
+ * @param count
+ * shared counter of the nodes added to the index so far.
+ * @param nodeData
+ * the child node data to index.
+ * @throws IOException
+ * if an error occurs while writing to the index.
+ * @throws RepositoryException
+ * if any other error occurs
+ * @throws InterruptedException
+ * if the <code>stopped</code> flag is set while the subtree is being indexed
+ */
+ private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node,
+ final ItemDataConsumer stateMgr, final AtomicLong count, final NodeData nodeData)
+ throws RepositoryException, IOException, InterruptedException
+ {
+ NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
+ if (childState == null)
+ {
+ handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
+ new ItemNotFoundException("Child not found "), handler, nodeData.getQPath(), node, nodeData);
+ }
- return count;
+ // NOTE(review): nodeData was already dereferenced above, so this test is always true. It was
+ // carried over from the removed recursive code and was probably meant to be
+ // `childState != null`. As written, a child reported missing is still indexed from nodeData
+ // unless handleMissingChildNode throws.
+ if (nodeData != null)
+ {
+ createIndex(tasks, nodeData, stateMgr, count);
+ }
}
/**
@@ -1857,6 +1951,8 @@
*/
private Document doc;
+ private boolean synch;
+
/**
* Creates a new AddNode action.
*
@@ -1867,8 +1963,22 @@
*/
AddNode(long transactionId, String uuid)
{
+ this(transactionId, uuid, false);
+ }
+
+ /**
+ * Creates a new AddNode action.
+ *
+ * @param transactionId
+ * the id of the transaction that executes this action.
+ * @param uuid
+ * the uuid of the node to add.
+ * @param synch
+ * <code>true</code> if the document must be added to <code>index.volatileIndex</code>
+ * while holding the lock of <code>index</code>; the multi-threaded re-indexing passes
+ * <code>true</code> because several threads add nodes concurrently.
+ */
+ AddNode(long transactionId, String uuid, boolean synch)
+ {
super(transactionId, Action.TYPE_ADD_NODE);
this.uuid = uuid;
+ this.synch = synch;
}
/**
@@ -1928,7 +2038,17 @@
}
if (doc != null)
{
- index.volatileIndex.addDocuments(new Document[]{doc});
+ if (synch)
+ {
+ synchronized (index)
+ {
+ index.volatileIndex.addDocuments(new Document[]{doc});
+ }
+ }
+ else
+ {
+ index.volatileIndex.addDocuments(new Document[]{doc});
+ }
}
}
@@ -2540,7 +2660,206 @@
}
}
}
+
+ /**
+ * Indexes a node and all its descendants using one thread per available processor.
+ * Each unit of work is a {@link Callable} taken from a shared bounded queue. A thread stops
+ * when it finds the queue empty and no thread running a task, or as soon as a task has failed.
+ */
+ private class MultithreadedIndexing
+ {
+ /**
+ * Holds the exception thrown by an indexing task, if any. Each failing task calls
+ * <code>set</code>, so when several tasks fail the last one recorded wins.
+ */
+ private final AtomicReference<Exception> exception = new AtomicReference<Exception>();
+
+ /**
+ * The total amount of threads used for the indexing: one per available processor.
+ */
+ private final int nThreads = Runtime.getRuntime().availableProcessors();
+
+ /**
+ * Counted down once by each indexing thread when it leaves its work loop; reaches zero
+ * once all <code>nThreads</code> threads are done.
+ */
+ private final CountDownLatch endSignal = new CountDownLatch(nThreads);
+
+ /**
+ * The number of threads currently executing a task. Also used as the monitor on which
+ * idle threads wait for new tasks or for a running task to finish.
+ */
+ private final AtomicInteger runningThreads = new AtomicInteger();
+
+ /**
+ * The total amount of nodes already indexed
+ */
+ private final AtomicLong count = new AtomicLong();
+
+ /**
+ * The indexing tasks left to do, bounded to <code>nThreads</code> entries. When it is full,
+ * <code>offer</code> returns <code>false</code> and the caller indexes the child itself.
+ */
+ private final Queue<Callable<Void>> tasks = new LinkedBlockingQueue<Callable<Void>>(nThreads)
+ {
+ private static final long serialVersionUID = 1L;
+ // Removing a task and incrementing runningThreads happen under the same lock, so a thread
+ // that has just taken a task is already counted as running when idle threads check the count.
+ @Override
+ public Callable<Void> poll()
+ {
+ Callable<Void> task;
+ synchronized (runningThreads)
+ {
+ if ((task = super.poll()) != null)
+ {
+ runningThreads.incrementAndGet();
+ }
+ }
+ return task;
+ }
+
+ // Wakes the threads waiting on runningThreads whenever a new task has been queued.
+ @Override
+ public boolean offer(Callable<Void> o)
+ {
+ if (super.offer(o))
+ {
+ synchronized (runningThreads)
+ {
+ runningThreads.notifyAll();
+ }
+ return true;
+ }
+ return false;
+ }
+ };
+
+ /**
+ * The work loop executed by every indexing thread.
+ */
+ private final Runnable indexingTask = new Runnable()
+ {
+ public void run()
+ {
+ while (exception.get() == null)
+ {
+ Callable<Void> task;
+ // Drain queued tasks until the queue is empty or a task has failed.
+ while (exception.get() == null && (task = tasks.poll()) != null)
+ {
+ try
+ {
+ task.call();
+ }
+ catch (InterruptedException e)
+ {
+ // NOTE(review): createIndex throws InterruptedException when `stopped` is set, but
+ // it is not recorded in `exception`, so launch(false) later returns count.get() as if
+ // the indexing had completed. Once the interrupt flag is set, runningThreads.wait()
+ // below also throws immediately, so this thread spins instead of waiting.
+ Thread.currentThread().interrupt();
+ }
+ catch (Exception e)
+ {
+ exception.set(e);
+ }
+ finally
+ {
+ synchronized (runningThreads)
+ {
+ runningThreads.decrementAndGet();
+ runningThreads.notifyAll();
+ }
+ }
+ }
+ // Queue empty: wait while another thread is still running a task (it may queue more
+ // work); otherwise leave the loop.
+ // NOTE(review): this thread can also see runningThreads == 0 while a task is queued,
+ // between another thread's decrement in `finally` and its next poll(). No task is lost,
+ // because that thread polls again, but this thread exits early and parallelism drops.
+ synchronized (runningThreads)
+ {
+ if (exception.get() == null && (runningThreads.get() > 0))
+ {
+ try
+ {
+ runningThreads.wait();
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ // NOTE(review): not in a finally block. If task.call() throws an Error (not caught above),
+ // it escapes run() before this line and launch(false) waits on endSignal forever.
+ endSignal.countDown();
+ }
+ };
+
+ /**
+ * Creates the indexing job and queues the task that indexes <code>node</code> and, through
+ * it, its descendants. No thread is started until {@link #launch(boolean)} is called.
+ * @param node
+ * the root NodeState of the indexing.
+ * @param stateMgr
+ * the shared item state manager.
+ */
+ public MultithreadedIndexing(final NodeData node, final ItemDataConsumer stateMgr)
+ {
+ // The queue is still empty here, so this first offer is expected to be accepted.
+ tasks.offer(new Callable<Void>()
+ {
+ public Void call() throws Exception
+ {
+ createIndex(tasks, node, stateMgr, count);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Starts the indexing threads and, unless <code>asynchronous</code> is set, waits for all
+ * of them to finish.
+ * @param asynchronous indicates whether or not the current thread needs to wait until the
+ * end of the indexing
+ * @return the total amount of nodes that have been indexed. <code>-1</code> in case of an
+ * asynchronous indexing, or if the current thread is interrupted while waiting
+ * @throws IOException
+ * if an indexing task failed with an IOException.
+ * @throws RepositoryException
+ * if an indexing task failed with a RepositoryException.
+ * @throws RuntimeException
+ * wrapping any other exception recorded by an indexing task.
+ */
+ public long launch(boolean asynchronous) throws IOException, RepositoryException
+ {
+ startThreads();
+ if (!asynchronous)
+ {
+ try
+ {
+ endSignal.await();
+ if (exception.get() != null)
+ {
+ if (exception.get() instanceof IOException)
+ {
+ throw (IOException)exception.get();
+ }
+ else if (exception.get() instanceof RepositoryException)
+ {
+ throw (RepositoryException)exception.get();
+ }
+ else
+ {
+ throw new RuntimeException("Error while indexing", exception.get());
+ }
+ }
+ return count.get();
+ }
+ catch (InterruptedException e)
+ {
+ // NOTE(review): falls through to `return -1L`, which callers cannot tell apart from
+ // the asynchronous case; the indexing threads keep running.
+ Thread.currentThread().interrupt();
+ }
+ }
+ return -1L;
+ }
+
+ /**
+ * Starts all the indexing threads. New threads are created on every call; they are not pooled.
+ */
+ private void startThreads()
+ {
+ for (int i=0; i < nThreads; i++)
+ {
+ (new Thread(indexingTask, "Indexing Thread #" + (i + 1))).start();
+ }
+ }
+ }
+
/**
* @see org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitorListener#onUpdateInProgressChange(boolean)
*/
Added: jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/readme.txt
===================================================================
--- jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/readme.txt (rev 0)
+++ jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/readme.txt 2011-05-24 11:00:53 UTC (rev 4424)
@@ -0,0 +1,65 @@
+Summary
+
+ Status: Improve the re-indexing mechanism to take advantage of multi-cores
+ CCP Issue: N/A, Product Jira Issue: JCR-1572.
+ Complexity: Medium
+
+The Proposal
+Problem description
+
+What is the problem to fix?
+Tested EPP 5.0.1 with 19,000 users created using CRaSH and its advanced `addusers` command, which you can find here: https://github.com/exoplatform/gatein-stuff. The JCR_SITEM table contains 4.3 million rows and JCR_SVALUE contains 3.2 million rows. In this case, re-indexing my content, which contains 1,216,000 nodes, takes 3 h 40 min on my dual-core laptop. This patch proposes a multi-threaded re-indexing mechanism in order to take advantage of multi-core processors, which are now common; with the patch, re-indexing takes 2 h 20 min on the same laptop, even though the main bottleneck is the database.
+
+Please review the patch and, if it is OK, ask QA to load a large table and re-index it with and without the patch to measure the gain in their environment.
+Fix description
+
+How is the problem fixed?
+
+* Use several threads for the JCR indexing operation.
+
+Patch information:
+Patch files: JCR-1572.patch
+
+Tests to perform
+
+Reproduction test
+* No
+
+Tests performed at DevLevel
+* functional testing in JCR project
+
+Tests performed at QA/Support Level
+*
+
+Documentation changes
+
+Documentation changes:
+* No
+
+Configuration changes
+
+Configuration changes:
+* No
+
+Will previous configuration continue to work?
+* Yes
+
+Risks and impacts
+
+Can this bug fix have any side effects on current client projects?
+* No
+
+Is there a performance risk/cost?
+* No
+
+Validation (PM/Support/QA)
+
+PM Comment
+* PL review: patch validated
+
+Support Comment
+* Support review: patch validated
+
+QA Feedbacks
+*
+
More information about the exo-jcr-commits
mailing list