[exo-jcr-commits] exo-jcr SVN: r3937 - in jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl: storage/jdbc/indexing and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Feb 7 16:21:49 EST 2011
Author: tolusha
Date: 2011-02-07 16:21:49 -0500 (Mon, 07 Feb 2011)
New Revision: 3937
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jdbc/indexing/JdbcNodeDataIndexingIterator.java
Log:
EXOJCR-1104: RDBMS reindexing is multithreaded
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-02-07 18:54:11 UTC (rev 3936)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-02-07 21:21:49 UTC (rev 3937)
@@ -435,10 +435,10 @@
// NodeData rootState = (NodeData) stateMgr.getItemData(rootId);
// check if we have deal with JDBC indexing mechanism
Indexable indexableComponent = (Indexable)handler.getContext().getContainer().getComponent(Indexable.class);
- long count = createIndex(indexingTree.getIndexingRoot(), stateMgr);
- // long count =
- // indexableComponent == null ? createIndex(indexingTree.getIndexingRoot(), stateMgr) : createIndex(
- // indexableComponent, indexingTree.getIndexingRoot());
+ long count =
+ indexableComponent == null ? createIndex(indexingTree.getIndexingRoot(), stateMgr) : createIndex(
+ indexableComponent.getNodeDataIndexingIterator(handler.getReindexingPageSize()),
+ indexingTree.getIndexingRoot());
executeAndLog(new Commit(getTransactionId()));
log.info("Created initial index for {} nodes", new Long(count));
@@ -1533,60 +1533,118 @@
}
/**
- * Creates an index.
- * <code>node</code>.
+ * Create index.
*
- * @param indexableComponent
- * the component which responsible for quick indexing
+ * @param iterator
+ * the NodeDataIndexing iterator
* @param rootNode
- * the current NodeState.
- * @param path
- * the path of the current node.
+ * the root node of the index
+ * @return the total amount of indexed nodes
+ * @throws IOException
+ * if an error occurs while writing to the index.
+ * @throws RepositoryException
+ * if any other error occurs
+ */
+ private long createIndex(NodeDataIndexingIterator iterator, NodeData rootNode) throws IOException,
+ RepositoryException
+ {
+ MultithreadedIndexing indexing = new MultithreadedIndexing(iterator, rootNode);
+ return indexing.launch(false);
+ }
+
+ /**
+ * Create index.
+ *
+ * @param iterator
+ * the NodeDataIndexing iterator
+ * @param rootNode
+ * the root node of the index
* @param count
* the number of nodes already indexed.
- * @return the number of nodes indexed so far.
* @throws IOException
* if an error occurs while writing to the index.
- * @throws ItemStateException
- * if an node state cannot be found.
* @throws RepositoryException
* if any other error occurs
+ * @throws InterruptedException
+ * if the task has been interrupted
*/
- private long createIndex(Indexable indexableComponent, NodeData rootNode, long count) throws IOException,
- RepositoryException
+ private void createIndex(final NodeDataIndexingIterator iterator, NodeData rootNode, final AtomicLong count)
+ throws RepositoryException, InterruptedException, IOException
{
- NodeDataIndexingIterator iterator =
- indexableComponent.getNodeDataIndexingIterator(handler.getReindexingPageSize());
-
- while (iterator.hasNext())
+ for (NodeDataIndexing node : iterator.next())
{
- for (NodeDataIndexing node : iterator.next())
+ if (stopped)
{
- if (indexingTree.isExcluded(node))
- {
- continue;
- }
+ throw new InterruptedException();
+ }
- if (!node.getQPath().isDescendantOf(rootNode.getQPath()) && !node.getQPath().equals(rootNode.getQPath()))
- {
- continue;
- }
+ if (indexingTree.isExcluded(node))
+ {
+ continue;
+ }
- executeAndLog(new AddNode(getTransactionId(), node, true));
+ if (!node.getQPath().isDescendantOf(rootNode.getQPath()) && !node.getQPath().equals(rootNode.getQPath()))
+ {
+ continue;
+ }
- if (++count % 100 == 0)
+ executeAndLog(new AddNode(getTransactionId(), node, true));
+ if (count.incrementAndGet() % 1000 == 0)
+ {
+ log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count.get()));
+ }
+
+ synchronized (this)
+ {
+ if (count.get() % 10 == 0)
{
- log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count));
- }
- if (count % 10 == 0)
- {
checkIndexingQueue(true);
}
checkVolatileCommit();
}
}
+ }
- return count;
+ /**
+ * Creates an index.
+ *
+ * @param tasks
+ * the queue of existing indexing tasks
+ * @param rootNode
+ * the root node of the index
+ * @param iterator
+ * the NodeDataIndexing iterator
+ * @param count
+ * the number of nodes already indexed.
+ * @throws IOException
+ * if an error occurs while writing to the index.
+ * @throws ItemStateException
+ * if a node state cannot be found.
+ * @throws RepositoryException
+ * if any other error occurs
+ * @throws InterruptedException
+ * if thread was interrupted
+ */
+ private void createIndex(final Queue<Callable<Void>> tasks, final NodeDataIndexingIterator iterator,
+ final NodeData rootNode, final AtomicLong count) throws IOException, RepositoryException, InterruptedException
+ {
+ while (iterator.hasNext())
+ {
+ Callable<Void> task = new Callable<Void>()
+ {
+ public Void call() throws Exception
+ {
+ createIndex(iterator, rootNode, count);
+ return null;
+ }
+ };
+
+ if (!tasks.offer(task))
+ {
+ // All threads have tasks to do so we do it ourselves
+ createIndex(iterator, rootNode, count);
+ }
+ }
}
/**
@@ -3005,7 +3063,8 @@
};
/**
- * Default constructor
+ * MultithreadedIndexing constructor.
+ *
* @param node
* the current NodeState.
* @param stateMgr
@@ -3024,6 +3083,26 @@
}
/**
+ * MultithreadedIndexing constructor.
+ *
+ * @param rootNode
+ * the root node of the index
+ * @param iterator
+ * NodeDataIndexingIterator
+ */
+ public MultithreadedIndexing(final NodeDataIndexingIterator iterator, final NodeData rootNode)
+ {
+ tasks.offer(new Callable<Void>()
+ {
+ public Void call() throws Exception
+ {
+ createIndex(tasks, iterator, rootNode, count);
+ return null;
+ }
+ });
+ }
+
+ /**
* Launches the indexing
* @param asynchronous indicates whether or not the current thread needs to wait until the
* end of the indexing
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jdbc/indexing/JdbcNodeDataIndexingIterator.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jdbc/indexing/JdbcNodeDataIndexingIterator.java 2011-02-07 18:54:11 UTC (rev 3936)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jdbc/indexing/JdbcNodeDataIndexingIterator.java 2011-02-07 21:21:49 UTC (rev 3937)
@@ -25,7 +25,10 @@
import org.exoplatform.services.log.ExoLogger;
import org.exoplatform.services.log.Log;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jcr.RepositoryException;
@@ -55,17 +58,17 @@
/**
* The current offset in database.
*/
- private int offset = 0;
+ private AtomicInteger offset = new AtomicInteger(0);
/**
- * Logger.
+ * Indicates if not all records have been read from database.
*/
- protected static final Log LOG = ExoLogger.getLogger("exo.jcr.component.core.JdbcIndexingDataIterator");
+ private AtomicBoolean hasNext = new AtomicBoolean(true);
/**
- * The list of nodes to return in next() method.
+ * Logger.
*/
- private List<NodeDataIndexing> current;
+ protected static final Log LOG = ExoLogger.getLogger("exo.jcr.component.core.JdbcIndexingDataIterator");
/**
* Constructor JdbcIndexingDataIterator.
@@ -75,41 +78,24 @@
{
this.connFactory = connFactory;
this.pageSize = pageSize;
- this.current = readNext();
}
/**
* {@inheritDoc}
*/
- public boolean hasNext()
- {
- return this.current.size() != 0;
- }
-
- /**
- * {@inheritDoc}
- */
public List<NodeDataIndexing> next() throws RepositoryException
{
- List<NodeDataIndexing> next = this.current;
- this.current = readNext();
+ if (!hasNext())
+ {
+ // avoid unnecessary request to database
+ return new ArrayList<NodeDataIndexing>();
+ }
- return next;
- }
-
- /**
- * Read nodes from database.
- *
- * @return List<NodeDataIndexing>
- * @throws RepositoryException
- */
- private List<NodeDataIndexing> readNext() throws RepositoryException
- {
JDBCStorageConnection conn = (JDBCStorageConnection)connFactory.openConnection();
try
{
- List<NodeDataIndexing> result = conn.getNodesAndProperties(offset, pageSize);
- offset += pageSize;
+ List<NodeDataIndexing> result = conn.getNodesAndProperties(offset.getAndAdd(pageSize), pageSize);
+ hasNext.compareAndSet(true, result.size() == pageSize);
return result;
}
@@ -118,5 +104,13 @@
conn.close();
}
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public boolean hasNext()
+ {
+ return hasNext.get();
+ }
}
More information about the exo-jcr-commits
mailing list