Author: nzamosenchuk
Date: 2011-03-30 10:08:40 -0400 (Wed, 30 Mar 2011)
New Revision: 4190
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/config/QueryHandlerParams.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecovery.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecoveryImpl.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchIndexConfigurationHelper.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/OfflinePersistentIndex.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java
Log:
EXOJCR-577: Initial implementation of asynchronous reindexing.
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/config/QueryHandlerParams.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/config/QueryHandlerParams.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/config/QueryHandlerParams.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -114,4 +114,7 @@
public static final String PARAM_RDBMS_REINDEXING = "rdbms-reindexing";
public static final String PARAM_INDEX_RECOVERY_MODE =
"index-recovery-mode";
+
+ public static final String PARAM_ASYNC_REINDEXING = "async-reindexing";
+
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecovery.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecovery.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecovery.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -66,4 +66,12 @@
* if any exception occurred
*/
public InputStream getIndexFile(String filePath) throws RepositoryException;
+
+ /**
+ * Check if index is ready and can be retrieved.
+ *
+ * @return <code>true</code> if the index is ready and can be retrieved,
+ * <code>false</code> otherwise
+ * @throws RepositoryException if error occurs.
+ */
+ public boolean checkIndexReady() throws RepositoryException;
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecoveryImpl.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecoveryImpl.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexRecoveryImpl.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -80,6 +80,11 @@
private RemoteCommand changeIndexMode;
/**
+ * Remote command to check if index can be retrieved.
+ */
+ private RemoteCommand checkIndexReady;
+
+ /**
* Remote command to check if node responsible for set index online leave the cluster.
*/
private RemoteCommand requestForResponsibleToSetIndexOnline;
@@ -201,6 +206,20 @@
}
});
+      // Remote command answering whether the index of the node executing it can be retrieved.
+      checkIndexReady = rpcService.registerCommand(new RemoteCommand()
+      {
+         /**
+          * @return the identifier of this command, made unique by the command suffix
+          */
+         public String getId()
+         {
+            return "org.exoplatform.services.jcr.impl.core.query.IndexRecoveryImpl-checkIndexIsReady-" + commandSuffix;
+         }
+
+         /**
+          * @param args ignored by this command
+          * @return {@link Boolean#TRUE} if the local index is online, {@link Boolean#FALSE} otherwise
+          */
+         public Serializable execute(Serializable[] args) throws Throwable
+         {
+            // if index is currently online, then it can be retrieved;
+            // Boolean.valueOf reuses the cached instances instead of allocating a new one
+            return Boolean.valueOf(searchManager.isOnline());
+         }
+      });
+
rpcService.registerTopologyChangeListener(this);
}
@@ -281,6 +300,25 @@
throw new RepositoryException(e);
}
}
+
+ /**
+ * @see org.exoplatform.services.jcr.impl.core.query.IndexRecovery#checkIndexReady()
+ */
+   public boolean checkIndexReady() throws RepositoryException
+   {
+      try
+      {
+         // Run the dedicated read-only "check index ready" command on the coordinator.
+         // The previous code dispatched changeIndexMode with argument 'true' here, which
+         // would have switched the coordinator's index mode instead of querying its state.
+         return (Boolean)rpcService.executeCommandOnCoordinator(checkIndexReady, true);
+      }
+      catch (SecurityException e)
+      {
+         // keep the original failure as the cause for the caller
+         throw new RepositoryException(e);
+      }
+      catch (RPCException e)
+      {
+         throw new RepositoryException(e);
+      }
+   }
/**
* Allows to read data from remote machine.
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -87,7 +87,7 @@
* @return a {@link ChangesHolder} instance that contains all the changes
*/
ChangesHolder getChanges(Iterator<String> remove, Iterator<NodeData>
add);
-
+
/**
* Applies the given changes to the indes in an atomic operation
* @param changes the changes to apply
@@ -95,7 +95,7 @@
* @throws IOException if an error occurs while updating the index.
*/
void apply(ChangesHolder changes) throws RepositoryException, IOException;
-
+
/**
* Closes this <code>QueryHandler</code> and frees resources attached
* to this handler.
@@ -179,7 +179,7 @@
* @param indexInfos
*/
void setIndexInfos(IndexInfos indexInfos);
-
+
/**
* Returns {@link IndexInfos} instance that was set into QueryHandler.
* @return
@@ -204,4 +204,12 @@
*/
void setOnline(boolean isOnline) throws IOException;
+ /**
+ * Reports whether the index is currently online. Offline mode means that new
+ * indexing data is collected but index is guaranteed to be unmodified during
+ * offline state.
+ *
+ * @return the state of index: <code>true</code> if it is online,
+ * <code>false</code> if it is offline
+ */
+ boolean isOnline();
+
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchIndexConfigurationHelper.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchIndexConfigurationHelper.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchIndexConfigurationHelper.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -198,5 +198,9 @@
{
searchIndex.setIndexRecoveryMode(value);
}
+ else if (QueryHandlerParams.PARAM_ASYNC_REINDEXING.equals(name))
+ {
+ searchIndex.setAsyncReindexing(Boolean.parseBoolean(value));
+ }
}
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -56,6 +56,8 @@
import org.exoplatform.services.jcr.impl.core.SessionImpl;
import org.exoplatform.services.jcr.impl.core.query.lucene.ChangesHolder;
import org.exoplatform.services.jcr.impl.core.query.lucene.FieldNames;
+import org.exoplatform.services.jcr.impl.core.query.lucene.IndexOfflineIOException;
+import
org.exoplatform.services.jcr.impl.core.query.lucene.IndexOfflineRepositoryException;
import org.exoplatform.services.jcr.impl.core.query.lucene.LuceneVirtualTableResolver;
import org.exoplatform.services.jcr.impl.core.query.lucene.QueryHits;
import org.exoplatform.services.jcr.impl.core.query.lucene.ScoreNode;
@@ -922,6 +924,10 @@
result.add(sn.getNodeId());
}
}
+ catch (IndexOfflineIOException e)
+ {
+ throw new IndexOfflineRepositoryException(e.getMessage(), e);
+ }
catch (IOException e)
{
throw new RepositoryException(e.getLocalizedMessage(), e);
@@ -1041,6 +1047,11 @@
handler.setOnline(isOnline);
}
+ /**
+ * Reports the online state of the index by delegating to the underlying query handler.
+ *
+ * @return the value returned by the handler's <code>isOnline()</code>
+ */
+ public boolean isOnline()
+ {
+ return handler.isOnline();
+ }
+
/**
* {@inheritDoc}
*/
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -441,6 +441,7 @@
void createInitialIndex(ItemDataConsumer stateMgr) throws IOException
{
// only do an initial index if there are no indexes at all
+ boolean indexCreated = false;
if (indexNames.size() == 0)
{
setOnline(false);
@@ -453,12 +454,21 @@
&& handler.getContext().getRPCService().isCoordinator() == false)
{
log.info("Retrieving index from coordinator...");
- recoveryIndexFromCoordinator();
+ indexCreated = recoveryIndexFromCoordinator();
- indexNames.read();
- refreshIndexList();
+ if (indexCreated)
+ {
+ indexNames.read();
+ refreshIndexList();
+ }
+ else
+ {
+ log
+ .info("Index can'b be retrieved from coordinator now,
because it is offline. Possibly coordinator node performs reindexing now.");
+ }
}
- else
+
+ if (!indexCreated)
{
if
(handler.getIndexRecoveryMode().equals(SearchIndex.INDEX_RECOVERY_MODE_FROM_COORDINATOR))
{
@@ -3312,6 +3322,20 @@
}
}
+ /**
+ * Returns the current value of the <code>online</code> flag.
+ *
+ * @return true if index is online. It means that there is no background indexing or
+ * index retrieval jobs
+ */
+ public boolean isOnline()
+ {
+ return online;
+ }
+
+ /**
+ * Switches index mode
+ *
+ * @param isOnline
+ * @throws IOException
+ */
public synchronized void setOnline(boolean isOnline) throws IOException
{
// if mode really changed
@@ -3337,6 +3361,10 @@
flush();
}
}
+ else if (!online)
+ {
+ throw new IOException("Index is already in OFFLINE mode.");
+ }
}
/**
@@ -3568,12 +3596,17 @@
* @throws RepositoryException.
* @throws FileNotFoundException.
*/
- private void recoveryIndexFromCoordinator() throws FileNotFoundException,
RepositoryException, IOException,
+ private boolean recoveryIndexFromCoordinator() throws FileNotFoundException,
RepositoryException, IOException,
SuspendException
{
try
{
IndexRecovery indexRecovery = handler.getContext().getIndexRecovery();
+ // check if index not ready
+ if (!indexRecovery.checkIndexReady())
+ {
+ return false;
+ }
indexRecovery.setIndexOffline();
File indexDirectory = new File(handler.getContext().getIndexDirectory());
@@ -3617,6 +3650,7 @@
finally
{
}
+ return true;
}
/**
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/OfflinePersistentIndex.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/OfflinePersistentIndex.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/OfflinePersistentIndex.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -73,6 +73,7 @@
{
int count = super.removeDocument(idTerm);
processedIDs.add(idTerm.text());
+ System.out.println("RM: " + idTerm.text());
return count;
}
@@ -82,6 +83,7 @@
super.addDocuments(docs);
for (Document doc : docs)
{
+ System.out.println("add: " + doc.get(FieldNames.UUID));
processedIDs.add(doc.get(FieldNames.UUID));
}
}
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -183,6 +183,11 @@
*/
public static final boolean DEFAULT_RDBMS_REINDEXING = true;
+ /**
+ * The default value for {@link #asyncReindexing}.
+ */
+ public static final boolean DEFAULT_ASYNC_REINDEXING = false;
+
/**
* The default value for {@link #indexRecoveryMode}.
*/
@@ -494,6 +499,11 @@
private String indexRecoveryMode = INDEX_RECOVERY_MODE_FROM_INDEXING;
/**
+ * Defines reindexing synchronization policy. Whether or not start it asynchronously
+ */
+ private boolean asyncReindexing = DEFAULT_ASYNC_REINDEXING;
+
+ /**
* Working constructor.
*
* @throws RepositoryConfigurationException
@@ -635,39 +645,33 @@
// if RW mode, create initial index and start check
if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
{
- if (index.numDocs() == 0 && context.isCreateInitialIndex())
+ final boolean doReindexing = (index.numDocs() == 0 &&
context.isCreateInitialIndex());
+ final boolean doCheck = (consistencyCheckEnabled &&
(index.getRedoLogApplied() || forceConsistencyCheck));
+ final ItemDataConsumer itemStateManager = context.getItemStateManager();
+
+ if (isAsyncReindexing() && doReindexing)
{
- index.createInitialIndex(context.getItemStateManager());
- }
- if (consistencyCheckEnabled && (index.getRedoLogApplied() ||
forceConsistencyCheck))
- {
- log.info("Running consistency check...");
- try
+ log.info("Launching reindexing in asynchronous mode.");
+ new Thread(new Runnable()
{
- ConsistencyCheck check = ConsistencyCheck.run(index,
context.getItemStateManager());
- if (autoRepair)
+ public void run()
{
- check.repair(true);
- }
- else
- {
- List<ConsistencyCheckError> errors = check.getErrors();
- if (errors.size() == 0)
+ try
{
- log.info("No errors detected.");
+ reindex(doReindexing, doCheck, itemStateManager);
}
- for (Iterator<ConsistencyCheckError> it = errors.iterator();
it.hasNext();)
+ catch (IOException e)
{
- ConsistencyCheckError err = it.next();
- log.info(err.toString());
+ log
+ .error("Error while reindexing the workspace. Please fix the
problem, delete index and restart server.");
}
}
- }
- catch (Exception e)
- {
- log.warn("Failed to run consistency check on index: " + e);
- }
+ }, "Reindexing-" +
context.getContainer().getWorkspaceName()).start();
}
+ else
+ {
+ reindex(doReindexing, doCheck, itemStateManager);
+ }
}
// initialize spell checker
spellChecker = createSpellChecker();
@@ -690,6 +694,43 @@
modeHandler.addIndexerIoModeListener(this);
}
+   /**
+    * Performs the start-up index maintenance: optionally builds the initial index and
+    * optionally runs the consistency check afterwards.
+    *
+    * @param doReindexing
+    *          if <code>true</code>, the initial index is created from the given item state manager
+    * @param doCheck
+    *          if <code>true</code>, the consistency check is run (and repaired when auto-repair is on)
+    * @param itemStateManager
+    *          source of the workspace data used for indexing and checking
+    * @throws IOException
+    *           if creating the initial index fails; consistency check failures are only logged
+    */
+   private void reindex(boolean doReindexing, boolean doCheck, ItemDataConsumer itemStateManager) throws IOException
+   {
+      if (doReindexing)
+      {
+         index.createInitialIndex(itemStateManager);
+      }
+      if (doCheck)
+      {
+         log.info("Running consistency check...");
+         try
+         {
+            ConsistencyCheck check = ConsistencyCheck.run(index, itemStateManager);
+            if (autoRepair)
+            {
+               check.repair(true);
+            }
+            else
+            {
+               List<ConsistencyCheckError> errors = check.getErrors();
+               if (errors.size() == 0)
+               {
+                  log.info("No errors detected.");
+               }
+               for (ConsistencyCheckError err : errors)
+               {
+                  log.info(err.toString());
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            // best effort: a failed check must not abort initialization, but pass the
+            // throwable to the logger so the stack trace is not lost
+            log.warn("Failed to run consistency check on index: " + e, e);
+         }
+      }
+   }
+
/**
* @return the errorLogfileSize
*/
@@ -1217,6 +1258,12 @@
*/
protected IndexReader getIndexReader(boolean includeSystemIndex) throws IOException
{
+ // deny query execution if index in offline mode
+ // TODO Replace with special Exception Type
+ if (!index.isOnline())
+ {
+ throw new IndexOfflineIOException("Index is offline");
+ }
QueryHandler parentHandler = getContext().getParentHandler();
CachingMultiIndexReader parentReader = null;
if (parentHandler instanceof SearchIndex && includeSystemIndex)
@@ -2731,6 +2778,14 @@
return rdbmsReindexing;
}
+ /**
+ * @return the current value for asyncReindexing
+ */
+ public boolean isAsyncReindexing()
+ {
+ return asyncReindexing;
+ }
+
/**
* @return the current value for indexRecoveryMode
*/
@@ -2804,6 +2859,17 @@
this.indexRecoveryMode = indexRecoveryMode;
}
+ /**
+ * Set a new value for asyncReindexing.
+ *
+ * @param asyncReindexing
+ * the new value for asyncReindexing
+ */
+ public void setAsyncReindexing(boolean asyncReindexing)
+ {
+ this.asyncReindexing = asyncReindexing;
+ }
+
// ----------------------------< internal
// >----------------------------------
@@ -2970,6 +3036,14 @@
checkOpen();
index.setOnline(isOnline);
}
+
+ /**
+ * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#isOnline()
+ */
+ public boolean isOnline()
+ {
+ return index.isOnline();
+ }
/**
* {@inheritDoc}
Modified:
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java
===================================================================
---
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java 2011-03-29
14:53:18 UTC (rev 4189)
+++
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java 2011-03-30
14:08:40 UTC (rev 4190)
@@ -115,4 +115,12 @@
// TODO Auto-generated method stub
}
+
+ /**
+ * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#isOnline()
+ */
+ public boolean isOnline()
+ {
+ return true;
+ }
}