[exo-jcr-commits] exo-jcr SVN: r3939 - in jcr/branches/1.12.x/patch/1.12.8-GA: JCR-1572 and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Feb 8 03:30:07 EST 2011
Author: tolusha
Date: 2011-02-08 03:30:07 -0500 (Tue, 08 Feb 2011)
New Revision: 3939
Added:
jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/
jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/JCR-1572.patch
Log:
JCR-1572: patch proposed
Added: jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/JCR-1572.patch
===================================================================
--- jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/JCR-1572.patch (rev 0)
+++ jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/JCR-1572.patch 2011-02-08 08:30:07 UTC (rev 3939)
@@ -0,0 +1,462 @@
+Index: exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
+===================================================================
+--- exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java (revision 3789)
++++ exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java (working copy)
+@@ -43,9 +43,16 @@
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
++import java.util.Queue;
+ import java.util.Set;
+ import java.util.Timer;
+ import java.util.TimerTask;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.LinkedBlockingQueue;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.concurrent.atomic.AtomicLong;
++import java.util.concurrent.atomic.AtomicReference;
+
+ import javax.jcr.ItemNotFoundException;
+ import javax.jcr.RepositoryException;
+@@ -214,6 +221,11 @@
+ private boolean reindexing = false;
+
+ /**
++ * Flag indicating whether the index is stopped.
++ */
++ private volatile boolean stopped;
++
++ /**
+ * The index format version of this multi index.
+ */
+ private final IndexFormatVersion version;
+@@ -314,6 +326,14 @@
+ setReadWrite();
+ }
+ this.indexNames.setMultiIndex(this);
++ // Add a hook that will stop the threads if they are still running
++ Runtime.getRuntime().addShutdownHook(new Thread()
++ {
++ public void run()
++ {
++ stopped = true;
++ }
++ });
+ }
+
+ /**
+@@ -374,11 +394,10 @@
+ reindexing = true;
+ try
+ {
+- long count = 0;
+ // traverse and index workspace
+ executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+ // NodeData rootState = (NodeData) stateMgr.getItemData(rootId);
+- count = createIndex(indexingTree.getIndexingRoot(), stateMgr, count);
++ long count = createIndex(indexingTree.getIndexingRoot(), stateMgr);
+ executeAndLog(new Commit(getTransactionId()));
+ log.info("Created initial index for {} nodes", new Long(count));
+ releaseMultiReader();
+@@ -951,6 +970,7 @@
+ }
+ }
+ }
++ this.stopped = true;
+ }
+
+ /**
+@@ -1297,44 +1317,118 @@
+ * @throws RepositoryException
+ * if any other error occurs
+ */
+- private long createIndex(NodeData node, ItemDataConsumer stateMgr, long count) throws IOException,
++ private long createIndex(NodeData node, ItemDataConsumer stateMgr) throws IOException,
+ RepositoryException
+ {
++ MultithreadedIndexing indexing = new MultithreadedIndexing(node, stateMgr);
++ return indexing.launch(false);
++ }
++
++ /**
++ * Recursively creates an index starting with the NodeState
++ * <code>node</code>.
++ *
++ * @param tasks
++ * the queue of existing indexing tasks
++ * @param node
++ * the current NodeState.
++ * @param stateMgr
++ * the shared item state manager.
++ * @param count
++ * the shared counter of nodes already indexed.
++ * @throws IOException
++ * if an error occurs while writing to the index.
++ * @throws ItemStateException
++ * if a node state cannot be found.
++ * @throws RepositoryException
++ * if any other error occurs
++ * @throws InterruptedException
++ * if the index has been stopped while indexing
++ */
++ private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node, final ItemDataConsumer stateMgr, final AtomicLong count) throws IOException,
+ RepositoryException, InterruptedException
+ {
++ if (stopped)
++ {
++ throw new InterruptedException();
++ }
+ // NodeId id = node.getNodeId();
+
+ if (indexingTree.isExcluded(node))
+ {
+- return count;
++ return;
+ }
+- executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
+- if (++count % 100 == 0)
++ executeAndLog(new AddNode(getTransactionId(), node.getIdentifier(), true));
++ if (count.incrementAndGet() % 1000 == 0)
+ {
+-
+- log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count));
++ log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count.get()));
+ }
+- if (count % 10 == 0)
++ synchronized (this)
+ {
+- checkIndexingQueue(true);
++ if (count.get() % 10 == 0)
++ {
++ checkIndexingQueue(true);
++ }
++ checkVolatileCommit();
+ }
+- checkVolatileCommit();
+ List<NodeData> children = stateMgr.getChildNodesData(node);
+- for (NodeData nodeData : children)
++ for (final NodeData nodeData : children)
+ {
+-
+- NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
+- if (childState == null)
++ Callable<Void> task = new Callable<Void>()
+ {
+- handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
+- new ItemNotFoundException("Child not found "), handler, nodeData.getQPath(), node, nodeData);
+- }
++ public Void call() throws Exception
++ {
++ createIndex(tasks, node, stateMgr, count, nodeData);
++ return null;
++ }
+
+- if (nodeData != null)
++ };
++ if (!tasks.offer(task))
+ {
+- count = createIndex(nodeData, stateMgr, count);
++ // The task could not be queued (all threads have tasks to do), so we do it ourselves
++ createIndex(tasks, node, stateMgr, count, nodeData);
+ }
+ }
++ }
++
++ /**
++ * Checks that the child <code>nodeData</code> of <code>node</code> can still be loaded
++ * (reporting it as missing otherwise) and then indexes it recursively.
++ *
++ * @param tasks
++ * the queue of existing indexing tasks
++ * @param node
++ * the parent of <code>nodeData</code>.
++ * @param stateMgr
++ * the shared item state manager.
++ * @param count
++ * the shared counter of nodes already indexed.
++ * @param nodeData
++ * the node data to index.
++ * @throws IOException
++ * if an error occurs while writing to the index.
++ * @throws ItemStateException
++ * if a node state cannot be found.
++ * @throws RepositoryException
++ * if any other error occurs
++ * @throws InterruptedException
++ * if the index has been stopped while indexing
++ */
++ private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node,
++ final ItemDataConsumer stateMgr, final AtomicLong count, final NodeData nodeData)
++ throws RepositoryException, IOException, InterruptedException
++ {
++ NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
++ if (childState == null)
++ {
++ handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
++ new ItemNotFoundException("Child not found "), handler, nodeData.getQPath(), node, nodeData);
++ }
+
+- return count;
++ if (nodeData != null)
++ {
++ createIndex(tasks, nodeData, stateMgr, count);
++ }
+ }
+
+ /**
+@@ -1850,6 +1944,8 @@
+ */
+ private Document doc;
+
++ private boolean synch;
++
+ /**
+ * Creates a new AddNode action.
+ *
+@@ -1860,8 +1956,22 @@
+ */
+ AddNode(long transactionId, String uuid)
+ {
++ this(transactionId, uuid, false);
++ }
++
++ /**
++ * Creates a new AddNode action and records the <code>synch</code> flag in the field of the same name.
++ *
++ * @param transactionId
++ * the id of the transaction that executes this action.
++ * @param uuid
++ * the uuid of the node to add.
++ */
++ AddNode(long transactionId, String uuid, boolean synch)
++ {
+ super(transactionId, Action.TYPE_ADD_NODE);
+ this.uuid = uuid;
++ this.synch = synch;
+ }
+
+ /**
+@@ -1921,7 +2031,17 @@
+ }
+ if (doc != null)
+ {
+- index.volatileIndex.addDocuments(new Document[]{doc});
++ if (synch)
++ {
++ synchronized (index)
++ {
++ index.volatileIndex.addDocuments(new Document[]{doc});
++ }
++ }
++ else
++ {
++ index.volatileIndex.addDocuments(new Document[]{doc});
++ }
+ }
+ }
+
+@@ -2559,4 +2679,203 @@
+ }
+ }
+ }
++
++ /**
++ * This class is used to index a node and its descendant nodes with several threads
++ */
++ private class MultithreadedIndexing
++ {
++ /**
++ * This instance of {@link AtomicReference} will contain the exception met, if any exception has occurred
++ */
++ private final AtomicReference<Exception> exception = new AtomicReference<Exception>();
++
++ /**
++ * The total number of threads used for the indexing
++ */
++ private final int nThreads = Runtime.getRuntime().availableProcessors();
++
++ /**
++ * The {@link CountDownLatch} used to notify that the indexing is over
++ */
++ private final CountDownLatch endSignal = new CountDownLatch(nThreads);
++
++ /**
++ * The total number of threads currently working
++ */
++ private final AtomicInteger runningThreads = new AtomicInteger();
++
++ /**
++ * The total number of nodes already indexed
++ */
++ private final AtomicLong count = new AtomicLong();
++
++ /**
++ * The bounded queue of indexing tasks left to do
++ */
++ private final Queue<Callable<Void>> tasks = new LinkedBlockingQueue<Callable<Void>>(nThreads)
++ {
++ private static final long serialVersionUID = 1L;
++
++ @Override
++ public Callable<Void> poll()
++ {
++ Callable<Void> task;
++ synchronized (runningThreads)
++ {
++ if ((task = super.poll()) != null)
++ {
++ runningThreads.incrementAndGet();
++ }
++ }
++ return task;
++ }
++
++ @Override
++ public boolean offer(Callable<Void> o)
++ {
++ if (super.offer(o))
++ {
++ synchronized (runningThreads)
++ {
++ runningThreads.notifyAll();
++ }
++ return true;
++ }
++ return false;
++ }
++ };
++
++ /**
++ * The task that all the indexing threads have to execute
++ */
++ private final Runnable indexingTask = new Runnable()
++ {
++ public void run()
++ {
++ while (exception.get() == null)
++ {
++ Callable<Void> task;
++ while (exception.get() == null && (task = tasks.poll()) != null)
++ {
++ try
++ {
++ task.call();
++ }
++ catch (InterruptedException e)
++ {
++ Thread.currentThread().interrupt();
++ }
++ catch (Exception e)
++ {
++ exception.set(e);
++ }
++ finally
++ {
++ synchronized (runningThreads)
++ {
++ runningThreads.decrementAndGet();
++ runningThreads.notifyAll();
++ }
++ }
++ }
++ synchronized (runningThreads)
++ {
++ if (exception.get() == null && (runningThreads.get() > 0))
++ {
++ try
++ {
++ runningThreads.wait();
++ }
++ catch (InterruptedException e)
++ {
++ Thread.currentThread().interrupt();
++ }
++ }
++ else
++ {
++ break;
++ }
++ }
++ }
++ endSignal.countDown();
++ }
++ };
++
++ /**
++ * Creates the indexing job and enqueues the initial task for the given node
++ * @param node
++ * the node from which the indexing starts.
++ * @param stateMgr
++ * the shared item state manager.
++ */
++ public MultithreadedIndexing(final NodeData node, final ItemDataConsumer stateMgr)
++ {
++ tasks.offer(new Callable<Void>()
++ {
++ public Void call() throws Exception
++ {
++ createIndex(tasks, node, stateMgr, count);
++ return null;
++ }
++ });
++ }
++
++ /**
++ * Launches the indexing
++ * @param asynchronous if <code>false</code>, the current thread waits until the end of
++ * the indexing; if <code>true</code>, it returns right after starting the threads
++ * @return the total number of nodes that have been indexed. <code>-1</code> in case of an
++ * asynchronous indexing or if the wait for the end of the indexing has been interrupted
++ * @throws IOException
++ * if an error occurs while writing to the index.
++ * @throws ItemStateException
++ * if a node state cannot be found.
++ * @throws RepositoryException
++ * if any other error occurs
++ */
++ public long launch(boolean asynchronous) throws IOException, RepositoryException
++ {
++ startThreads();
++ if (!asynchronous)
++ {
++ try
++ {
++ endSignal.await();
++ if (exception.get() != null)
++ {
++ if (exception.get() instanceof IOException)
++ {
++ throw (IOException)exception.get();
++ }
++ else if (exception.get() instanceof RepositoryException)
++ {
++ throw (RepositoryException)exception.get();
++ }
++ else
++ {
++ throw new RuntimeException("Error while indexing", exception.get());
++ }
++ }
++ return count.get();
++ }
++ catch (InterruptedException e)
++ {
++ Thread.currentThread().interrupt();
++ }
++ }
++ return -1L;
++ }
++
++ /**
++ * Starts all the indexing threads
++ */
++ private void startThreads()
++ {
++ for (int i=0; i < nThreads; i++)
++ {
++ (new Thread(indexingTask, "Indexing Thread #" + (i + 1))).start();
++ }
++ }
++ }
+ }
\ No newline at end of file
More information about the exo-jcr-commits
mailing list