[exo-jcr-commits] exo-jcr SVN: r4009 - in jcr/trunk/exo.jcr.component.core/src: main/java/org/exoplatform/services/jcr/impl/core/query/lucene and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Feb 24 02:48:09 EST 2011


Author: nzamosenchuk
Date: 2011-02-24 02:48:07 -0500 (Thu, 24 Feb 2011)
New Revision: 4009

Added:
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/OfflinePersistentIndex.java
Modified:
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecoveryImpl.java
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandlerContext.java
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
   jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
   jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java
Log:
EXOJCR-1194 :  Introducing OfflinePersistentIndex that collects index data for content updated while the index was offline.
EXOJCR-1193 :  Invoking OfflinePersistentIndex when index retrieval is in progress.

Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecoveryImpl.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecoveryImpl.java	2011-02-23 12:40:22 UTC (rev 4008)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecoveryImpl.java	2011-02-24 07:48:07 UTC (rev 4009)
@@ -18,6 +18,7 @@
 
 import org.exoplatform.commons.utils.PrivilegedFileHelper;
 import org.exoplatform.services.jcr.config.RepositoryConfigurationException;
+import org.exoplatform.services.jcr.impl.core.query.lucene.OfflinePersistentIndex;
 import org.exoplatform.services.jcr.impl.util.io.DirectoryHelper;
 import org.exoplatform.services.log.ExoLogger;
 import org.exoplatform.services.log.Log;
@@ -89,6 +90,13 @@
    protected Boolean isResponsibleToSetIndexOnline = false;
 
    /**
+    * Indicates whether current node is in online or offline mode
+    */
+   protected Boolean isOnline = true;
+
+   protected final SearchManager searchManager;
+
+   /**
     * Constructor IndexRetrieveImpl.
     * 
     * @throws RepositoryConfigurationException 
@@ -97,6 +105,7 @@
       throws RepositoryConfigurationException
    {
       this.rpcService = rpcService;
+      this.searchManager = searchManager;
 
       final String commandSuffix = searchManager.getWsId() + "-" + (searchManager.parentSearchManager == null);
       final File indexDirectory = searchManager.getIndexDirectory();
@@ -111,9 +120,8 @@
          public Serializable execute(Serializable[] args) throws Throwable
          {
             boolean isOnline = (Boolean)args[0];
-
-            // TODO searchManager.setReadOnly(isReadOnly);
-
+            searchManager.setOnline(isOnline);
+            IndexRecoveryImpl.this.isOnline = isOnline;
             return null;
          }
       });
@@ -134,10 +142,14 @@
             {
                if (!file.isDirectory())
                {
-                  result.add(PrivilegedFileHelper.getAbsolutePath(file).substring(indexDirLen));
+                  // if parent directory is not "offline" then add this file. Otherwise skip it.
+                  // TODO implement list retrieval via index state manager
+                  if (!file.getParent().endsWith(OfflinePersistentIndex.NAME))
+                  {
+                     result.add(PrivilegedFileHelper.getAbsolutePath(file).substring(indexDirLen));
+                  }
                }
             }
-
             return result;
          }
       });
@@ -156,10 +168,10 @@
 
             RandomAccessFile file = new RandomAccessFile(new File(indexDirectory, filePath), "r");
             file.seek(offset);
-            
+
             byte[] buffer = new byte[BUFFER_SIZE];
             int len = file.read(buffer);
-            
+
             if (len == -1)
             {
                return null;
@@ -373,7 +385,7 @@
    {
       try
       {
-         if (rpcService.isCoordinator())
+         if (rpcService.isCoordinator() && !isOnline)
          {
             new Thread()
             {
@@ -387,14 +399,22 @@
 
                      for (Object result : results)
                      {
-                        if ((Boolean)result)
+                        if (result instanceof Boolean)
                         {
-                           return;
+                           if ((Boolean)result)
+                           {
+                              return;
+                           }
                         }
+                        else
+                        {
+                           log.error("Result is not an instance of Boolean" + result);
+                        }
                      }
-
                      // node which was responsible for resuming leave the cluster, so resume component
-                     // TODO searchManager.setOnline();
+                     log
+                        .error("Node responsible for setting index back online seems to leave the cluster. Setting back online.");
+                     searchManager.setOnline(true);
                   }
                   catch (SecurityException e1)
                   {
@@ -404,6 +424,10 @@
                   {
                      log.error("Exception during command execution", e1);
                   }
+                  catch (IOException e2)
+                  {
+                     log.error("Exception during setting index back online", e2);
+                  }
                }
             }.start();
          }

Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java	2011-02-23 12:40:22 UTC (rev 4008)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java	2011-02-24 07:48:07 UTC (rev 4009)
@@ -196,4 +196,12 @@
     */
    void setIndexUpdateMonitor(IndexUpdateMonitor indexUpdateMonitor);
 
