[exo-jcr-commits] exo-jcr SVN: r4424 - in jcr/branches/1.12.x: patch/1.12.8-GA and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue May 24 07:00:53 EDT 2011


Author: paristote
Date: 2011-05-24 07:00:53 -0400 (Tue, 24 May 2011)
New Revision: 4424

Added:
   jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/
   jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/readme.txt
Removed:
   jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/
Modified:
   jcr/branches/1.12.x/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
Log:
JCR-1572

What is the problem to fix?
* Tested EPP 5.0.1 with 19,000 users created with crash and its advanced addusers command, which can be found at https://github.com/exoplatform/gatein-stuff. The table JCR_SITEM contains 4.3 million rows and JCR_SVALUE contains 3.2 million rows. In this case, re-indexing my content, which contains 1,216,000 nodes, takes 3 h 40 min on my dual-core laptop. The patch proposes a multi-threaded re-indexing mechanism to take advantage of multi-core machines, which are now common. With this patch, the re-indexing takes 2 h 20 min on the same laptop, knowing that the main bottleneck is the database.
  Please check the patch and, if it is OK, ask QA to load a big table and re-index it with and without the patch to measure the gain in their environment.

How is the problem fixed?

* Use several threads (one per available processor) for the JCR indexing operation.
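
For readers who want the idea without reading the whole diff, here is a minimal, standalone sketch of the same pattern: a bounded task queue shared by a fixed number of workers, where a thread that cannot enqueue a child task runs it itself. This is not the code from the patch. It swaps the hand-written queue and wait/notify logic for the JDK's ThreadPoolExecutor with CallerRunsPolicy, and the names ParallelTreeWalk, Node, buildTree and countNodes are hypothetical, chosen only for illustration. Counting a node stands in for indexing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not part of MultiIndex.java.
class ParallelTreeWalk {

    // Stand-in for a JCR node: it only knows its children.
    static final class Node {
        final List<Node> children = new ArrayList<>();
    }

    // Builds a complete tree with the given depth and fanout (test data).
    static Node buildTree(int depth, int fanout) {
        Node n = new Node();
        if (depth > 0) {
            for (int i = 0; i < fanout; i++) {
                n.children.add(buildTree(depth - 1, fanout));
            }
        }
        return n;
    }

    // Visits every node with nThreads workers and returns the number visited.
    static long countNodes(Node root, int nThreads) {
        AtomicLong count = new AtomicLong();          // nodes processed so far
        AtomicInteger pending = new AtomicInteger();  // tasks submitted but not finished
        CountDownLatch done = new CountDownLatch(1);  // released when pending drops to 0
        // Bounded queue: when it is full, CallerRunsPolicy makes the submitting
        // thread run the task itself instead of blocking.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads, 0L,
            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(nThreads),
            new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            submit(pool, pending, done, count, root);
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while walking the tree", e);
        } finally {
            pool.shutdown();
        }
        return count.get();
    }

    private static void submit(ThreadPoolExecutor pool, AtomicInteger pending,
        CountDownLatch done, AtomicLong count, Node node) {
        // Register the task before handing it over, so pending can only
        // reach zero once the whole tree has been processed.
        pending.incrementAndGet();
        pool.execute(() -> {
            try {
                count.incrementAndGet(); // stands in for indexing one node
                for (Node child : node.children) {
                    submit(pool, pending, done, count, child);
                }
            } finally {
                if (pending.decrementAndGet() == 0) {
                    done.countDown();
                }
            }
        });
    }
}
```

Calling ParallelTreeWalk.countNodes(ParallelTreeWalk.buildTree(3, 4), 4) walks a tree of 1 + 4 + 16 + 64 nodes. Because a worker registers its children before it marks itself finished, the caller is released only after the last node has been visited.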



Modified: jcr/branches/1.12.x/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
--- jcr/branches/1.12.x/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	2011-05-24 10:44:25 UTC (rev 4423)
+++ jcr/branches/1.12.x/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	2011-05-24 11:00:53 UTC (rev 4424)
@@ -43,9 +43,16 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jcr.ItemNotFoundException;
 import javax.jcr.RepositoryException;
@@ -214,6 +221,11 @@
    private boolean reindexing = false;
 
    /**
+    * Flag indicating whether the index is stopped.
+    */
+   private volatile boolean stopped;
+   
+   /**
     * The index format version of this multi index.
     */
    private final IndexFormatVersion version;
@@ -314,6 +326,14 @@
          setReadWrite();
       }
       this.indexNames.setMultiIndex(this);
+      // Add a hook that will stop the threads if they are still running
+      Runtime.getRuntime().addShutdownHook(new Thread()
+      {
+         public void run()
+         {
+            stopped = true;
+         }
+      });
    }
 
    /**
@@ -374,11 +394,10 @@
          reindexing = true;
          try
          {
-            long count = 0;
             // traverse and index workspace
             executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
             // NodeData rootState = (NodeData) stateMgr.getItemData(rootId);
-            count = createIndex(indexingTree.getIndexingRoot(), stateMgr, count);
+            long count = createIndex(indexingTree.getIndexingRoot(), stateMgr);
             executeAndLog(new Commit(getTransactionId()));
             log.info("Created initial index for {} nodes", new Long(count));
             releaseMultiReader();
@@ -958,6 +977,7 @@
             }
          }
       }
+      this.stopped = true;
    }
 
    /**
@@ -1304,44 +1324,118 @@
     * @throws RepositoryException
     *             if any other error occurs
     */
-   private long createIndex(NodeData node, ItemDataConsumer stateMgr, long count) throws IOException,
+   private long createIndex(NodeData node, ItemDataConsumer stateMgr) throws IOException,
       RepositoryException
    {
+      MultithreadedIndexing indexing = new MultithreadedIndexing(node, stateMgr);
+      return indexing.launch(false);
+   }
+   
+   /**
+    * Recursively creates an index starting with the NodeState
+    * <code>node</code>.
+    * 
+    * @param tasks
+    *            the queue of existing indexing tasks 
+    * @param node
+    *            the current NodeState.
+    * @param stateMgr
+    *            the shared item state manager.
+    * @param count
+    *            the number of nodes already indexed.
+    * @throws IOException
+    *             if an error occurs while writing to the index.
+    * @throws ItemStateException
+    *             if a node state cannot be found.
+    * @throws RepositoryException
+    *             if any other error occurs
+    * @throws InterruptedException
+    *             if the task has been interrupted 
+    */
+   private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node, final ItemDataConsumer stateMgr, final AtomicLong count) throws IOException,
+      RepositoryException, InterruptedException
+   {
+      if (stopped)
+      {
+         throw new InterruptedException();
+      }
       // NodeId id = node.getNodeId();
 
       if (indexingTree.isExcluded(node))
       {
-         return count;
+         return;
       }
-      executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
-      if (++count % 100 == 0)
+      executeAndLog(new AddNode(getTransactionId(), node.getIdentifier(), true));
+      if (count.incrementAndGet() % 1000 == 0)
       {
-
-         log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count));
+         log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count.get()));
       }
-      if (count % 10 == 0)
+      synchronized (this)
       {
-         checkIndexingQueue(true);
+         if (count.get() % 10 == 0)
+         {
+            checkIndexingQueue(true);
+         }
+         checkVolatileCommit();
       }
-      checkVolatileCommit();
       List<NodeData> children = stateMgr.getChildNodesData(node);
-      for (NodeData nodeData : children)
+      for (final NodeData nodeData : children)
       {
-
-         NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
-         if (childState == null)
+         Callable<Void> task = new Callable<Void>()
          {
-            handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
-               new ItemNotFoundException("Child not found "), handler, nodeData.getQPath(), node, nodeData);
-         }
+            public Void call() throws Exception
+            {
+               createIndex(tasks, node, stateMgr, count, nodeData);
+               return null;
+            }
 
-         if (nodeData != null)
+         };
+         if (!tasks.offer(task))
          {
-            count = createIndex(nodeData, stateMgr, count);
+            // All threads are busy, so the current thread runs the task itself
+            createIndex(tasks, node, stateMgr, count, nodeData);
          }
       }
+   }
+   
+   /**
+    * Recursively creates an index starting with the NodeState
+    * <code>node</code>.
+    * 
+    * @param tasks
+    *            the queue of existing indexing tasks 
+    * @param node
+    *            the current NodeState.
+    * @param stateMgr
+    *            the shared item state manager.
+    * @param count
+    *            the number of nodes already indexed.
+    * @param nodeData
+    *            the node data to index.
+    * @throws IOException
+    *             if an error occurs while writing to the index.
+    * @throws ItemStateException
+    *             if a node state cannot be found.
+    * @throws RepositoryException
+    *             if any other error occurs
+    * @throws InterruptedException
+    *             if the task has been interrupted 
+    */
+   private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node,
+      final ItemDataConsumer stateMgr, final AtomicLong count, final NodeData nodeData)
+      throws RepositoryException, IOException, InterruptedException
+   {
+      NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
+      if (childState == null)
+      {
+         handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
+            new ItemNotFoundException("Child not found "), handler, nodeData.getQPath(), node, nodeData);
+      }
 
-      return count;
+      if (nodeData != null)
+      {
+         createIndex(tasks, nodeData, stateMgr, count);
+      }
    }
 
    /**
@@ -1857,6 +1951,8 @@
        */
       private Document doc;
 
+      private boolean synch;
+      
       /**
        * Creates a new AddNode action.
        * 
@@ -1867,8 +1963,22 @@
        */
       AddNode(long transactionId, String uuid)
       {
+         this(transactionId, uuid, false);
+      }
+
+      /**
+       * Creates a new AddNode action.
+       * 
+       * @param transactionId
+       *            the id of the transaction that executes this action.
+       * @param uuid
+       *            the uuid of the node to add.
+       */
+      AddNode(long transactionId, String uuid, boolean synch)
+      {
          super(transactionId, Action.TYPE_ADD_NODE);
          this.uuid = uuid;
+         this.synch = synch;
       }
 
       /**
@@ -1928,7 +2038,17 @@
          }
          if (doc != null)
          {
-            index.volatileIndex.addDocuments(new Document[]{doc});
+            if (synch)
+            {
+               synchronized (index)
+               {
+                  index.volatileIndex.addDocuments(new Document[]{doc});               
+               }               
+            }
+            else
+            {
+               index.volatileIndex.addDocuments(new Document[]{doc});               
+            }
          }
       }
 
@@ -2540,7 +2660,206 @@
          }
       }
    }
+   
+   /**
+    * This class is used to index a node and its descendant nodes with several threads
+    */
+   private class MultithreadedIndexing
+   {
+      /**
+       * This instance of {@link AtomicReference} holds an exception encountered during indexing, if any
+       */
+      private final AtomicReference<Exception> exception = new AtomicReference<Exception>();
+      
+      /**
+       * The total amount of threads used for the indexing
+       */
+      private final int nThreads = Runtime.getRuntime().availableProcessors();
+      
+      /**
+       * The {@link CountDownLatch} used to notify that the indexing is over
+       */
+      private final CountDownLatch endSignal = new CountDownLatch(nThreads);
+      
+      /**
+       * The total amount of threads currently working
+       */
+      private final AtomicInteger runningThreads = new AtomicInteger();
+      
+      /**
+       * The total amount of nodes already indexed
+       */
+      private final AtomicLong count = new AtomicLong();
+      
+      /**
+       * The list of indexing tasks left to do
+       */
+      private final Queue<Callable<Void>> tasks = new LinkedBlockingQueue<Callable<Void>>(nThreads)
+      {
+         private static final long serialVersionUID = 1L;
 
+         @Override
+         public Callable<Void> poll()
+         {
+            Callable<Void> task;
+            synchronized (runningThreads)
+            {
+               if ((task = super.poll()) != null)
+               {
+                  runningThreads.incrementAndGet();                  
+               }
+            }
+            return task;
+         }
+
+         @Override
+         public boolean offer(Callable<Void> o)
+         {            
+            if (super.offer(o))
+            {
+               synchronized (runningThreads)
+               {
+                  runningThreads.notifyAll();
+               }
+               return true;
+            }
+            return false;
+         }         
+      };
+      
+      /**
+       * The task that all the indexing threads have to execute
+       */
+      private final Runnable indexingTask = new Runnable()
+      {
+         public void run()
+         {
+            while (exception.get() == null)
+            {
+               Callable<Void> task;
+               while (exception.get() == null && (task = tasks.poll()) != null)
+               {
+                  try
+                  {
+                     task.call();
+                  }
+                  catch (InterruptedException e)
+                  {
+                     Thread.currentThread().interrupt();
+                  }
+                  catch (Exception e)
+                  {
+                     exception.set(e);
+                  }
+                  finally
+                  {
+                     synchronized (runningThreads)
+                     {
+                        runningThreads.decrementAndGet();
+                        runningThreads.notifyAll();
+                     }
+                  }
+               }
+               synchronized (runningThreads)
+               {
+                  if (exception.get() == null && (runningThreads.get() > 0))
+                  {
+                     try
+                     {
+                        runningThreads.wait();
+                     }
+                     catch (InterruptedException e)
+                     {
+                        Thread.currentThread().interrupt();
+                     }
+                  }
+                  else
+                  {
+                     break;
+                  }
+               }
+            }
+            endSignal.countDown();
+         }
+      };
+      
+      /**
+       * Default constructor
+       * @param node
+       *            the current NodeState.
+       * @param stateMgr
+       *            the shared item state manager.
+       */
+      public MultithreadedIndexing(final NodeData node, final ItemDataConsumer stateMgr)
+      {
+         tasks.offer(new Callable<Void>()
+         {
+            public Void call() throws Exception
+            {
+               createIndex(tasks, node, stateMgr, count);
+               return null;
+            }
+         });
+      }
+      
+      /**
+       * Launches the indexing
+       * @param asynchronous indicates whether or not the current thread needs to wait until the 
+       * end of the indexing
+       * @return the total amount of nodes that have been indexed. <code>-1</code> in case of an
+       * asynchronous indexing
+       * @throws IOException
+       *             if an error occurs while writing to the index.
+       * @throws ItemStateException
+       *             if a node state cannot be found.
+       * @throws RepositoryException
+       *             if any other error occurs
+       */
+      public long launch(boolean asynchronous) throws IOException, RepositoryException
+      {
+         startThreads();
+         if (!asynchronous)
+         {
+            try
+            {
+               endSignal.await();
+               if (exception.get() != null)
+               {
+                  if (exception.get() instanceof IOException)
+                  {
+                     throw (IOException)exception.get();
+                  }
+                  else if (exception.get() instanceof RepositoryException)
+                  {
+                     throw (RepositoryException)exception.get();
+                  }
+                  else
+                  {
+                     throw new RuntimeException("Error while indexing", exception.get());
+                  }
+               }
+               return count.get();
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt();
+            }
+         }
+         return -1L;
+      }
+
+      /**
+       * Starts all the indexing threads
+       */
+      private void startThreads()
+      {
+         for (int i=0; i < nThreads; i++)
+         {
+            (new Thread(indexingTask, "Indexing Thread #" + (i + 1))).start();
+         }
+      }
+   }
+
    /**
     * @see org.exoplatform.services.jcr.impl.core.query.lucene.IndexUpdateMonitorListener#onUpdateInProgressChange(boolean)
     */

Added: jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/readme.txt
===================================================================
--- jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/readme.txt	                        (rev 0)
+++ jcr/branches/1.12.x/patch/1.12.9-GA/JCR-1572/readme.txt	2011-05-24 11:00:53 UTC (rev 4424)
@@ -0,0 +1,65 @@
+Summary
+
+    Status: Improve the re-indexing mechanism to take advantage of multi-cores
+    CCP Issue: N/A, Product Jira Issue: JCR-1572.
+    Complexity: Medium
+
+The Proposal
+Problem description
+
+What is the problem to fix?
+Tested EPP 5.0.1 with 19,000 users created with crash and its advanced addusers command, which can be found at https://github.com/exoplatform/gatein-stuff. The table JCR_SITEM contains 4.3 million rows and JCR_SVALUE contains 3.2 million rows. In this case, re-indexing my content, which contains 1,216,000 nodes, takes 3 h 40 min on my dual-core laptop. The patch proposes a multi-threaded re-indexing mechanism to take advantage of multi-core machines, which are now common. With this patch, the re-indexing takes 2 h 20 min on the same laptop, knowing that the main bottleneck is the database.
+
+Please check the patch and, if it is OK, ask QA to load a big table and re-index it with and without the patch to measure the gain in their environment.
+Fix description
+
+How is the problem fixed?
+
+* Involve several threads in the JCR indexing operation
+
+Patch information:
+Patch files: JCR-1572.patch
+
+Tests to perform
+
+Reproduction test
+* No
+
+Tests performed at DevLevel
+* functional testing in JCR project
+
+Tests performed at QA/Support Level
+*
+
+Documentation changes
+
+Documentation changes:
+* No
+
+Configuration changes
+
+Configuration changes:
+* No
+
+Will previous configuration continue to work?
+* Yes
+
+Risks and impacts
+
+Can this bug fix have any side effects on current client projects?
+    No
+
+Is there a performance risk/cost?
+* No
+
+Validation (PM/Support/QA)
+
+PM Comment
+* PL review: patch validated
+
+Support Comment
+* Support review: patch validated
+
+QA Feedbacks
+*
+
