[exo-jcr-commits] exo-jcr SVN: r3937 - in jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl: storage/jdbc/indexing and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 7 16:21:49 EST 2011


Author: tolusha
Date: 2011-02-07 16:21:49 -0500 (Mon, 07 Feb 2011)
New Revision: 3937

Modified:
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jdbc/indexing/JdbcNodeDataIndexingIterator.java
Log:
EXOJCR-1104: RDBMS reindexing is multithreaded

Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	2011-02-07 18:54:11 UTC (rev 3936)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	2011-02-07 21:21:49 UTC (rev 3937)
@@ -435,10 +435,10 @@
             // NodeData rootState = (NodeData) stateMgr.getItemData(rootId);
             // check if we have deal with JDBC indexing mechanism
             Indexable indexableComponent = (Indexable)handler.getContext().getContainer().getComponent(Indexable.class);
-            long count = createIndex(indexingTree.getIndexingRoot(), stateMgr);
-            //            long count =
-            //               indexableComponent == null ? createIndex(indexingTree.getIndexingRoot(), stateMgr) : createIndex(
-            //                  indexableComponent, indexingTree.getIndexingRoot());
+            long count =
+               indexableComponent == null ? createIndex(indexingTree.getIndexingRoot(), stateMgr) : createIndex(
+                  indexableComponent.getNodeDataIndexingIterator(handler.getReindexingPageSize()),
+                  indexingTree.getIndexingRoot());
 
             executeAndLog(new Commit(getTransactionId()));
             log.info("Created initial index for {} nodes", new Long(count));
@@ -1533,60 +1533,118 @@
    }
 
    /**
-    * Creates an index.
-    * <code>node</code>.
+    * Create index.
     * 
-    * @param indexableComponent
-    *          the component which responsible for quick indexing
+    * @param iterator
+    *             the NodeDataIndexing iterator            
     * @param rootNode
-    *            the current NodeState.
-    * @param path
-    *            the path of the current node.
+    *            the root node of the index 
+    * @return the total amount of indexed nodes           
+    * @throws IOException
+    *             if an error occurs while writing to the index.
+    * @throws RepositoryException
+    *             if any other error occurs
+    */
+   private long createIndex(NodeDataIndexingIterator iterator, NodeData rootNode) throws IOException,
+      RepositoryException
+   {
+      MultithreadedIndexing indexing = new MultithreadedIndexing(iterator, rootNode);
+      return indexing.launch(false);
+   }
+
+   /**
+    * Create index.
+    * 
+    * @param iterator
+    *             the NodeDataIndexing iterator
+    * @param rootNode
+    *            the root node of the index                          
     * @param count
     *            the number of nodes already indexed.
-    * @return the number of nodes indexed so far.
     * @throws IOException
     *             if an error occurs while writing to the index.
-    * @throws ItemStateException
-    *             if an node state cannot be found.
     * @throws RepositoryException
     *             if any other error occurs
+    * @throws InterruptedException
+    *             if the task has been interrupted 
     */
-   private long createIndex(Indexable indexableComponent, NodeData rootNode, long count) throws IOException,
-      RepositoryException
+   private void createIndex(final NodeDataIndexingIterator iterator, NodeData rootNode, final AtomicLong count)
+      throws RepositoryException, InterruptedException, IOException
    {
-      NodeDataIndexingIterator iterator =
-         indexableComponent.getNodeDataIndexingIterator(handler.getReindexingPageSize());
-
-      while (iterator.hasNext())
+      for (NodeDataIndexing node : iterator.next())
       {
-         for (NodeDataIndexing node : iterator.next())
+         if (stopped)
          {
-            if (indexingTree.isExcluded(node))
-            {
-               continue;
-            }
+            throw new InterruptedException();
+         }
 
-            if (!node.getQPath().isDescendantOf(rootNode.getQPath()) && !node.getQPath().equals(rootNode.getQPath()))
-            {
-               continue;
-            }
+         if (indexingTree.isExcluded(node))
+         {
+            continue;
+         }
 
-            executeAndLog(new AddNode(getTransactionId(), node, true));
+         if (!node.getQPath().isDescendantOf(rootNode.getQPath()) && !node.getQPath().equals(rootNode.getQPath()))
+         {
+            continue;
+         }
 
-            if (++count % 100 == 0)
+         executeAndLog(new AddNode(getTransactionId(), node, true));
+         if (count.incrementAndGet() % 1000 == 0)
+         {
+            log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count.get()));
+         }
+
+         synchronized (this)
+         {
+            if (count.get() % 10 == 0)
             {
-               log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count));
-            }
-            if (count % 10 == 0)
-            {
                checkIndexingQueue(true);
             }
             checkVolatileCommit();
          }
       }
+   }
 
