[exo-jcr-commits] exo-jcr SVN: r3263 - jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene.

do-not-reply at jboss.org
Fri Oct 8 08:39:12 EDT 2010


Author: nzamosenchuk
Date: 2010-10-08 08:39:11 -0400 (Fri, 08 Oct 2010)
New Revision: 3263

Removed:
   jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexMerger.java
Modified:
   jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
   jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java
Log:
EXOJCR-987 Getting rid of Merger and actions inside MultiIndex
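
In short, this change deletes the background IndexMerger thread and stops routing
updates through the redo-log Action objects (Start/AddNode/DeleteNode/Commit);
MultiIndex now updates the volatile and persistent indexes directly. A minimal
sketch of the replacement pattern, distilled from the MultiIndex changes below
(simplified fragment; uuid, doc and the index fields come from the surrounding
MultiIndex context shown in the diff):

   // Delete: try the volatile index first, then fall back to the persistent
   // indexes, newest first (previously a logged DeleteNode action).
   Term idTerm = new Term(FieldNames.UUID, uuid);
   int removed = volatileIndex.removeDocument(idTerm);
   if (removed == 0)
   {
      for (int i = indexes.size() - 1; i >= 0; i--)
      {
         PersistentIndex idx = (PersistentIndex)indexes.get(i);
         if (indexNames.contains(idx.getName()) && idx.removeDocument(idTerm) > 0)
         {
            break;
         }
      }
   }

   // Add: write straight to the volatile index (previously a logged AddNode action).
   volatileIndex.addDocuments(new Document[]{doc});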

Deleted: jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexMerger.java
===================================================================
--- jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexMerger.java	2010-10-08 12:04:24 UTC (rev 3262)
+++ jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexMerger.java	2010-10-08 12:39:11 UTC (rev 3263)
@@ -1,572 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.exoplatform.services.jcr.impl.core.query.lucene;
-
-import EDU.oswego.cs.dl.util.concurrent.Mutex;
-import EDU.oswego.cs.dl.util.concurrent.Sync;
-
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUtils;
-import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.Term;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Merges indexes in a separate daemon thread.
- */
-class IndexMerger extends Thread implements IndexListener
-{
-
-   /**
-    * Logger instance for this class.
-    */
-   private static final Logger log = LoggerFactory.getLogger("exo.jcr.component.core.IndexMerger");
-
-   /**
-    * Marker task to signal the background thread to quit.
-    */
-   private static final Merge QUIT = new Merge(new Index[0]);
-
-   /**
-    * minMergeDocs config parameter.
-    */
-   private int minMergeDocs = SearchIndex.DEFAULT_MIN_MERGE_DOCS;
-
-   /**
-    * maxMergeDocs config parameter
-    */
-   private int maxMergeDocs = SearchIndex.DEFAULT_MAX_MERGE_DOCS;
-
-   /**
-    * mergeFactor config parameter
-    */
-   private int mergeFactor = SearchIndex.DEFAULT_MERGE_FACTOR;
-
-   /**
-    * Queue of merge Tasks
-    */
-   private final Buffer mergeTasks = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
-
-   /**
-    * List of id <code>Term</code> that identify documents that were deleted
-    * while a merge was running.
-    */
-   private final List deletedDocuments = Collections.synchronizedList(new ArrayList());
-
-   /**
-    * List of <code>IndexBucket</code>s in ascending document limit.
-    */
-   private final List indexBuckets = new ArrayList();
-
-   /**
-    * The <code>MultiIndex</code> this index merger is working on.
-    */
-   private final MultiIndex multiIndex;
-
-   /**
-    * Monitor object to synchronize merge calculation.
-    */
-   private final Object lock = new Object();
-
-   /**
-    * Mutex that is acquired when replacing indexes on MultiIndex.
-    */
-   private final Sync indexReplacement = new Mutex();
-
-   /**
-    * When released, indicates that this index merger is idle.
-    */
-   private final Sync mergerIdle = new Mutex();
-
-   /**
-    * Creates an <code>IndexMerger</code>.
-    *
-    * @param multiIndex the <code>MultiIndex</code>.
-    */
-   IndexMerger(MultiIndex multiIndex)
-   {
-      this.multiIndex = multiIndex;
-      setName("IndexMerger");
-      setDaemon(true);
-      try
-      {
-         mergerIdle.acquire();
-      }
-      catch (InterruptedException e)
-      {
-         // will never happen, lock is free upon construction
-         throw new InternalError("Unable to acquire mutex after construction");
-      }
-   }
-
-   /**
-    * Informs the index merger that an index was added / created.
-    *
-    * @param name the name of the index.
-    * @param numDocs the number of documents it contains.
-    */
-   void indexAdded(String name, int numDocs)
-   {
-      if (numDocs < 0)
-      {
-         throw new IllegalArgumentException("numDocs must be positive");
-      }
-      // multiple threads may enter this method:
-      // - the background thread of this IndexMerger, when it replaces indexes
-      //   after a successful merge
-      // - a regular thread that updates the workspace
-      //
-      // therefore we have to synchronize this block
-      synchronized (lock)
-      {
-         // initially create buckets
-         if (indexBuckets.size() == 0)
-         {
-            long lower = 0;
-            long upper = minMergeDocs;
-            while (upper < maxMergeDocs)
-            {
-               indexBuckets.add(new IndexBucket(lower, upper, true));
-               lower = upper + 1;
-               upper *= mergeFactor;
-            }
-            // one with upper = maxMergeDocs
-            indexBuckets.add(new IndexBucket(lower, maxMergeDocs, false));
-            // and another one as overflow, just in case...
-            indexBuckets.add(new IndexBucket(maxMergeDocs + 1, Long.MAX_VALUE, false));
-         }
-
-         // put index in bucket
-         IndexBucket bucket = (IndexBucket)indexBuckets.get(indexBuckets.size() - 1);
-         for (int i = 0; i < indexBuckets.size(); i++)
-         {
-            bucket = (IndexBucket)indexBuckets.get(i);
-            if (bucket.fits(numDocs))
-            {
-               break;
-            }
-         }
-         bucket.add(new Index(name, numDocs));
-
-         if (log.isDebugEnabled())
-         {
-            log.debug("index added: name=" + name + ", numDocs=" + numDocs);
-         }
-
-         // if bucket does not allow merge, we don't have to continue
-         if (!bucket.allowsMerge())
-         {
-            return;
-         }
-
-         // check if we need a merge
-         if (bucket.size() >= mergeFactor)
-         {
-            long targetMergeDocs = bucket.upper;
-            targetMergeDocs = Math.min(targetMergeDocs * mergeFactor, maxMergeDocs);
-            // sum up docs in bucket
-            List indexesToMerge = new ArrayList();
-            int mergeDocs = 0;
-            for (Iterator it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs;)
-            {
-               indexesToMerge.add(it.next());
-            }
-            if (indexesToMerge.size() > 2)
-            {
-               // found merge
-               Index[] idxs = (Index[])indexesToMerge.toArray(new Index[indexesToMerge.size()]);
-               bucket.removeAll(indexesToMerge);
-               if (log.isDebugEnabled())
-               {
-                  log.debug("requesting merge for " + indexesToMerge);
-               }
-               mergeTasks.add(new Merge(idxs));
-               log.debug("merge queue now contains " + mergeTasks.size() + " tasks.");
-            }
-         }
-      }
-   }
-
-   /**
-    * @inheritDoc
-    */
-   public void documentDeleted(Term id)
-   {
-      log.debug("document deleted: " + id.text());
-      deletedDocuments.add(id);
-   }
-
-   /**
-    * When the calling thread returns this index merger will be idle, that is
-    * there will be no merge tasks pending anymore. The method returns immediately
-    * if there are currently no tasks pending at all.
-    */
-   void waitUntilIdle() throws InterruptedException
-   {
-      mergerIdle.acquire();
-      // and immediately release again
-      mergerIdle.release();
-   }
-
-   /**
-    * Signals this <code>IndexMerger</code> to stop and waits until it
-    * has terminated.
-    */
-   void dispose()
-   {
-      log.debug("dispose IndexMerger");
-      // get mutex for index replacements
-      try
-      {
-         indexReplacement.acquire();
-      }
-      catch (InterruptedException e)
-      {
-         log.warn("Interrupted while acquiring index replacement sync: " + e);
-         // try to stop IndexMerger without the sync
-      }
-
-      // clear task queue
-      mergeTasks.clear();
-
-      // send quit
-      mergeTasks.add(QUIT);
-      log.debug("quit sent");
-
-      try
-      {
-         // give the merger thread some time to quit,
-         // it is possible that the merger is busy working on a large index.
-         // if that is the case we will just ignore it and the daemon will
-         // die without being able to finish the merge.
-         // at this point it is not possible anymore to replace indexes
-         // on the MultiIndex because we hold the indexReplacement Sync.
-         this.join(500);
-         if (isAlive())
-         {
-            log.info("Unable to stop IndexMerger. Daemon is busy.");
-         }
-         else
-         {
-            log.debug("IndexMerger thread stopped");
-         }
-         log.debug("merge queue size: " + mergeTasks.size());
-      }
-      catch (InterruptedException e)
-      {
-         log.warn("Interrupted while waiting for IndexMerger thread to terminate.");
-      }
-   }
-
-   /**
-    * Implements the index merging.
-    */
-   public void run()
-   {
-      for (;;)
-      {
-         boolean isIdle = false;
-         if (mergeTasks.size() == 0)
-         {
-            mergerIdle.release();
-            isIdle = true;
-         }
-         Merge task = (Merge)mergeTasks.remove();
-         if (task == QUIT)
-         {
-            mergerIdle.release();
-            break;
-         }
-         if (isIdle)
-         {
-            try
-            {
-               mergerIdle.acquire();
-            }
-            catch (InterruptedException e)
-            {
-               Thread.interrupted();
-               log.warn("Unable to acquire mergerIdle sync");
-            }
-         }
-
-         log.debug("accepted merge request");
-
-         // reset deleted documents
-         deletedDocuments.clear();
-
-         // get readers
-         String[] names = new String[task.indexes.length];
-         for (int i = 0; i < task.indexes.length; i++)
-         {
-            names[i] = task.indexes[i].name;
-         }
-         try
-         {
-            log.debug("create new index");
-            PersistentIndex index = multiIndex.getOrCreateIndex(null);
-            boolean success = false;
-            try
-            {
-
-               log.debug("get index readers from MultiIndex");
-               IndexReader[] readers = multiIndex.getIndexReaders(names, this);
-               try
-               {
-                  // do the merge
-                  long time = System.currentTimeMillis();
-                  index.addIndexes(readers);
-                  time = System.currentTimeMillis() - time;
-                  int docCount = 0;
-                  for (int i = 0; i < readers.length; i++)
-                  {
-                     docCount += readers[i].numDocs();
-                  }
-                  log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + ".");
-               }
-               finally
-               {
-                  for (int i = 0; i < readers.length; i++)
-                  {
-                     try
-                     {
-                        Util.closeOrRelease(readers[i]);
-                     }
-                     catch (IOException e)
-                     {
-                        log.warn("Unable to close IndexReader: " + e);
-                     }
-                  }
-               }
-
-               // inform multi index
-               // if we cannot get the sync immediately we have to quit
-               if (!indexReplacement.attempt(0))
-               {
-                  log.debug("index merging canceled");
-                  break;
-               }
-               try
-               {
-                  log.debug("replace indexes");
-                  multiIndex.replaceIndexes(names, index, deletedDocuments);
-               }
-               finally
-               {
-                  indexReplacement.release();
-               }
-
-               success = true;
-
-            }
-            finally
-            {
-               if (!success)
-               {
-                  // delete index
-                  log.debug("deleting index " + index.getName());
-                  multiIndex.deleteIndex(index);
-               }
-            }
-         }
-         catch (Throwable e)
-         {
-            log.error("Error while merging indexes: ", e);
-         }
-      }
-      log.info("IndexMerger terminated");
-   }
-
-   //-----------------------< merge properties >-------------------------------
-
-   /**
-    * The merge factor.
-    */
-   public void setMergeFactor(int mergeFactor)
-   {
-      this.mergeFactor = mergeFactor;
-   }
-
-   /**
-    * The initial threshold for number of documents to merge to a new index.
-    */
-   public void setMinMergeDocs(int minMergeDocs)
-   {
-      this.minMergeDocs = minMergeDocs;
-   }
-
-   /**
-    * The maximum number of document to merge.
-    */
-   public void setMaxMergeDocs(int maxMergeDocs)
-   {
-      this.maxMergeDocs = maxMergeDocs;
-   }
-
-   //------------------------------< internal >--------------------------------
-
-   /**
-    * Implements a simple struct that holds the name of an index and how
-    * many document it contains. <code>Index</code> is comparable using the
-    * number of documents it contains.
-    */
-   private static final class Index implements Comparable
-   {
-
-      /**
-       * The name of the index.
-       */
-      private final String name;
-
-      /**
-       * The number of documents the index contains.
-       */
-      private final int numDocs;
-
-      /**
-       * Creates a new index struct.
-       *
-       * @param name name of an index.
-       * @param numDocs number of documents it contains.
-       */
-      Index(String name, int numDocs)
-      {
-         this.name = name;
-         this.numDocs = numDocs;
-      }
-
-      /**
-       * Indexes are first ordered by {@link #numDocs} and then by {@link
-       * #name}.
-       *
-       * @param o the other <code>Index</code>.
-       * @return a negative integer, zero, or a positive integer as this
-       *         Index is less than, equal to, or greater than the specified
-       *         Index.
-       */
-      public int compareTo(Object o)
-      {
-         Index other = (Index)o;
-         int val = numDocs < other.numDocs ? -1 : (numDocs == other.numDocs ? 0 : 1);
-         if (val != 0)
-         {
-            return val;
-         }
-         else
-         {
-            return name.compareTo(other.name);
-         }
-      }
-
-      /**
-       * @inheritDoc
-       */
-      public String toString()
-      {
-         return name + ":" + numDocs;
-      }
-   }
-
-   /**
-    * Defines a merge task, to merge a couple of indexes into a new index.
-    */
-   private static final class Merge
-   {
-
-      private final Index[] indexes;
-
-      /**
-       * Merge task, to merge <code>indexes</code> into a new index with
-       * <code>name</code>.
-       *
-       * @param indexes the indexes to merge.
-       */
-      Merge(Index[] indexes)
-      {
-         this.indexes = new Index[indexes.length];
-         System.arraycopy(indexes, 0, this.indexes, 0, indexes.length);
-      }
-   }
-
-   /**
-    * Implements a <code>List</code> with a document limit value. An
-    * <code>IndexBucket</code> contains {@link Index}es with documents less
-    * or equal the document limit of the bucket.
-    */
-   private static final class IndexBucket extends ArrayList
-   {
-
-      /**
-       * The lower document limit.
-       */
-      private final long lower;
-
-      /**
-       * The upper document limit.
-       */
-      private final long upper;
-
-      /**
-       * Flag indicating if indexes in this bucket can be merged.
-       */
-      private final boolean allowMerge;
-
-      /**
-       * Creates a new <code>IndexBucket</code>. Limits are both inclusive.
-       *
-       * @param lower document limit.
-       * @param upper document limit.
-       * @param allowMerge if indexes in this bucket can be merged.
-       */
-      IndexBucket(long lower, long upper, boolean allowMerge)
-      {
-         this.lower = lower;
-         this.upper = upper;
-         this.allowMerge = allowMerge;
-      }
-
-      /**
-       * Returns <code>true</code> if the number of documents fit in this
-       * <code>IndexBucket</code>; otherwise <code>false</code>
-       *
-       * @param numDocs the number of documents.
-       * @return <code>true</code> if <code>numDocs</code> fit.
-       */
-      boolean fits(long numDocs)
-      {
-         return numDocs >= lower && numDocs <= upper;
-      }
-
-      /**
-       * Returns <code>true</code> if indexes in this bucket can be merged.
-       *
-       * @return <code>true</code> if indexes in this bucket can be merged.
-       */
-      boolean allowsMerge()
-      {
-         return allowMerge;
-      }
-   }
-}

Modified: jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
--- jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	2010-10-08 12:04:24 UTC (rev 3262)
+++ jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	2010-10-08 12:39:11 UTC (rev 3263)
@@ -171,11 +171,6 @@
    private long lastFileSystemFlushTime;
 
    /**
-    * The <code>IndexMerger</code> for this <code>MultiIndex</code>.
-    */
-   private final IndexMerger merger;
-
-   /**
     * Timer to schedule flushes of this index after some idle time.
     */
    private static final Timer FLUSH_TIMER = new Timer(true);
@@ -265,12 +260,6 @@
       // as of 1.5 deletable file is not used anymore
       removeDeletable();
 
-      // initialize IndexMerger
-      merger = new IndexMerger(this);
-      merger.setMaxMergeDocs(handler.getMaxMergeDocs());
-      merger.setMergeFactor(handler.getMergeFactor());
-      merger.setMinMergeDocs(handler.getMinMergeDocs());
-
       // this method is run in privileged mode internally
       IndexingQueueStore store = new IndexingQueueStore(indexDir);
 
@@ -299,7 +288,6 @@
          index.setUseCompoundFile(handler.getUseCompoundFile());
          index.setTermInfosIndexDivisor(handler.getTermInfosIndexDivisor());
          indexes.add(index);
-         merger.indexAdded(index.getName(), index.getNumDocuments());
       }
 
       // init volatile index
@@ -398,10 +386,16 @@
          {
             long count = 0;
             // traverse and index workspace
-            executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+            
+            // TODO: this was removed
+            //executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+            
             // NodeData rootState = (NodeData) stateMgr.getItemData(rootId);
             count = createIndex(indexingTree.getIndexingRoot(), stateMgr, count);
-            executeAndLog(new Commit(getTransactionId()));
+            
+            // TODO : this was replaced
+            //executeAndLog(new Commit(getTransactionId()));
+            
             log.info("Created initial index for {} nodes", new Long(count));
             releaseMultiReader();
             scheduleFlushTask();
@@ -467,23 +461,67 @@
             try
             {
                long transactionId = nextTransactionId++;
-               executeAndLog(new Start(transactionId));
+               
+            // TODO: this was removed
+               //executeAndLog(new Start(transactionId));
 
                for (Iterator it = remove.iterator(); it.hasNext();)
                {
-                  executeAndLog(new DeleteNode(transactionId, (String)it.next()));
+                  // TODO this was replaced
+                  //executeAndLog(new DeleteNode(transactionId, (String)it.next()));
+                  
+                  String uuidString = (String)it.next();
+                  // check if indexing queue is still working on
+                  // this node from a previous update
+                  Document doc = indexingQueue.removeDocument(uuidString);
+                  if (doc != null)
+                  {
+                     Util.disposeDocument(doc);
+                  }
+                  Term idTerm = new Term(FieldNames.UUID, uuidString);
+                  // if the document cannot be deleted from the volatile index
+                  // delete it from one of the persistent indexes.
+                  int num = volatileIndex.removeDocument(idTerm);
+                  if (num == 0)
+                  {
+                     for (int i = indexes.size() - 1; i >= 0; i--)
+                     {
+                        // only look in registered indexes
+                        PersistentIndex idx = (PersistentIndex)indexes.get(i);
+                        if (indexNames.contains(idx.getName()))
+                        {
+                           num = idx.removeDocument(idTerm);
+                           if (num > 0)
+                           {
+                              break;
+                           }
+                        }
+                     }
+                  }
+                  
+                  
+                  
                }
                for (Iterator it = add.iterator(); it.hasNext();)
                {
                   Document doc = (Document)it.next();
                   if (doc != null)
                   {
-                     executeAndLog(new AddNode(transactionId, doc));
+                     // TODO: this is replaced
+                     //executeAndLog(new AddNode(transactionId, doc));
+
+                     if (doc != null)
+                     {
+                        volatileIndex.addDocuments(new Document[]{doc});
+                     }
+                     
                      // commit volatile index if needed
                      flush |= checkVolatileCommit();
                   }
                }
-               executeAndLog(new Commit(transactionId));
+               
+            // TODO : this was replaced
+               //executeAndLog(new Commit(transactionId));
 
                // flush whole index when volatile index has been commited.
                if (flush)
@@ -558,11 +596,15 @@
       try
       {
          Term idTerm = new Term(FieldNames.UUID, uuid.toString());
-         executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+         
+      // TODO: this was removed
+         //executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+         
          num = volatileIndex.removeDocument(idTerm);
          if (num > 0)
          {
-            redoLog.append(new DeleteNode(getTransactionId(), uuid));
+            // TODO: removed line
+            //redoLog.append(new DeleteNode(getTransactionId(), uuid));
          }
          for (int i = 0; i < indexes.size(); i++)
          {
@@ -573,12 +615,14 @@
                int removed = index.removeDocument(idTerm);
                if (removed > 0)
                {
-                  redoLog.append(new DeleteNode(getTransactionId(), uuid));
+                  // TODO: removed line
+                  //redoLog.append(new DeleteNode(getTransactionId(), uuid));
                }
                num += removed;
             }
          }
-         executeAndLog(new Commit(getTransactionId()));
+      // TODO : this was replaced
+         //executeAndLog(new Commit(getTransactionId()));
       }
       finally
       {
@@ -792,7 +836,8 @@
             // if we are reindexing there is already an active transaction
             if (!reindexing)
             {
-               executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES));
+               // TODO: this was removed
+               //executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES));
             }
             // delete obsolete indexes
             Set names = new HashSet(Arrays.asList(obsoleteIndexes));
@@ -802,16 +847,37 @@
                String indexName = (String)it.next();
                if (indexNames.contains(indexName))
                {
-                  executeAndLog(new DeleteIndex(getTransactionId(), indexName));
+                  // TODO this was replaced
+                  //executeAndLog(new DeleteIndex(getTransactionId(), indexName));
+                  
+                  for (Iterator it2 = indexes.iterator(); it2.hasNext();)
+                  {
+                     PersistentIndex idx = (PersistentIndex)it2.next();
+                     if (idx.getName().equals(indexName))
+                     {
+                        idx.close();
+                        deleteIndex(idx);
+                        break;
+                     }
+                  }
+                  
                }
             }
 
             // Index merger does not log an action when it creates the
             // target
             // index of the merge. We have to do this here.
-            executeAndLog(new CreateIndex(getTransactionId(), index.getName()));
+            // TODO this was replaced
+            ///executeAndLog(new CreateIndex(getTransactionId(), index.getName()));
 
-            executeAndLog(new AddIndex(getTransactionId(), index.getName()));
+            PersistentIndex idx = getOrCreateIndex(index.getName());
+            
+            // TODO: this is replaced
+            //executeAndLog(new AddIndex(getTransactionId(), index.getName()));
+            if (!indexNames.contains(index.getName()))
+            {
+               indexNames.addName(index.getName());
+            }
 
             // delete documents in index
             for (Iterator it = deleted.iterator(); it.hasNext();)
@@ -825,7 +891,9 @@
             {
                // only commit if we are not reindexing
                // when reindexing the final commit is done at the very end
-               executeAndLog(new Commit(getTransactionId()));
+               
+            // TODO : this was replaced
+               //executeAndLog(new Commit(getTransactionId()));
             }
          }
          finally
@@ -954,7 +1022,6 @@
          // stop index merger
          // when calling this method we must not lock this MultiIndex, otherwise
          // a deadlock might occur
-         merger.dispose();
 
          synchronized (this)
          {
@@ -1108,7 +1175,8 @@
       synchronized (this)
       {
          // commit volatile index
-         executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+         // TODO: this was removed
+         //executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
          commitVolatileIndex();
 
          // commit persistent indexes
@@ -1126,11 +1194,17 @@
                // check if index still contains documents
                if (index.getNumDocuments() == 0)
                {
-                  executeAndLog(new DeleteIndex(getTransactionId(), index.getName()));
+                  // TODO THIS was replaced
+                  //executeAndLog(new DeleteIndex(getTransactionId(), index.getName()));
+
+                  index.close();
+                  deleteIndex(index);
+
                }
             }
          }
-         executeAndLog(new Commit(getTransactionId()));
+      // TODO : this was replaced
+         //executeAndLog(new Commit(getTransactionId()));
 
          indexNames.write();
 
@@ -1329,16 +1403,31 @@
 
          long time = System.currentTimeMillis();
          // create index
-         CreateIndex create = new CreateIndex(getTransactionId(), null);
-         executeAndLog(create);
+         
+         //TODO this was replaced
+//         CreateIndex create = new CreateIndex(getTransactionId(), null);
+//         executeAndLog(create);
+         
+         PersistentIndex idx = getOrCreateIndex(null);
 
          // commit volatile index
-         executeAndLog(new VolatileCommit(getTransactionId(), create.getIndexName()));
+         
+         // TODO THIS IS REPLACED
+         //executeAndLog(new VolatileCommit(getTransactionId(), idx.getName()));
 
+         idx.copyIndex(volatileIndex);
+         resetVolatileIndex();
+         
+         // TODO this is replaced
          // add new index
-         AddIndex add = new AddIndex(getTransactionId(), create.getIndexName());
-         executeAndLog(add);
+//         AddIndex add = new AddIndex(getTransactionId(), create.getIndexName());
+//         executeAndLog(add);
 
+         if (!indexNames.contains(idx.getName()))
+         {
+            indexNames.addName(idx.getName());
+         }
+         
          // create new volatile index
          resetVolatileIndex();
 
@@ -1376,7 +1465,11 @@
       {
          return count;
       }
-      executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
+      
+      // TODO: this is replaced
+      //executeAndLog(new AddNode(getTransactionId(), node.getIdentifier()));
+      volatileIndex.addDocuments(new Document[]{createDocument(node)});
+      
       if (++count % 100 == 0)
       {
 
@@ -1552,11 +1645,45 @@
             {
                for (Iterator it = finished.keySet().iterator(); it.hasNext();)
                {
-                  executeAndLog(new DeleteNode(getTransactionId(), (String)it.next()));
+                  // TODO this was replaced
+                  //executeAndLog(new DeleteNode(getTransactionId(), (String)it.next()));
+                  
+                  String uuidString = (String)it.next();
+                  // check if indexing queue is still working on
+                  // this node from a previous update
+                  Document doc = indexingQueue.removeDocument(uuidString);
+                  if (doc != null)
+                  {
+                     Util.disposeDocument(doc);
+                  }
+                  Term idTerm = new Term(FieldNames.UUID, uuidString);
+                  // if the document cannot be deleted from the volatile index
+                  // delete it from one of the persistent indexes.
+                  int num = volatileIndex.removeDocument(idTerm);
+                  if (num == 0)
+                  {
+                     for (int i = indexes.size() - 1; i >= 0; i--)
+                     {
+                        // only look in registered indexes
+                        PersistentIndex idx = (PersistentIndex)indexes.get(i);
+                        if (indexNames.contains(idx.getName()))
+                        {
+                           num = idx.removeDocument(idTerm);
+                           if (num > 0)
+                           {
+                              return;
+                           }
+                        }
+                     }
+                  }
+                  
+                  
                }
                for (Iterator it = finished.values().iterator(); it.hasNext();)
                {
-                  executeAndLog(new AddNode(getTransactionId(), (Document)it.next()));
+                  // TODO this was replaced
+                  //executeAndLog(new AddNode(getTransactionId(), (Document)it.next()));
+                  volatileIndex.addDocuments(new Document[]{(Document)it.next()});
                }
             }
             else
@@ -1790,35 +1917,35 @@
          Action a;
          if (actionLabel.equals(Action.ADD_NODE))
          {
-            a = AddNode.fromString(transactionId, arguments);
+            a = AddNode_.fromString(transactionId, arguments);
          }
          else if (actionLabel.equals(Action.ADD_INDEX))
          {
-            a = AddIndex.fromString(transactionId, arguments);
+            a = AddIndex_.fromString(transactionId, arguments);
          }
          else if (actionLabel.equals(Action.COMMIT))
          {
-            a = Commit.fromString(transactionId, arguments);
+            a = Commit_.fromString(transactionId, arguments);
          }
          else if (actionLabel.equals(Action.CREATE_INDEX))
          {
-            a = CreateIndex.fromString(transactionId, arguments);
+            a = CreateIndex_.fromString(transactionId, arguments);
          }
          else if (actionLabel.equals(Action.DELETE_INDEX))
          {
-            a = DeleteIndex.fromString(transactionId, arguments);
+            a = DeleteIndex_.fromString(transactionId, arguments);
          }
          else if (actionLabel.equals(Action.DELETE_NODE))
          {
-            a = DeleteNode.fromString(transactionId, arguments);
+            a = DeleteNode_.fromString(transactionId, arguments);
          }
          else if (actionLabel.equals(Action.START))
          {
-            a = Start.fromString(transactionId, arguments);
+            a = Start_.fromString(transactionId, arguments);
          }
          else if (actionLabel.equals(Action.VOLATILE_COMMIT))
          {
-            a = VolatileCommit.fromString(transactionId, arguments);
+            a = VolatileCommit_.fromString(transactionId, arguments);
          }
          else
          {
@@ -1831,7 +1958,7 @@
    /**
     * Adds an index to the MultiIndex's active persistent index list.
     */
-   private static class AddIndex extends Action
+   private static class AddIndex_ extends Action
    {
 
       /**
@@ -1848,7 +1975,7 @@
        *            the name of the index to add, or <code>null</code> if an
        *            index with a new name should be created.
        */
-      AddIndex(long transactionId, String indexName)
+      AddIndex_(long transactionId, String indexName)
       {
          super(transactionId, Action.TYPE_ADD_INDEX);
          this.indexName = indexName;
@@ -1865,9 +1992,9 @@
        * @throws IllegalArgumentException
        *             if the arguments are malformed.
        */
-      static AddIndex fromString(long transactionId, String arguments)
+      static AddIndex_ fromString(long transactionId, String arguments)
       {
-         return new AddIndex(transactionId, arguments);
+         return new AddIndex_(transactionId, arguments);
       }
 
       /**
@@ -1882,9 +2009,6 @@
          if (!index.indexNames.contains(indexName))
          {
             index.indexNames.addName(indexName);
-            // now that the index is in the active list let the merger know
-            // about it
-            index.merger.indexAdded(indexName, idx.getNumDocuments());
          }
       }
 
@@ -1907,7 +2031,7 @@
    /**
     * Adds a node to the index.
     */
-   private static class AddNode extends Action
+   private static class AddNode_ extends Action
    {
 
       /**
@@ -1935,7 +2059,7 @@
        * @param uuid
        *            the uuid of the node to add.
        */
-      AddNode(long transactionId, String uuid)
+      AddNode_(long transactionId, String uuid)
       {
          super(transactionId, Action.TYPE_ADD_NODE);
          this.uuid = uuid;
@@ -1949,7 +2073,7 @@
        * @param doc
        *            the document to add.
        */
-      AddNode(long transactionId, Document doc)
+      AddNode_(long transactionId, Document doc)
       {
          this(transactionId, doc.get(FieldNames.UUID));
          this.doc = doc;
@@ -1966,14 +2090,14 @@
        * @throws IllegalArgumentException
        *             if the arguments are malformed. Not a UUID.
        */
-      static AddNode fromString(long transactionId, String arguments) throws IllegalArgumentException
+      static AddNode_ fromString(long transactionId, String arguments) throws IllegalArgumentException
       {
          // simple length check
          if (arguments.length() != Constants.UUID_FORMATTED_LENGTH)
          {
             throw new IllegalArgumentException("arguments is not a uuid");
          }
-         return new AddNode(transactionId, arguments);
+         return new AddNode_(transactionId, arguments);
       }
 
       /**
@@ -2021,7 +2145,7 @@
    /**
     * Commits a transaction.
     */
-   private static class Commit extends Action
+   private static class Commit_ extends Action
    {
 
       /**
@@ -2030,7 +2154,7 @@
        * @param transactionId
        *            the id of the transaction that is committed.
        */
-      Commit(long transactionId)
+      Commit_(long transactionId)
       {
          super(transactionId, Action.TYPE_COMMIT);
       }
@@ -2044,9 +2168,9 @@
        *            ignored by this method.
        * @return the Commit action.
        */
-      static Commit fromString(long transactionId, String arguments)
+      static Commit_ fromString(long transactionId, String arguments)
       {
-         return new Commit(transactionId);
+         return new Commit_(transactionId);
       }
 
       /**
@@ -2074,7 +2198,7 @@
     * Creates an new sub index but does not add it to the active persistent
     * index list.
     */
-   private static class CreateIndex extends Action
+   private static class CreateIndex_ extends Action
    {
 
       /**
@@ -2091,7 +2215,7 @@
        *            the name of the index to add, or <code>null</code> if an
        *            index with a new name should be created.
        */
-      CreateIndex(long transactionId, String indexName)
+      CreateIndex_(long transactionId, String indexName)
       {
          super(transactionId, Action.TYPE_CREATE_INDEX);
          this.indexName = indexName;
@@ -2108,10 +2232,10 @@
        * @throws IllegalArgumentException
        *             if the arguments are malformed.
        */
-      static CreateIndex fromString(long transactionId, String arguments)
+      static CreateIndex_ fromString(long transactionId, String arguments)
       {
          // when created from String, this action is executed as redo action
-         return new CreateIndex(transactionId, arguments);
+         return new CreateIndex_(transactionId, arguments);
       }
 
       /**
@@ -2171,7 +2295,7 @@
    /**
     * Closes and deletes an index that is no longer in use.
     */
-   private static class DeleteIndex extends Action
+   private static class DeleteIndex_ extends Action
    {
 
       /**
@@ -2187,7 +2311,7 @@
        * @param indexName
        *            the name of the index to delete.
        */
-      DeleteIndex(long transactionId, String indexName)
+      DeleteIndex_(long transactionId, String indexName)
       {
          super(transactionId, Action.TYPE_DELETE_INDEX);
          this.indexName = indexName;
@@ -2204,9 +2328,9 @@
        * @throws IllegalArgumentException
        *             if the arguments are malformed.
        */
-      static DeleteIndex fromString(long transactionId, String arguments)
+      static DeleteIndex_ fromString(long transactionId, String arguments)
       {
-         return new DeleteIndex(transactionId, arguments);
+         return new DeleteIndex_(transactionId, arguments);
       }
 
       /**
@@ -2249,7 +2373,7 @@
    /**
     * Deletes a node from the index.
     */
-   private static class DeleteNode extends Action
+   private static class DeleteNode_ extends Action
    {
 
       /**
@@ -2271,7 +2395,7 @@
        * @param uuid
        *            the uuid of the node to delete.
        */
-      DeleteNode(long transactionId, String uuid)
+      DeleteNode_(long transactionId, String uuid)
       {
          super(transactionId, Action.TYPE_DELETE_NODE);
          this.uuid = uuid;
@@ -2288,14 +2412,14 @@
        * @throws IllegalArgumentException
        *             if the arguments are malformed. Not a UUID.
        */
-      static DeleteNode fromString(long transactionId, String arguments)
+      static DeleteNode_ fromString(long transactionId, String arguments)
       {
          // simple length check
          if (arguments.length() != Constants.UUID_FORMATTED_LENGTH)
          {
             throw new IllegalArgumentException("arguments is not a uuid");
          }
-         return new DeleteNode(transactionId, arguments);
+         return new DeleteNode_(transactionId, arguments);
       }
 
       /**
@@ -2355,7 +2479,7 @@
    /**
     * Starts a transaction.
     */
-   private static class Start extends Action
+   private static class Start_ extends Action
    {
 
       /**
@@ -2364,7 +2488,7 @@
        * @param transactionId
        *            the id of the transaction that started.
        */
-      Start(long transactionId)
+      Start_(long transactionId)
       {
          super(transactionId, Action.TYPE_START);
       }
@@ -2378,9 +2502,9 @@
        *            ignored by this method.
        * @return the Start action.
        */
-      static Start fromString(long transactionId, String arguments)
+      static Start_ fromString(long transactionId, String arguments)
       {
-         return new Start(transactionId);
+         return new Start_(transactionId);
       }
 
       /**
@@ -2407,7 +2531,7 @@
    /**
     * Commits the volatile index to disk.
     */
-   private static class VolatileCommit extends Action
+   private static class VolatileCommit_ extends Action
    {
 
       /**
@@ -2421,7 +2545,7 @@
        * @param transactionId
        *            the id of the transaction that executes this action.
        */
-      VolatileCommit(long transactionId, String targetIndex)
+      VolatileCommit_(long transactionId, String targetIndex)
       {
          super(transactionId, Action.TYPE_VOLATILE_COMMIT);
          this.targetIndex = targetIndex;
@@ -2436,9 +2560,9 @@
        *            ignored by this implementation.
        * @return the VolatileCommit action.
        */
-      static VolatileCommit fromString(long transactionId, String arguments)
+      static VolatileCommit_ fromString(long transactionId, String arguments)
       {
-         return new VolatileCommit(transactionId, arguments);
+         return new VolatileCommit_(transactionId, arguments);
       }
 
       /**
@@ -2500,7 +2624,6 @@
    protected void setReadOny()
    {
       // try to stop merger in safe way
-      merger.dispose();
       flushTask.cancel();
       FLUSH_TIMER.purge();
       this.redoLog = null;
@@ -2532,18 +2655,8 @@
       attemptDelete();
 
       // now that we are ready, start index merger
-      merger.start();
       if (redoLogApplied)
       {
-         // wait for the index merge to finish pending jobs
-         try
-         {
-            merger.waitUntilIdle();
-         }
-         catch (InterruptedException e)
-         {
-            // move on
-         }
          flush();
       }
 

Modified: jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java
===================================================================
--- jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java	2010-10-08 12:04:24 UTC (rev 3262)
+++ jcr/branches/1.14-CNK/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java	2010-10-08 12:39:11 UTC (rev 3263)
@@ -193,7 +193,10 @@
                out.close();
                out = null;
             }
-            dir.deleteFile(REDO_LOG);
+            if (dir.fileExists(REDO_LOG))
+            {
+               dir.deleteFile(REDO_LOG);
+            }
             entryCount = 0;
             return null;
          }
@@ -208,17 +211,17 @@
    private void initOut() throws IOException
    {
       SecurityHelper.doPriviledgedIOExceptionAction(new PrivilegedExceptionAction<Object>()
+      {
+         public Object run() throws Exception
          {
-            public Object run() throws Exception
-            {
             if (out == null)
             {
                OutputStream os = new IndexOutputStream(dir.createOutput(REDO_LOG));
                out = new BufferedWriter(new OutputStreamWriter(os));
             }
-               return null;
-            }
-         });
+            return null;
+         }
+      });
    }
 
    /**
@@ -231,9 +234,9 @@
    private void read(final ActionCollector collector) throws IOException
    {
       SecurityHelper.doPriviledgedIOExceptionAction(new PrivilegedExceptionAction<Object>()
+      {
+         public Object run() throws Exception
          {
-            public Object run() throws Exception
-            {
             if (!dir.fileExists(REDO_LOG))
             {
                return null;
@@ -269,9 +272,9 @@
                   }
                }
             }
-               return null;
-            }
-         });
+            return null;
+         }
+      });
    }
 
    //-----------------------< internal >---------------------------------------
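
The RedoLog change above also guards the redo-log deletion with an existence
check, presumably because Directory.deleteFile typically throws an IOException
when the file is absent (e.g. if clear() runs before anything was ever logged).
A minimal sketch of the guarded pattern, using the same dir and REDO_LOG fields
as the class itself:

   // Only touch the redo log file if it actually exists in the index directory.
   if (dir.fileExists(REDO_LOG))
   {
      dir.deleteFile(REDO_LOG);
   }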