+   /**
+    * Switches the index into ONLINE or OFFLINE mode. In offline mode new indexing data is still
+    * collected, but the index itself is guaranteed to be unmodified while it is offline.
+    * 
+    * @param isOnline <code>true</code> to switch to ONLINE mode, <code>false</code> to switch to OFFLINE mode
+    */
+   void setOnline(boolean isOnline) throws IOException;
+
 }

Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandlerContext.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandlerContext.java	2011-02-23 12:40:22 UTC (rev 4008)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandlerContext.java	2011-02-24 07:48:07 UTC (rev 4009)
@@ -23,6 +23,7 @@
 import org.exoplatform.services.jcr.impl.core.NamespaceRegistryImpl;
 import org.exoplatform.services.jcr.impl.core.nodetype.NodeTypeDataManagerImpl;
 import org.exoplatform.services.jcr.impl.core.query.lucene.LuceneVirtualTableResolver;
+import org.exoplatform.services.rpc.RPCService;
 
 /**
  * Acts as an argument for the {@link QueryHandler} to keep the interface
@@ -82,8 +83,13 @@
     * The class responsible for index retrieving from other place. 
     */
    private final IndexRecovery indexRecovery;
-   
+
    /**
+    * Field containing RPCService, if any configured in container  
+    */
+   private final RPCService rpcService;
+
+   /**
     * Creates a new context instance.
     * 
     * @param fs
@@ -107,11 +113,13 @@
     *            descendant of that node is also excluded from indexing.
     * @param indexRecovery
     *            the index retriever from other place     
+    * @param rpcService
+    *            RPCService instance, if any
     */
    public QueryHandlerContext(WorkspaceContainerFacade container, ItemDataConsumer stateMgr, IndexingTree indexingTree,
       NodeTypeDataManager nodeTypeDataManager, NamespaceRegistryImpl nsRegistry, QueryHandler parentHandler,
       String indexDirectory, DocumentReaderService extractor, boolean createInitialIndex,
-      LuceneVirtualTableResolver virtualTableResolver, IndexRecovery indexRecovery)
+      LuceneVirtualTableResolver virtualTableResolver, IndexRecovery indexRecovery, RPCService rpcService)
    {
       this.indexRecovery = indexRecovery;
       this.container = container;
@@ -124,6 +132,7 @@
       this.createInitialIndex = createInitialIndex;
       this.virtualTableResolver = virtualTableResolver;
       this.propRegistry = new PropertyTypeRegistry(nodeTypeDataManager);
+      this.rpcService = rpcService;
       this.parentHandler = parentHandler;
       ((NodeTypeDataManagerImpl)this.nodeTypeDataManager).addListener(propRegistry);
    }
@@ -237,4 +246,12 @@
       return indexRecovery;
    }
 