-      return count;
+   /**
+    * Creates an index.
+    * 
+    * @param tasks
+    *            the queue of existing indexing tasks 
+    * @param rootNode
+    *            the root node of the index 
+    * @param iterator
+    *             the NodeDataIndexing iterator            
+    * @param count
+    *            the number of nodes already indexed.
+    * @throws IOException
+    *             if an error occurs while writing to the index.
+    * @throws ItemStateException
+    *             if a node state cannot be found.
+    * @throws RepositoryException
+    *             if any other error occurs
+    * @throws InterruptedException 
+    *             if thread was interrupted
+    */
+   private void createIndex(final Queue<Callable<Void>> tasks, final NodeDataIndexingIterator iterator,
+      final NodeData rootNode, final AtomicLong count) throws IOException, RepositoryException, InterruptedException
+   {
+      while (iterator.hasNext())
+      {
+         Callable<Void> task = new Callable<Void>()
+         {
+            public Void call() throws Exception
+            {
+               createIndex(iterator, rootNode, count);
+               return null;
+            }
+         };
+
+         if (!tasks.offer(task))
+         {
+            // All threads have tasks to do so we do it ourselves
+            createIndex(iterator, rootNode, count);
+         }
+      }
    }
 
    /**
@@ -3005,7 +3063,8 @@
       };
 
       /**
-       * Default constructor
+       * MultithreadedIndexing constructor.
+       * 
        * @param node
        *            the current NodeState.
        * @param stateMgr
@@ -3024,6 +3083,26 @@
       }
 
       /**
+       * MultithreadedIndexing constructor.
+       * 
+       * @param node
+       *            the current NodeState.
+       * @param iterator
+       *            NodeDataIndexingIterator
+       */
+      public MultithreadedIndexing(final NodeDataIndexingIterator iterator, final NodeData rootNode)
+      {
+         tasks.offer(new Callable<Void>()
+         {
+            public Void call() throws Exception
+            {
+               createIndex(tasks, iterator, rootNode, count);
+               return null;
+            }
+         });
+      }
+
+      /**
        * Launches the indexing
        * @param asynchronous indicates whether or not the current thread needs to wait until the 
        * end of the indexing

Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jdbc/indexing/JdbcNodeDataIndexingIterator.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jdbc/indexing/JdbcNodeDataIndexingIterator.java	2011-02-07 18:54:11 UTC (rev 3936)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/storage/jdbc/indexing/JdbcNodeDataIndexingIterator.java	2011-02-07 21:21:49 UTC (rev 3937)
@@ -25,7 +25,10 @@
 import org.exoplatform.services.log.ExoLogger;
 import org.exoplatform.services.log.Log;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jcr.RepositoryException;
 
@@ -55,17 +58,17 @@
    /**
     * The current offset in database.
     */
-   private int offset = 0;
+   private AtomicInteger offset = new AtomicInteger(0);
 
    /**
-    * Logger.
+    * Indicates if not all records have been read from database.
     */
-   protected static final Log LOG = ExoLogger.getLogger("exo.jcr.component.core.JdbcIndexingDataIterator");
+   private AtomicBoolean hasNext = new AtomicBoolean(true);
 
    /**
-    * The list of nodes to return in next() method.
+    * Logger.
     */
-   private List<NodeDataIndexing> current;
+   protected static final Log LOG = ExoLogger.getLogger("exo.jcr.component.core.JdbcIndexingDataIterator");
 
    /**
     * Constructor JdbcIndexingDataIterator.
@@ -75,41 +78,24 @@
    {
       this.connFactory = connFactory;
       this.pageSize = pageSize;
-      this.current = readNext();
    }
 
    /**
     * {@inheritDoc}
     */
-   public boolean hasNext()
-   {
-      return this.current.size() != 0;
-   }
-
-   /**
-    * {@inheritDoc}
-    */
    public List<NodeDataIndexing> next() throws RepositoryException
    {
-      List<NodeDataIndexing> next = this.current;
-      this.current = readNext();
+      if (!hasNext())
+      {
+         // avoid unnecessary request to database
+         return new ArrayList<NodeDataIndexing>();
+      }
 
-      return next;
-   }
-
-   /**
-    * Read nodes from database.  
-    * 
-    * @return List<NodeDataIndexing> 
-    * @throws RepositoryException 
-    */
-   private List<NodeDataIndexing> readNext() throws RepositoryException
-   {
       JDBCStorageConnection conn = (JDBCStorageConnection)connFactory.openConnection();
       try
       {
-         List<NodeDataIndexing> result = conn.getNodesAndProperties(offset, pageSize);
-         offset += pageSize;
+         List<NodeDataIndexing> result = conn.getNodesAndProperties(offset.getAndAdd(pageSize), pageSize);
+         hasNext.compareAndSet(true, result.size() == pageSize);
          
          return result;
       }
@@ -118,5 +104,13 @@
          conn.close();
       }
    }
+
+   /**
+    * {@inheritDoc}
+    */
+   public boolean hasNext()
+   {
+      return hasNext.get();
+   }
 }
 



More information about the exo-jcr-commits mailing list