[exo-jcr-commits] exo-jcr SVN: r3939 - in jcr/branches/1.12.x/patch/1.12.8-GA: JCR-1572 and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Feb 8 03:30:07 EST 2011


Author: tolusha
Date: 2011-02-08 03:30:07 -0500 (Tue, 08 Feb 2011)
New Revision: 3939

Added:
   jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/
   jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/JCR-1572.patch
Log:
JCR-1572: patch proposed

Added: jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/JCR-1572.patch
===================================================================
--- jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/JCR-1572.patch	                        (rev 0)
+++ jcr/branches/1.12.x/patch/1.12.8-GA/JCR-1572/JCR-1572.patch	2011-02-08 08:30:07 UTC (rev 3939)
@@ -0,0 +1,462 @@
+Index: exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
+===================================================================
+--- exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	(revision 3789)
++++ exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	(working copy)
+@@ -43,9 +43,16 @@
+ import java.util.Iterator;
+ import java.util.List;
+ import java.util.Map;
++import java.util.Queue;
+ import java.util.Set;
+ import java.util.Timer;
+ import java.util.TimerTask;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.LinkedBlockingQueue;
++import java.util.concurrent.atomic.AtomicInteger;
++import java.util.concurrent.atomic.AtomicLong;
++import java.util.concurrent.atomic.AtomicReference;
+ 
+ import javax.jcr.ItemNotFoundException;
+ import javax.jcr.RepositoryException;
+@@ -214,6 +221,11 @@
+    private boolean reindexing = false;
+ 
+    /**
++    * Flag indicating whether the index is stopped.
++    */
++   private volatile boolean stopped;
++   
++   /**
+     * The index format version of this multi index.
+     */
+    private final IndexFormatVersion version;
+@@ -314,6 +326,14 @@
+          setReadWrite();
+       }
+       this.indexNames.setMultiIndex(this);
++      // Add a hook that will stop the threads if they are still running
++      Runtime.getRuntime().addShutdownHook(new Thread()
++      {
++         public void run()
++         {
++            stopped = true;
++         }
++      });
+    }
+ 
+    /**
+@@ -374,11 +394,10 @@
+          reindexing = true;
+          try
+          {
+-            long count = 0;
+             // traverse and index workspace
+             executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+             // NodeData rootState = (NodeData) stateMgr.getItemData(rootId);
+-            count = createIndex(indexingTree.getIndexingRoot(), stateMgr, count);
++            long count = createIndex(indexingTree.getIndexingRoot(), stateMgr);
+             executeAndLog(new Commit(getTransactionId()));
+             log.info("Created initial index for {} nodes", new Long(count));
+             releaseMultiReader();
+@@ -951,6 +970,7 @@
+             }
+          }
+       }
++      this.stopped = true;
+    }
+ 
+    /**
+@@ -1297,44 +1317,118 @@
+     * @throws RepositoryException
+     *             if any other error occurs
+     */
+-   private long createIndex(NodeData node, ItemDataConsumer stateMgr, long count) throws IOException,
++   private long createIndex(NodeData node, ItemDataConsumer stateMgr) throws IOException,
+       RepositoryException
+    {
++      MultithreadedIndexing indexing = new MultithreadedIndexing(node, stateMgr);
++      return indexing.launch(false);
++   }
++   
++   /**
++    * Recursively creates an index starting with the NodeState
++    * <code>node</code>.
++    * 
+    * @param tasks
++    *            the queue of existing indexing tasks 
++    * @param node
++    *            the current NodeState.
++    * @param stateMgr
++    *            the shared item state manager.
++    * @param count
++    *            the number of nodes already indexed.
++    * @throws IOException
++    *             if an error occurs while writing to the index.
++    * @throws ItemStateException
+    *             if a node state cannot be found.
++    * @throws RepositoryException
++    *             if any other error occurs
++    * @throws InterruptedException
++    *             if the task has been interrupted 
++    */
++   private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node, final ItemDataConsumer stateMgr, final AtomicLong count) throws IOException,
++      RepositoryException, InterruptedException
++   {
++      if (stopped)
++      {
++         throw new InterruptedException();
++      }
+       // NodeId id = node.getNodeId();
+ 
+       if (indexingTree.isExcluded(node))
+       {
+-         return count;
++         return;
+       }
+-      executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
+-      if (++count % 100 == 0)
++      executeAndLog(new AddNode(getTransactionId(), node.getIdentifier(), true));
++      if (count.incrementAndGet() % 1000 == 0)
+       {
+-
+-         log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count));
++         log.info("indexing... {} ({})", node.getQPath().getAsString(), new Long(count.get()));
+       }
+-      if (count % 10 == 0)
++      synchronized (this)
+       {
+-         checkIndexingQueue(true);
++         if (count.get() % 10 == 0)
++         {
++            checkIndexingQueue(true);
++         }
++         checkVolatileCommit();
+       }
+-      checkVolatileCommit();
+       List<NodeData> children = stateMgr.getChildNodesData(node);
+-      for (NodeData nodeData : children)
++      for (final NodeData nodeData : children)
+       {
+-
+-         NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
+-         if (childState == null)
++         Callable<Void> task = new Callable<Void>()
+          {
+-            handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
+-               new ItemNotFoundException("Child not found "), handler, nodeData.getQPath(), node, nodeData);
+-         }
++            public Void call() throws Exception
++            {
++               createIndex(tasks, node, stateMgr, count, nodeData);
++               return null;
++            }
+ 
+-         if (nodeData != null)
++         };
++         if (!tasks.offer(task))
+          {
+-            count = createIndex(nodeData, stateMgr, count);
+            // All threads have tasks to do, so we do it ourselves
++            createIndex(tasks, node, stateMgr, count, nodeData);
+          }
+       }
++   }
++   
++   /**
++    * Recursively creates an index starting with the NodeState
++    * <code>node</code>.
++    * 
+    * @param tasks
++    *            the queue of existing indexing tasks 
++    * @param node
++    *            the current NodeState.
++    * @param stateMgr
++    *            the shared item state manager.
++    * @param count
++    *            the number of nodes already indexed.
++    * @param nodeData
++    *            the node data to index.
++    * @throws IOException
++    *             if an error occurs while writing to the index.
++    * @throws ItemStateException
+    *             if a node state cannot be found.
++    * @throws RepositoryException
++    *             if any other error occurs
++    * @throws InterruptedException
++    *             if the task has been interrupted 
++    */
++   private void createIndex(final Queue<Callable<Void>> tasks, final NodeData node,
++      final ItemDataConsumer stateMgr, final AtomicLong count, final NodeData nodeData)
++      throws RepositoryException, IOException, InterruptedException
++   {
++      NodeData childState = (NodeData)stateMgr.getItemData(nodeData.getIdentifier());
++      if (childState == null)
++      {
++         handler.getOnWorkspaceInconsistencyHandler().handleMissingChildNode(
++            new ItemNotFoundException("Child not found "), handler, nodeData.getQPath(), node, nodeData);
++      }
+ 
+-      return count;
++      if (nodeData != null)
++      {
++         createIndex(tasks, nodeData, stateMgr, count);
++      }
+    }
+ 
+    /**
+@@ -1850,6 +1944,8 @@
+        */
+       private Document doc;
+ 
++      private boolean synch;
++      
+       /**
+        * Creates a new AddNode action.
+        * 
+@@ -1860,8 +1956,22 @@
+        */
+       AddNode(long transactionId, String uuid)
+       {
++         this(transactionId, uuid, false);
++      }
++
++      /**
++       * Creates a new AddNode action.
++       * 
++       * @param transactionId
++       *            the id of the transaction that executes this action.
++       * @param uuid
++       *            the uuid of the node to add.
++       */
++      AddNode(long transactionId, String uuid, boolean synch)
++      {
+          super(transactionId, Action.TYPE_ADD_NODE);
+          this.uuid = uuid;
++         this.synch = synch;
+       }
+ 
+       /**
+@@ -1921,7 +2031,17 @@
+          }
+          if (doc != null)
+          {
+-            index.volatileIndex.addDocuments(new Document[]{doc});
++            if (synch)
++            {
++               synchronized (index)
++               {
++                  index.volatileIndex.addDocuments(new Document[]{doc});               
++               }               
++            }
++            else
++            {
++               index.volatileIndex.addDocuments(new Document[]{doc});               
++            }
+          }
+       }
+ 
+@@ -2559,4 +2679,203 @@
+          }
+       }
+    }
++   
++   /**
+    * This class is used to index a node and its descendant nodes with several threads
++    */
++   private class MultithreadedIndexing
++   {
++      /**
+       * This instance of {@link AtomicReference} will contain the exception encountered, if any exception has occurred
++       */
++      private final AtomicReference<Exception> exception = new AtomicReference<Exception>();
++      
++      /**
++       * The total amount of threads used for the indexing
++       */
++      private final int nThreads = Runtime.getRuntime().availableProcessors();
++      
++      /**
++       * The {@link CountDownLatch} used to notify that the indexing is over
++       */
++      private final CountDownLatch endSignal = new CountDownLatch(nThreads);
++      
++      /**
++       * The total amount of threads currently working
++       */
++      private final AtomicInteger runningThreads = new AtomicInteger();
++      
++      /**
++       * The total amount of nodes already indexed
++       */
++      private final AtomicLong count = new AtomicLong();
++      
++      /**
++       * The list of indexing tasks left to do
++       */
++      private final Queue<Callable<Void>> tasks = new LinkedBlockingQueue<Callable<Void>>(nThreads)
++      {
++         private static final long serialVersionUID = 1L;
++
++         @Override
++         public Callable<Void> poll()
++         {
++            Callable<Void> task;
++            synchronized (runningThreads)
++            {
++               if ((task = super.poll()) != null)
++               {
++                  runningThreads.incrementAndGet();                  
++               }
++            }
++            return task;
++         }
++
++         @Override
++         public boolean offer(Callable<Void> o)
++         {            
++            if (super.offer(o))
++            {
++               synchronized (runningThreads)
++               {
++                  runningThreads.notifyAll();
++               }
++               return true;
++            }
++            return false;
++         }         
++      };
++      
++      /**
++       * The task that all the indexing threads have to execute
++       */
++      private final Runnable indexingTask = new Runnable()
++      {
++         public void run()
++         {
++            while (exception.get() == null)
++            {
++               Callable<Void> task;
++               while (exception.get() == null && (task = tasks.poll()) != null)
++               {
++                  try
++                  {
++                     task.call();
++                  }
++                  catch (InterruptedException e)
++                  {
++                     Thread.currentThread().interrupt();
++                  }
++                  catch (Exception e)
++                  {
++                     exception.set(e);
++                  }
++                  finally
++                  {
++                     synchronized (runningThreads)
++                     {
++                        runningThreads.decrementAndGet();
++                        runningThreads.notifyAll();
++                     }
++                  }
++               }
++               synchronized (runningThreads)
++               {
++                  if (exception.get() == null && (runningThreads.get() > 0))
++                  {
++                     try
++                     {
++                        runningThreads.wait();
++                     }
++                     catch (InterruptedException e)
++                     {
++                        Thread.currentThread().interrupt();
++                     }
++                  }
++                  else
++                  {
++                     break;
++                  }
++               }
++            }
++            endSignal.countDown();
++         }
++      };
++      
++      /**
+       * Creates the indexing and queues the first task, which indexes the given node
++       * @param node
++       *            the current NodeState.
++       * @param stateMgr
++       *            the shared item state manager.
++       */
++      public MultithreadedIndexing(final NodeData node, final ItemDataConsumer stateMgr)
++      {
++         tasks.offer(new Callable<Void>()
++         {
++            public Void call() throws Exception
++            {
++               createIndex(tasks, node, stateMgr, count);
++               return null;
++            }
++         });
++      }
++      
++      /**
++       * Launches the indexing
++       * @param asynchronous indicates whether or not the current thread needs to wait until the 
++       * end of the indexing
++       * @return the total amount of nodes that have been indexed. <code>-1</code> in case of an
++       * asynchronous indexing
++       * @throws IOException
++       *             if an error occurs while writing to the index.
++       * @throws ItemStateException
+       *             if a node state cannot be found.
++       * @throws RepositoryException
++       *             if any other error occurs
++       */
++      public long launch(boolean asynchronous) throws IOException, RepositoryException
++      {
++         startThreads();
++         if (!asynchronous)
++         {
++            try
++            {
++               endSignal.await();
++               if (exception.get() != null)
++               {
++                  if (exception.get() instanceof IOException)
++                  {
++                     throw (IOException)exception.get();
++                  }
++                  else if (exception.get() instanceof RepositoryException)
++                  {
++                     throw (RepositoryException)exception.get();
++                  }
++                  else
++                  {
++                     throw new RuntimeException("Error while indexing", exception.get());
++                  }
++               }
++               return count.get();
++            }
++            catch (InterruptedException e)
++            {
++               Thread.currentThread().interrupt();
++            }
++         }
++         return -1L;
++      }
++
++      /**
++       * Starts all the indexing threads
++       */
++      private void startThreads()
++      {
++         for (int i=0; i < nThreads; i++)
++         {
++            (new Thread(indexingTask, "Indexing Thread #" + (i + 1))).start();
++         }
++      }
++   }
+ }
\ No newline at end of file



More information about the exo-jcr-commits mailing list