+   /**
+    * @return RPCService if any present in a container.
+    */
+   public RPCService getRPCService()
+   {
+      return rpcService;
+   }
+
 }

Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java	2011-02-23 12:40:22 UTC (rev 4008)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java	2011-02-24 07:48:07 UTC (rev 4009)
@@ -382,7 +382,9 @@
             try
             {
                if (reader != null)
+               {
                   reader.close();
+               }
             }
             catch (IOException e)
             {
@@ -495,7 +497,9 @@
    {
 
       if (log.isDebugEnabled())
+      {
          log.debug("start");
+      }
       try
       {
          if (indexingTree == null)
@@ -517,7 +521,9 @@
                   {
                      ItemData excludeData = itemMgr.getItemData(stringTokenizer.nextToken());
                      if (excludeData != null)
+                     {
                         excludedPath.add(excludeData.getQPath());
+                     }
                   }
                   catch (RepositoryException e)
                   {
@@ -534,7 +540,9 @@
                {
                   ItemData indexingRootDataItem = itemMgr.getItemData(rootNodeIdentifer);
                   if (indexingRootDataItem != null && indexingRootDataItem.isNode())
+                  {
                      indexingRootData = (NodeData)indexingRootDataItem;
+                  }
                }
                catch (RepositoryException e)
                {
@@ -628,14 +636,20 @@
                         if (item.isNode())
                         {
                            if (!indexingTree.isExcluded(item))
+                           {
                               return (NodeData)item;
+                           }
                         }
                         else
+                        {
                            log.warn("Node not found, but property " + id + ", " + item.getQPath().getAsString()
                               + " found. ");
+                        }
                      }
                      else
+                     {
                         log.warn("Unable to index node with id " + id + ", node does not exist.");
+                     }
 
                   }
                   catch (RepositoryException e)
@@ -704,7 +718,7 @@
       QueryHandlerContext context =
          new QueryHandlerContext(container, itemMgr, indexingTree, nodeTypeDataManager, nsReg, parentHandler,
             PrivilegedFileHelper.getAbsolutePath(getIndexDirectory()), extractor, true, virtualTableResolver,
-            indexRecovery);
+            indexRecovery, rpcService);
 
       return context;
    }
@@ -792,8 +806,8 @@
          if (parentSearchManager != null)
          {
             newChangesFilter =
-               constuctor.newInstance(this, parentSearchManager, config, indexingTree,
-                  parentSearchManager.getIndexingTree(), handler, parentSearchManager.getHandler(), cfm);
+               constuctor.newInstance(this, parentSearchManager, config, indexingTree, parentSearchManager
+                  .getIndexingTree(), handler, parentSearchManager.getHandler(), cfm);
          }
       }
       catch (SecurityException e)
@@ -840,7 +854,9 @@
       // initialize query handler
       String className = config.getType();
       if (className == null)
+      {
          throw new RepositoryConfigurationException("Content hanler       configuration fail");
+      }
 
       try
       {
@@ -1015,6 +1031,17 @@
    }
 
    /**
+    * Switches index into online or offline modes.
+    * 
+    * @param isOnline
+    * @throws IOException
+    */
+   public void setOnline(boolean isOnline) throws IOException
+   {
+      handler.setOnline(isOnline);
+   }
+
+   /**
     * {@inheritDoc}
     */
    public void resume() throws ResumeException

Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	2011-02-23 12:40:22 UTC (rev 4008)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java	2011-02-24 07:48:07 UTC (rev 4009)
@@ -148,6 +148,8 @@
     */
    private VolatileIndex volatileIndex;
 
+   private OfflinePersistentIndex offlineIndex;
+
    /**
     * Flag indicating whether an update operation is in progress.
     */
@@ -230,8 +232,10 @@
 
    /**
     * Flag indicating whether re-indexing is running.
+    * Or for any other reason it should be switched
+    * to offline mode.
     */
-   private boolean reindexing = false;
+   private boolean online = true;
 
    /**
     * Flag indicating whether the index is stopped.
@@ -319,6 +323,10 @@
          merger.indexAdded(index.getName(), index.getNumDocuments());
       }
 
+      offlineIndex =
+         new OfflinePersistentIndex(handler.getTextAnalyzer(), handler.getSimilarity(), cache, indexingQueue,
+            directoryManager);
+
       // this method is run in privileged mode internally
       IndexingQueueStore store = new IndexingQueueStore(indexDir);
 
@@ -433,11 +441,14 @@
       // only do an initial index if there are no indexes at all
       if (indexNames.size() == 0)
       {
-         reindexing = true;
+         setOnline(false);
+
          try
          {
+            // copy the index from the coordinator only if "from-coordinator" mode is set, index recovery and RPC Service are available and this node is not the coordinator
             if (handler.getIndexRecoveryMode().equals(SearchIndex.INDEX_RECOVERY_MODE_FROM_COORDINATOR)
-               && handler.getContext().getIndexRecovery() != null)
+               && handler.getContext().getIndexRecovery() != null && handler.getContext().getRPCService() != null
+               && handler.getContext().getRPCService().isCoordinator() == false)
             {
                log.info("Retrieving index from coordinator...");
                recoveryIndexFromCoordinator();
@@ -447,6 +458,20 @@
             }
             else
             {
+               if (handler.getIndexRecoveryMode().equals(SearchIndex.INDEX_RECOVERY_MODE_FROM_COORDINATOR))
+               {
+                  if (handler.getContext().getRPCService() == null)
+                  {
+                     log
+                        .error("RPC Service is not configured but required for copying the index from coordinator node. Index will be created by re-indexing.");
+                  }
+                  else if (handler.getContext().getRPCService().isCoordinator() == true)
+                  {
+                     log
+                        .info("Copying the index from coordinator configured, but this node is the only one in a cluster. Index will be created by re-indexing.");
+                  }
+               }
+
                // traverse and index workspace
                executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
 
@@ -484,7 +509,7 @@
          }
          finally
          {
-            reindexing = false;
+            setOnline(true);
          }
       }
       else
@@ -509,8 +534,12 @@
     */
    synchronized void update(final Collection remove, final Collection add) throws IOException
    {
-      if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
+      if (!online)
       {
+         doUpdateOffline(remove, add);
+      }
+      else if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
+      {
          doUpdateRW(remove, add);
       }
       else
@@ -520,7 +549,8 @@
    }
 
    /**
-    * For investigation purposes only
+    * Performs indexing into volatile index in case of Read_Only mode. This ensures that node
+    * was not present in latest persistent index in case of coordinator has just committed the index
     * 
     * @param remove
     * @param add
@@ -716,7 +746,72 @@
       });
    }
 
+   private void invokeOfflineIndex() throws IOException
+   {
+      List<String> processedIDs = offlineIndex.getProcessedIDs();
+      // remove all nodes touched while offline; NOTE(review): setOnline() calls this before raising the online flag, so update() appears to route to doUpdateOffline() - confirm
+      update(processedIDs, Collections.EMPTY_LIST);
+
+      executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+
+      // create the index that will receive the offline content
+      CreateIndex create = new CreateIndex(getTransactionId(), null);
+      executeAndLog(create);
+
+      // copy the offline index into the newly created index
+      executeAndLog(new OfflineInvoke(getTransactionId(), create.getIndexName()));
+
+      // add new index
+      AddIndex add = new AddIndex(getTransactionId(), create.getIndexName());
+      executeAndLog(add);
+
+      executeAndLog(new Commit(getTransactionId()));
+
+      indexNames.write();
+
+      offlineIndex.close();
+      deleteIndex(offlineIndex);
+      offlineIndex = null;
+   }
+
    /**
+    * Applies changes to the offline index; update() delegates here while the index is not online.
+    * 
+    * @param remove collection of node UUIDs (Strings) whose documents are removed from the offline index
+    * @param add collection of Lucene Documents to add to the offline index; null elements are skipped
+    * @throws IOException if an error occurs while updating the offline index
+    */
+   private void doUpdateOffline(final Collection remove, final Collection add) throws IOException
+   {
+      SecurityHelper.doPrivilegedIOExceptionAction(new PrivilegedExceptionAction<Object>()
+      {
+         public Object run() throws Exception
+         {
+            for (Iterator it = remove.iterator(); it.hasNext();)
+            {
+               Term idTerm = new Term(FieldNames.UUID, (String)it.next());
+               offlineIndex.removeDocument(idTerm);
+            }
+
+            for (Iterator it = add.iterator(); it.hasNext();)
+            {
+               Document doc = (Document)it.next();
+               if (doc != null)
+               {
+                  offlineIndex.addDocuments(new Document[]{doc});
+                  // commit the offline index once its RAM usage reaches the max volatile index size
+                  if (offlineIndex.getRamSizeInBytes() >= handler.getMaxVolatileIndexSize())
+                  {
+                     offlineIndex.commit();
+                  }
+               }
+            }
+            return null;
+         }
+      });
+   }
+
+   /**
     * Adds a document to the index.
     * 
     * @param doc
@@ -994,7 +1089,7 @@
          try
          {
             // if we are reindexing there is already an active transaction
-            if (!reindexing)
+            if (online)
             {
                executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES));
             }
@@ -1025,7 +1120,7 @@
             }
             index.commit();
 
-            if (!reindexing)
+            if (online)
             {
                // only commit if we are not reindexing
                // when reindexing the final commit is done at the very end
@@ -1050,7 +1145,7 @@
             }
          }
       }
-      if (reindexing)
+      if (!online)
       {
          // do some cleanup right away when reindexing
          attemptDelete();
@@ -1154,6 +1249,11 @@
       return volatileIndex;
    }
 
+   OfflinePersistentIndex getOfflinePersistentIndex()
+   {
+      return offlineIndex;
+   }
+
    /**
     * Closes this <code>MultiIndex</code>.
     */
@@ -2040,11 +2140,21 @@
       static final String VOLATILE_COMMIT = "VOL_COM";
 
       /**
+       * Action identifier in redo log for offline index invocation action.
+       */
+      static final String OFFLINE_INVOKE = "OFF_INV";
+
+      /**
        * Action type for volatile index commit action.
        */
       public static final int TYPE_VOLATILE_COMMIT = 4;
 
       /**
+       * Action type for offline index invocation action.
+       */
+      public static final int TYPE_OFFLINE_INVOKE = 8;
+
+      /**
        * Action identifier in redo log for index create action.
        */
       static final String CREATE_INDEX = "CRE_IDX";
@@ -2233,6 +2343,10 @@
          {
             a = VolatileCommit.fromString(transactionId, arguments);
          }
+         else if (actionLabel.equals(Action.OFFLINE_INVOKE))
+         {
+            a = OfflineInvoke.fromString(transactionId, arguments);
+         }
          else
          {
             throw new IllegalArgumentException(line);
@@ -2326,8 +2440,8 @@
       /**
        * The maximum length of a AddNode String.
        */
-      private static final int ENTRY_LENGTH = Long.toString(Long.MAX_VALUE).length() + Action.ADD_NODE.length()
-         + Constants.UUID_FORMATTED_LENGTH + 2;
+      private static final int ENTRY_LENGTH =
+         Long.toString(Long.MAX_VALUE).length() + Action.ADD_NODE.length() + Constants.UUID_FORMATTED_LENGTH + 2;
 
       /**
        * The uuid of the node to add.
@@ -2728,8 +2842,8 @@
       /**
        * The maximum length of a DeleteNode String.
        */
-      private static final int ENTRY_LENGTH = Long.toString(Long.MAX_VALUE).length() + Action.DELETE_NODE.length()
-         + Constants.UUID_FORMATTED_LENGTH + 2;
+      private static final int ENTRY_LENGTH =
+         Long.toString(Long.MAX_VALUE).length() + Action.DELETE_NODE.length() + Constants.UUID_FORMATTED_LENGTH + 2;
 
       /**
        * The uuid of the node to remove.
@@ -2944,6 +3058,69 @@
       }
    }
 
+   private static class OfflineInvoke extends Action
+   {
+
+      /**
+       * The name of the target index that receives the content of the offline index.
+       */
+      private final String targetIndex;
+
+      /**
+       * Creates a new OfflineInvoke action that copies the offline index into <code>targetIndex</code>.
+       * 
+       * @param transactionId
+       *            the id of the transaction that executes this action.
+       */
+      OfflineInvoke(long transactionId, String targetIndex)
+      {
+         super(transactionId, Action.TYPE_OFFLINE_INVOKE);
+         this.targetIndex = targetIndex;
+      }
+
+      /**
+       * Creates a new OfflineInvoke action from its string arguments.
+       * 
+       * @param transactionId
+       *            the id of the transaction that executes this action.
+       * @param arguments
+       *            the name of the target index.
+       * @return the OfflineInvoke action.
+       */
+      static OfflineInvoke fromString(long transactionId, String arguments)
+      {
+         return new OfflineInvoke(transactionId, arguments);
+      }
+
+      /**
+       * Copies the offline index into the target persistent index.
+       * 
+       * @inheritDoc
+       */
+      @Override
+      public void execute(MultiIndex index) throws IOException
+      {
+         OfflinePersistentIndex offlineIndex = index.getOfflinePersistentIndex();
+         PersistentIndex persistentIndex = index.getOrCreateIndex(targetIndex);
+         persistentIndex.copyIndex(offlineIndex);
+      }
+
+      /**
+       * @inheritDoc
+       */
+      @Override
+      public String toString()
+      {
+         StringBuffer logLine = new StringBuffer();
+         logLine.append(Long.toString(getTransactionId()));
+         logLine.append(' ');
+         logLine.append(Action.OFFLINE_INVOKE);
+         logLine.append(' ');
+         logLine.append(targetIndex);
+         return logLine.toString();
+      }
+   }
+
    /**
     * @see org.exoplatform.services.jcr.impl.core.query.IndexerIoModeListener#onChangeMode(org.exoplatform.services.jcr.impl.core.query.IndexerIoMode)
     */
@@ -3111,12 +3288,39 @@
             }
             catch (IOException e)
             {
-               log.error("An erro occurs while trying to wake up the sleeping threads", e);
+               log.error("An error occurred while trying to wake up the sleeping threads", e);
             }
          }
       }
    }
 
+   public synchronized void setOnline(boolean isOnline) throws IOException
+   {
+      // act only when the requested mode differs from the current one
+      if (online != isOnline)
+      {
+         // switching to ONLINE: commit the offline index, then merge it into the working index
+         if (isOnline)
+         {
+            log.info("Setting index back online");
+            offlineIndex.commit(true);
+            // merge collected offline changes; the online flag is raised only after this returns normally
+            invokeOfflineIndex();
+            online = true;
+         }
+         // switching to OFFLINE; NOTE(review): a previously assigned offlineIndex is replaced below without being closed - confirm
+         else
+         {
+            log.info("Setting index offline");
+            offlineIndex =
+               new OfflinePersistentIndex(handler.getTextAnalyzer(), handler.getSimilarity(), cache, indexingQueue,
+                  directoryManager);
+            online = false;
+            flush();
+         }
+      }
+   }
+
    /**
     * This class is used to index a node and its descendants nodes with several threads 
     */

Added: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/OfflinePersistentIndex.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/OfflinePersistentIndex.java	                        (rev 0)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/OfflinePersistentIndex.java	2011-02-24 07:48:07 UTC (rev 4009)
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2011 eXo Platform SAS.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.exoplatform.services.jcr.impl.core.query.lucene;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Similarity;
+import org.exoplatform.services.jcr.impl.core.query.lucene.directory.DirectoryManager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link PersistentIndex} used while the main index is offline (for example during a long running
+ * job such as index retrieval). In a clustered environment repository operations may still add or
+ * delete nodes meanwhile; this index collects them and remembers the UUIDs it has processed.
+ * 
+ * @author <a href="mailto:nikolazius at gmail.com">Nikolay Zamosenchuk</a>
+ * @version $Id: OfflinePersistentIndex.java 34360 2009-07-22 23:58:59Z nzamosenchuk $
+ *
+ */
+public class OfflinePersistentIndex extends PersistentIndex
+{
+   public static final String NAME = "offline";
+
+   private final List<String> processedIDs;
+
+   /**
+    * Creates a new <code>OfflinePersistentIndex</code>.
+    *
+    * @param analyzer the analyzer for text tokenizing.
+    * @param similarity the similarity implementation.
+    * @param cache the document number cache
+    * @param indexingQueue the indexing queue.
+    * @param directoryManager the directory manager.
+    * @throws IOException if an error occurs while opening / creating the
+    *  index.
+    */
+   OfflinePersistentIndex(Analyzer analyzer, Similarity similarity, DocNumberCache cache, IndexingQueue indexingQueue,
+      DirectoryManager directoryManager) throws IOException
+   {
+      super(NAME, analyzer, similarity, cache, indexingQueue, directoryManager);
+      this.processedIDs = new ArrayList<String>();
+   }
+
+   @Override
+   int getNumDocuments() throws IOException
+   {
+      return super.getNumDocuments();
+   }
+
+   @Override
+   int removeDocument(Term idTerm) throws IOException
+   {
+      int count = super.removeDocument(idTerm);
+      // remember the UUID: MultiIndex purges all processed IDs from the index before merging
+      processedIDs.add(idTerm.text());
+      return count;
+   }
+
+   @Override
+   void addDocuments(Document[] docs) throws IOException
+   {
+      super.addDocuments(docs);
+      for (Document doc : docs)
+      {
+         // remember the UUID: MultiIndex purges all processed IDs from the index before merging
+         processedIDs.add(doc.get(FieldNames.UUID));
+      }
+   }
+
+   @Override
+   synchronized void close()
+   {
+      processedIDs.clear();
+      super.close();
+   }
+
+   /**
+    * @return an unmodifiable view of the UUIDs that were processed by this index (both added and removed nodes).
+    */
+   public List<String> getProcessedIDs()
+   {
+      return Collections.unmodifiableList(processedIDs);
+   }
+
+}

Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java	2011-02-23 12:40:22 UTC (rev 4008)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java	2011-02-24 07:48:07 UTC (rev 4009)
@@ -762,7 +762,11 @@
    public void apply(ChangesHolder changes) throws RepositoryException, IOException
    {
       checkOpen();
-      index.update(changes.getRemove(), changes.getAdd());
+      // index may not be initialized, but some operation can be performed in cluster environment
+      if (index != null)
+      {
+         index.update(changes.getRemove(), changes.getAdd());
+      }
    }
 
    /**
@@ -2945,4 +2949,13 @@
          log.error("Can not recover error log.", e);
       }
    }
+
+   /**
+    * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#setOnline(boolean)
+    */
+   public void setOnline(boolean isOnline) throws IOException
+   {
+      checkOpen();
+      index.setOnline(isOnline);
+   }
 }

Modified: jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java	2011-02-23 12:40:22 UTC (rev 4008)
+++ jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java	2011-02-24 07:48:07 UTC (rev 4009)
@@ -106,4 +106,13 @@
       // TODO Auto-generated method stub
       return null;
    }
+
+   /**
+    * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#setOnline(boolean)
+    */
+   public void setOnline(boolean isOnline)
+   {
+      // TODO Auto-generated method stub
+      
+   }
 }



More information about the exo-jcr-commits mailing list