[exo-jcr-commits] exo-jcr SVN: r4274 - in jcr/trunk/exo.jcr.component.core/src: main/java/org/exoplatform/services/jcr/impl/core/query/ispn and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Apr 22 08:19:43 EDT 2011
Author: nzamosenchuk
Date: 2011-04-22 08:19:42 -0400 (Fri, 22 Apr 2011)
New Revision: 4274
Modified:
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/AbstractQueryHandler.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexerChangesFilter.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/ispn/ISPNIndexChangesFilter.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexingQueue.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java
jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java
Log:
EXOJCR-1280 : finished implementation of host JMX-based async reindexing
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/AbstractQueryHandler.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/AbstractQueryHandler.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/AbstractQueryHandler.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -90,6 +90,14 @@
}
/**
+ * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#getIndexerIoModeHandler()
+ */
+ public IndexerIoModeHandler getIndexerIoModeHandler()
+ {
+ return modeHandler;
+ }
+
+ /**
* @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#setContext(org.exoplatform.services.jcr.impl.core.query.QueryHandlerContext)
*/
public void setContext(QueryHandlerContext context)
@@ -256,4 +264,12 @@
{
this.indexUpdateMonitor = indexUpdateMonitor;
}
-}
+
+ /**
+ * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#setOnline(boolean)
+ */
+ public void setOnline(boolean isOnline) throws IOException
+ {
+ setOnline(isOnline, false);
+ }
+}
\ No newline at end of file
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexerChangesFilter.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexerChangesFilter.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/IndexerChangesFilter.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -224,4 +224,17 @@
{
return true;
}
+
+ /**
+ * If index is shared across the cluster, which means that all modifications or state changes must be
+ * replicated along cluster. I.e. when hot reindexing is started, the index marked as "shared" will be
+ * set in offline mode within whole cluster. But non-shared index, will only be switched to offline
+ * locally
+ *
+ * @return
+ */
+ public boolean isShared()
+ {
+ return false;
+ }
}
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/QueryHandler.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -155,6 +155,8 @@
void setIndexerIoModeHandler(IndexerIoModeHandler handler) throws IOException;
+ IndexerIoModeHandler getIndexerIoModeHandler();
+
/**
* @return the name of the query class to use.
*/
@@ -198,13 +200,28 @@
/**
* Switches index into corresponding ONLINE or OFFLINE mode. Offline mode means that new indexing data is
- * collected but index is guaranteed to be unmodified during offline state.
+ * collected but index is guaranteed to be unmodified during offline state. Queries are denied in offline
+ * mode. Please refer to {{@link #setOnline(boolean, boolean)}} if queries must be allowed while index is
+ * offline.
*
* @param isOnline
+ * @throws IOException
*/
void setOnline(boolean isOnline) throws IOException;
/**
+ * Switches index into corresponding ONLINE or OFFLINE mode. Offline mode means that new indexing data is
+ * collected but index is guaranteed to be unmodified during offline state. Passing the allowQuery flag, can
+ * allow or deny performing queries on index during offline mode. AllowQuery is not used when setting index
+ * back online.
+ *
+ * @param isOnline
+ * @param allowQuery
+ * @throws IOException
+ */
+ void setOnline(boolean isOnline, boolean allowQuery) throws IOException;
+
+ /**
* Offline mode means that new indexing data is collected but index is guaranteed to be unmodified during
* offline state.
*
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/SearchManager.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -18,12 +18,16 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.WildcardQuery;
+import org.apache.lucene.search.BooleanClause.Occur;
import org.exoplatform.commons.utils.PrivilegedFileHelper;
import org.exoplatform.container.ExoContainerContext;
import org.exoplatform.container.configuration.ConfigurationManager;
+import org.exoplatform.management.annotations.Managed;
+import org.exoplatform.management.annotations.ManagedDescription;
+import org.exoplatform.management.jmx.annotations.NameTemplate;
+import org.exoplatform.management.jmx.annotations.Property;
import org.exoplatform.services.document.DocumentReaderService;
import org.exoplatform.services.jcr.RepositoryService;
import org.exoplatform.services.jcr.config.QueryHandlerEntry;
@@ -83,7 +87,9 @@
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -109,6 +115,8 @@
* @version $Id: SearchManager.java 1008 2009-12-11 15:14:51Z nzamosenchuk $
*/
@NonVolatile
+ at Managed
+ at NameTemplate(@Property(key = "service", value = "SearchManager"))
public class SearchManager implements Startable, MandatoryItemsPersistenceListener, Suspendable, Backupable,
TopologyChangeListener
{
@@ -118,6 +126,11 @@
*/
private static final Log log = ExoLogger.getLogger("exo.jcr.component.core.SearchManager");
+ /**
+ * Used to display date and time for JMX components
+ */
+ private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
protected final QueryHandlerEntry config;
/**
@@ -220,8 +233,15 @@
*/
private RemoteCommand requestForResponsibleForResuming;
- public SearchManager(ExoContainerContext ctx, WorkspaceEntry wEntry, RepositoryEntry rEntry, RepositoryService rService,
- QueryHandlerEntry config, NamespaceRegistryImpl nsReg, NodeTypeDataManager ntReg,
+ /**
+ * Switches index between online and offline modes
+ */
+ private RemoteCommand changeIndexState;
+
+ private String hotReindexingState = "not stated";
+
+ public SearchManager(ExoContainerContext ctx, WorkspaceEntry wEntry, RepositoryEntry rEntry,
+ RepositoryService rService, QueryHandlerEntry config, NamespaceRegistryImpl nsReg, NodeTypeDataManager ntReg,
WorkspacePersistentDataManager itemMgr, SystemSearchManagerHolder parentSearchManager,
DocumentReaderService extractor, ConfigurationManager cfm, final RepositoryIndexSearcherHolder indexSearcherHolder)
throws RepositoryException, RepositoryConfigurationException
@@ -261,8 +281,8 @@
* if the search manager cannot be initialized
* @throws RepositoryConfigurationException
*/
- public SearchManager(ExoContainerContext ctx, WorkspaceEntry wEntry, RepositoryEntry rEntry, RepositoryService rService,
- QueryHandlerEntry config, NamespaceRegistryImpl nsReg, NodeTypeDataManager ntReg,
+ public SearchManager(ExoContainerContext ctx, WorkspaceEntry wEntry, RepositoryEntry rEntry,
+ RepositoryService rService, QueryHandlerEntry config, NamespaceRegistryImpl nsReg, NodeTypeDataManager ntReg,
WorkspacePersistentDataManager itemMgr, SystemSearchManagerHolder parentSearchManager,
DocumentReaderService extractor, ConfigurationManager cfm,
final RepositoryIndexSearcherHolder indexSearcherHolder, RPCService rpcService) throws RepositoryException,
@@ -378,7 +398,7 @@
try
{
reader = ((SearchIndex)handler).getIndexReader();
- final Collection fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
+ final Collection<?> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
for (final Object field : fields)
{
fildsSet.add((String)field);
@@ -871,17 +891,18 @@
try
{
- Class qHandlerClass = Class.forName(className, true, this.getClass().getClassLoader());
+ Class<?> qHandlerClass = Class.forName(className, true, this.getClass().getClassLoader());
try
{
// We first try a constructor with the workspace id
- Constructor constuctor = qHandlerClass.getConstructor(String.class, QueryHandlerEntry.class, ConfigurationManager.class);
+ Constructor<?> constuctor =
+ qHandlerClass.getConstructor(String.class, QueryHandlerEntry.class, ConfigurationManager.class);
handler = (QueryHandler)constuctor.newInstance(wsContainerId, config, cfm);
}
catch (NoSuchMethodException e)
{
// No constructor with the workspace id can be found so we use the default constructor
- Constructor constuctor = qHandlerClass.getConstructor(QueryHandlerEntry.class, ConfigurationManager.class);
+ Constructor<?> constuctor = qHandlerClass.getConstructor(QueryHandlerEntry.class, ConfigurationManager.class);
handler = (QueryHandler)constuctor.newInstance(config, cfm);
}
QueryHandler parentHandler = (this.parentSearchManager != null) ? parentSearchManager.getHandler() : null;
@@ -1071,9 +1092,23 @@
*/
public void setOnline(boolean isOnline) throws IOException
{
- handler.setOnline(isOnline);
+ // deny queries
+ handler.setOnline(isOnline, false);
}
+ /**
+ * Switches index into online or offline modes. Passing the allowQuery flag, can
+ * allow or deny performing queries on index during offline mode
+ *
+ * @param isOnline
+ * @param allowQuery
+ * @throws IOException
+ */
+ public void setOnline(boolean isOnline, boolean allowQuery) throws IOException
+ {
+ handler.setOnline(isOnline, allowQuery);
+ }
+
public boolean isOnline()
{
return handler.isOnline();
@@ -1108,6 +1143,186 @@
}
/**
+ * Public method, designed to be called via JMX, to perform "HOT" reindexing of the workspace
+ *
+ * @throws IOException
+ * @throws IllegalStateException
+ */
+ @Managed
+ @ManagedDescription("Starts hot async reindexing")
+ public void reindex(final boolean dropExisting) throws IllegalStateException
+ {
+ // checks
+ if (handler == null || handler.getIndexerIoModeHandler() == null || changesFilter == null)
+ {
+ throw new IllegalStateException("Index might have not been initialized yet.");
+ }
+ if (handler.getIndexerIoModeHandler().getMode() != IndexerIoMode.READ_WRITE)
+ {
+ throw new IllegalStateException(
+ "Index is not in READ_WRITE mode and reindexing can't be launched. Please start reindexing on coordinator node.");
+ }
+ if (isSuspended || !handler.isOnline())
+ {
+ throw new IllegalStateException("Can't start reindexing while index is "
+ + ((isSuspended) ? "SUSPENDED." : "already OFFLINE (it means that reindexing is in progress).") + ".");
+ }
+
+ log.info("Starting hot reindexing on the " + handler.getContext().getRepositoryName() + "/"
+ + handler.getContext().getContainer().getWorkspaceName() + ", with" + (dropExisting ? "" : "out")
+ + " dropping the existing indexes.");
+ // starting new thread, releasing JMX call
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ boolean successful = false;
+ hotReindexingState = "Running. Started at " + sdf.format(Calendar.getInstance().getTime());
+ try
+ {
+ isResponsibleForResuming = true;
+ // set offline cluster wide (will make merger disposed and volatile flushed)
+ if (rpcService != null && changesFilter.isShared())
+ {
+ rpcService.executeCommandOnAllNodes(changeIndexState, true, false, !dropExisting);
+ }
+ else
+ {
+ handler.setOnline(false, !dropExisting);
+ }
+ // launch reindexing thread safely, resume nodes if any exception occurs
+ if (handler instanceof SearchIndex)
+ {
+ ((SearchIndex)handler).getIndex().reindex(itemMgr);
+ successful = true;
+ }
+ else
+ {
+ log.error("This kind of QuerHandler class doesn't support hot reindxing.");
+ }
+ }
+ catch (RepositoryException e)
+ {
+ log.error("Error while reindexing the workspace", e);
+ }
+ catch (SecurityException e)
+ {
+ log.error("Can't change state to offline.", e);
+ }
+ catch (RPCException e)
+ {
+ log.error("Can't change state to offline.", e);
+ }
+ catch (IOException e)
+ {
+ log.error("Erroe while reindexing the workspace", e);
+ }
+ // safely change state back
+ finally
+ {
+ // finish, setting indexes back online
+ if (rpcService != null && changesFilter.isShared())
+ {
+ try
+ {
+ // if dropExisting, then queries are no allowed
+ rpcService.executeCommandOnAllNodes(changeIndexState, true, true, true);
+ }
+ catch (SecurityException e)
+ {
+ log.error("Error setting index back online in a cluster", e);
+ }
+ catch (RPCException e)
+ {
+ log.error("Error setting index back online in a cluster", e);
+ }
+ }
+ else
+ {
+ try
+ {
+ handler.setOnline(true, true);
+ }
+ catch (IOException e)
+ {
+ log.error("Error setting index back online locally");
+ }
+ }
+ if (successful)
+ {
+ hotReindexingState = "Finished at " + sdf.format(Calendar.getInstance().getTime());
+ log.info("Reindexing finished successfully.");
+ }
+ else
+ {
+ hotReindexingState = "Stopped with errors at " + sdf.format(Calendar.getInstance().getTime());
+ log.info("Reindexing halted with errors.");
+ }
+ isResponsibleForResuming = false;
+ }
+ }
+ }, "HotReindexing-" + handler.getContext().getRepositoryName() + "-"
+ + handler.getContext().getContainer().getWorkspaceName()).start();
+ }
+
+ @Managed
+ @ManagedDescription("Hot async reindexing state")
+ public String getHotReindexingState()
+ {
+ return hotReindexingState;
+ }
+
+ @Managed
+ @ManagedDescription("Index IO mode (READ_ONLY/READ_WRITE)")
+ public String getIOMode()
+ {
+ if (handler == null || handler.getIndexerIoModeHandler() == null)
+ {
+ return "not initialized";
+ }
+ return (handler.getIndexerIoModeHandler().getMode() == IndexerIoMode.READ_WRITE) ? "READ_WRITE" : "READ_ONLY";
+ }
+
+ @Managed
+ @ManagedDescription("Index state (Online/Offline(indexing))")
+ public String getState()
+ {
+ if (handler == null)
+ {
+ return "not initialized";
+ }
+ return handler.isOnline() ? "Online" : "Offline (indexing)";
+ }
+
+ @Managed
+ @ManagedDescription("QueryHandler class")
+ public String getQuerHandlerClass()
+ {
+ if (handler != null)
+ {
+ return handler.getClass().getCanonicalName();
+ }
+ else
+ {
+ return "not initialized";
+ }
+ }
+
+ @Managed
+ @ManagedDescription("ChangesFilter class")
+ public String getChangesFilterClass()
+ {
+ if (changesFilter != null)
+ {
+ return changesFilter.getClass().getCanonicalName();
+ }
+ else
+ {
+ return "not initialized";
+ }
+ }
+
+ /**
* Register remote commands.
*/
private void doInitRemoteCommands()
@@ -1159,10 +1374,32 @@
return isResponsibleForResuming;
}
});
+
+ changeIndexState = rpcService.registerCommand(new RemoteCommand()
+ {
+ public String getId()
+ {
+ return "org.exoplatform.services.jcr.impl.core.query.SearchManager-changeIndexerState-" + wsId + "-"
+ + (parentSearchManager == null);
+ }
+
+ public Serializable execute(Serializable[] args) throws Throwable
+ {
+ boolean isOnline = (Boolean)args[0];
+ boolean allowQuery = (args.length == 2) ? (Boolean)args[1] : false;
+ SearchManager.this.setOnline(isOnline, allowQuery);
+ return null;
+ }
+ });
+
}
protected void suspendLocally() throws SuspendException
{
+ if (!handler.isOnline())
+ {
+ throw new SuspendException("Can't suspend index, while reindexing in progeress.");
+ }
if (isSuspended)
{
throw new SuspendException("Component already suspended.");
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/ispn/ISPNIndexChangesFilter.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/ispn/ISPNIndexChangesFilter.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/ispn/ISPNIndexChangesFilter.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -158,4 +158,12 @@
log.warn("Exception occure when errorLog writed. Error log is not complete. " + ioe, ioe);
}
}
+
+ /**
+ * @see org.exoplatform.services.jcr.impl.core.query.IndexerChangesFilter#isShared()
+ */
+ public boolean isShared()
+ {
+ return true;
+ }
}
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/jbosscache/JBossCacheIndexChangesFilter.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -215,4 +215,12 @@
log.warn("Exception occure when errorLog writed. Error log is not complete. " + ioe, ioe);
}
}
+
+ /**
+ * @see org.exoplatform.services.jcr.impl.core.query.IndexerChangesFilter#isShared()
+ */
+ public boolean isShared()
+ {
+ return true;
+ }
}
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexingQueue.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexingQueue.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/IndexingQueue.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -53,7 +53,7 @@
/**
* Maps UUID {@link String}s to {@link Document}s.
*/
- private final Map pendingDocuments = new HashMap();
+ private final Map pendingDocuments = new HashMap(1);
/**
* Flag that indicates whether this indexing queue had been
@@ -143,7 +143,7 @@
public Document[] getFinishedDocuments()
{
checkInitialized();
- List finished = new ArrayList();
+ List finished = new ArrayList(1);
synchronized (this)
{
finished.addAll(pendingDocuments.values());
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/MultiIndex.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -60,6 +60,7 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -113,7 +114,7 @@
/**
* Names of index directories that can be deleted.
*/
- private final Set deletable = new HashSet();
+ private final Set<String> deletable = new HashSet<String>();
/**
* List of open persistent indexes. This list may also contain an open
@@ -121,9 +122,14 @@
* registered with indexNames and <b>must not</b> be used in regular index
* operations (delete node, etc.)!
*/
- private final List indexes = new ArrayList();
+ private final List<PersistentIndex> indexes = new ArrayList<PersistentIndex>();
/**
+ * Contains list of open persistent indexes in case when hot async reindexing launched.
+ */
+ private final List<PersistentIndex> staleIndexes = new ArrayList<PersistentIndex>();
+
+ /**
* The internal namespace mappings of the query manager.
*/
private final NamespaceMappings nsMappings;
@@ -255,8 +261,8 @@
/**
* The unique id of the workspace corresponding to this multi index
*/
- final String workspaceId;
-
+ final String workspaceId;
+
/**
* Creates a new MultiIndex.
*
@@ -535,6 +541,51 @@
}
/**
+ * Recreates index by reindexing in runtime.
+ *
+ * @param stateMgr
+ * @throws RepositoryException
+ */
+ public void reindex(ItemDataConsumer stateMgr) throws IOException, RepositoryException
+ {
+ if (stopped)
+ {
+ throw new IllegalStateException("Can't invoke reindexing on closed index.");
+ }
+
+ if (online)
+ {
+ throw new IllegalStateException("Can't invoke reindexing while index still online.");
+ }
+
+ // traverse and index workspace
+ executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
+
+ long count;
+
+ // check if we have deal with RDBMS reindexing mechanism
+ Reindexable rdbmsReindexableComponent =
+ (Reindexable)handler.getContext().getContainer().getComponent(Reindexable.class);
+
+ if (handler.isRDBMSReindexing() && rdbmsReindexableComponent != null
+ && rdbmsReindexableComponent.isReindexingSupport())
+ {
+ count =
+ createIndex(rdbmsReindexableComponent.getNodeDataIndexingIterator(handler.getReindexingPageSize()),
+ indexingTree.getIndexingRoot());
+ }
+ else
+ {
+ count = createIndex(indexingTree.getIndexingRoot(), stateMgr);
+ }
+
+ executeAndLog(new Commit(getTransactionId()));
+ log.info("Created initial index for {} nodes", new Long(count));
+ releaseMultiReader();
+
+ }
+
+ /**
* Atomically updates the index by removing some documents and adding
* others.
*
@@ -548,7 +599,7 @@
* @throws IOException
* if an error occurs while updating the index.
*/
- synchronized void update(final Collection remove, final Collection add) throws IOException
+ synchronized void update(final Collection<String> remove, final Collection<Document> add) throws IOException
{
if (!online)
{
@@ -572,7 +623,7 @@
* @param add
* @throws IOException
*/
- private void doUpdateRO(final Collection remove, final Collection add) throws IOException
+ private void doUpdateRO(final Collection<String> remove, final Collection<Document> add) throws IOException
{
SecurityHelper.doPrivilegedIOExceptionAction(new PrivilegedExceptionAction<Object>()
{
@@ -594,9 +645,9 @@
ReadOnlyIndexReader lastIndexReader = null;
try
{
- for (Iterator it = remove.iterator(); it.hasNext();)
+ for (Iterator<String> it = remove.iterator(); it.hasNext();)
{
- Term idTerm = new Term(FieldNames.UUID, (String)it.next());
+ Term idTerm = new Term(FieldNames.UUID, it.next());
volatileIndex.removeDocument(idTerm);
}
@@ -606,17 +657,16 @@
try
{
lastIndexReader =
- (lastIndexReaderId >= 0) ? ((PersistentIndex)indexes.get(lastIndexReaderId))
- .getReadOnlyIndexReader() : null;
+ (lastIndexReaderId >= 0) ? (indexes.get(lastIndexReaderId)).getReadOnlyIndexReader() : null;
}
catch (Throwable e)
{
// this is safe index reader retrieval. The last index already closed, possibly merged or any other exception that occurs here
}
- for (Iterator it = add.iterator(); it.hasNext();)
+ for (Iterator<Document> it = add.iterator(); it.hasNext();)
{
- Document doc = (Document)it.next();
+ Document doc = it.next();
if (doc != null)
{
// check if this item should be placed in own volatile index
@@ -642,8 +692,7 @@
lastIndexReaderId = indexes.size() - 1;
try
{
- lastIndexReader =
- ((PersistentIndex)indexes.get(lastIndexReaderId)).getReadOnlyIndexReader();
+ lastIndexReader = (indexes.get(lastIndexReaderId)).getReadOnlyIndexReader();
}
catch (Throwable e)
{
@@ -706,7 +755,7 @@
* @param add
* @throws IOException
*/
- private void doUpdateRW(final Collection remove, final Collection add) throws IOException
+ private void doUpdateRW(final Collection<String> remove, final Collection<Document> add) throws IOException
{
SecurityHelper.doPrivilegedIOExceptionAction(new PrivilegedExceptionAction<Object>()
{
@@ -737,13 +786,13 @@
long transactionId = nextTransactionId++;
executeAndLog(new Start(transactionId));
- for (Iterator it = remove.iterator(); it.hasNext();)
+ for (Iterator<String> it = remove.iterator(); it.hasNext();)
{
- executeAndLog(new DeleteNode(transactionId, (String)it.next()));
+ executeAndLog(new DeleteNode(transactionId, it.next()));
}
- for (Iterator it = add.iterator(); it.hasNext();)
+ for (Iterator<Document> it = add.iterator(); it.hasNext();)
{
- Document doc = (Document)it.next();
+ Document doc = it.next();
if (doc != null)
{
executeAndLog(new AddNode(transactionId, doc));
@@ -784,7 +833,7 @@
{
List<String> processedIDs = offlineIndex.getProcessedIDs();
// remove all nodes placed in offline index
- update(processedIDs, Collections.EMPTY_LIST);
+ update(processedIDs, Collections.<Document> emptyList());
executeAndLog(new Start(Action.INTERNAL_TRANSACTION));
@@ -815,21 +864,21 @@
* @param add
* @throws IOException
*/
- private void doUpdateOffline(final Collection remove, final Collection add) throws IOException
+ private void doUpdateOffline(final Collection<String> remove, final Collection<Document> add) throws IOException
{
SecurityHelper.doPrivilegedIOExceptionAction(new PrivilegedExceptionAction<Object>()
{
public Object run() throws Exception
{
- for (Iterator it = remove.iterator(); it.hasNext();)
+ for (Iterator<String> it = remove.iterator(); it.hasNext();)
{
- Term idTerm = new Term(FieldNames.UUID, (String)it.next());
+ Term idTerm = new Term(FieldNames.UUID, it.next());
offlineIndex.removeDocument(idTerm);
}
- for (Iterator it = add.iterator(); it.hasNext();)
+ for (Iterator<Document> it = add.iterator(); it.hasNext();)
{
- Document doc = (Document)it.next();
+ Document doc = it.next();
if (doc != null)
{
offlineIndex.addDocuments(new Document[]{doc});
@@ -855,7 +904,7 @@
*/
void addDocument(Document doc) throws IOException
{
- update(Collections.EMPTY_LIST, Arrays.asList(new Document[]{doc}));
+ update(Collections.<String> emptyList(), Arrays.asList(new Document[]{doc}));
}
/**
@@ -868,7 +917,7 @@
*/
void removeDocument(String uuid) throws IOException
{
- update(Arrays.asList(new String[]{uuid}), Collections.EMPTY_LIST);
+ update(Arrays.asList(new String[]{uuid}), Collections.<Document> emptyList());
}
/**
@@ -899,7 +948,7 @@
}
for (int i = 0; i < indexes.size(); i++)
{
- PersistentIndex index = (PersistentIndex)indexes.get(i);
+ PersistentIndex index = indexes.get(i);
// only remove documents from registered indexes
if (indexNames.contains(index.getName()))
{
@@ -946,14 +995,14 @@
*/
synchronized IndexReader[] getIndexReaders(String[] indexNames, IndexListener listener) throws IOException
{
- Set names = new HashSet(Arrays.asList(indexNames));
- Map indexReaders = new HashMap();
+ Set<String> names = new HashSet<String>(Arrays.asList(indexNames));
+ Map<ReadOnlyIndexReader, PersistentIndex> indexReaders = new HashMap<ReadOnlyIndexReader, PersistentIndex>();
try
{
- for (Iterator it = indexes.iterator(); it.hasNext();)
+ for (Iterator<PersistentIndex> it = indexes.iterator(); it.hasNext();)
{
- PersistentIndex index = (PersistentIndex)it.next();
+ PersistentIndex index = it.next();
if (names.contains(index.getName()))
{
indexReaders.put(index.getReadOnlyIndexReader(listener), index);
@@ -963,10 +1012,11 @@
catch (IOException e)
{
// release readers obtained so far
- for (Iterator it = indexReaders.entrySet().iterator(); it.hasNext();)
+ for (Iterator<Entry<ReadOnlyIndexReader, PersistentIndex>> it = indexReaders.entrySet().iterator(); it
+ .hasNext();)
{
- Map.Entry entry = (Map.Entry)it.next();
- final ReadOnlyIndexReader reader = (ReadOnlyIndexReader)entry.getKey();
+ Map.Entry<ReadOnlyIndexReader, PersistentIndex> entry = it.next();
+ final ReadOnlyIndexReader reader = entry.getKey();
try
{
SecurityHelper.doPrivilegedIOExceptionAction(new PrivilegedExceptionAction<Object>()
@@ -982,12 +1032,12 @@
{
log.warn("Exception releasing index reader: " + ex);
}
- ((PersistentIndex)entry.getValue()).resetListener();
+ (entry.getValue()).resetListener();
}
throw e;
}
- return (IndexReader[])indexReaders.keySet().toArray(new IndexReader[indexReaders.size()]);
+ return indexReaders.keySet().toArray(new IndexReader[indexReaders.size()]);
}
/**
@@ -1004,9 +1054,9 @@
synchronized PersistentIndex getOrCreateIndex(String indexName) throws IOException
{
// check existing
- for (Iterator it = indexes.iterator(); it.hasNext();)
+ for (Iterator<PersistentIndex> it = indexes.iterator(); it.hasNext();)
{
- PersistentIndex idx = (PersistentIndex)it.next();
+ PersistentIndex idx = it.next();
if (idx.getName().equals(indexName))
{
return idx;
@@ -1066,9 +1116,9 @@
synchronized boolean hasIndex(String indexName) throws IOException
{
// check existing
- for (Iterator it = indexes.iterator(); it.hasNext();)
+ for (Iterator<PersistentIndex> it = indexes.iterator(); it.hasNext();)
{
- PersistentIndex idx = (PersistentIndex)it.next();
+ PersistentIndex idx = it.next();
if (idx.getName().equals(indexName))
{
return true;
@@ -1094,7 +1144,8 @@
* @throws IOException
* if an exception occurs while replacing the indexes.
*/
- void replaceIndexes(String[] obsoleteIndexes, final PersistentIndex index, Collection deleted) throws IOException
+ void replaceIndexes(String[] obsoleteIndexes, final PersistentIndex index, Collection<Term> deleted)
+ throws IOException
{
if (handler.isInitializeHierarchyCache())
@@ -1128,11 +1179,11 @@
executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES));
}
// delete obsolete indexes
- Set names = new HashSet(Arrays.asList(obsoleteIndexes));
- for (Iterator it = names.iterator(); it.hasNext();)
+ Set<String> names = new HashSet<String>(Arrays.asList(obsoleteIndexes));
+ for (Iterator<String> it = names.iterator(); it.hasNext();)
{
// do not try to delete indexes that are already gone
- String indexName = (String)it.next();
+ String indexName = it.next();
if (indexNames.contains(indexName))
{
executeAndLog(new DeleteIndex(getTransactionId(), indexName));
@@ -1147,9 +1198,9 @@
executeAndLog(new AddIndex(getTransactionId(), index.getName()));
// delete documents in index
- for (Iterator it = deleted.iterator(); it.hasNext();)
+ for (Iterator<Term> it = deleted.iterator(); it.hasNext();)
{
- Term id = (Term)it.next();
+ Term id = it.next();
index.removeDocument(id);
}
index.commit();
@@ -1240,10 +1291,14 @@
// meantime -> check again
if (multiReader == null)
{
- List readerList = new ArrayList();
- for (int i = 0; i < indexes.size(); i++)
+ // if index in offline mode, due to hot async reindexing,
+ // need to return the reader containing only stale indexes (old),
+ // without newly created.
+ List<PersistentIndex> persistedIndexesList = online ? indexes : staleIndexes;
+ List<ReadOnlyIndexReader> readerList = new ArrayList<ReadOnlyIndexReader>();
+ for (int i = 0; i < persistedIndexesList.size(); i++)
{
- PersistentIndex pIdx = (PersistentIndex)indexes.get(i);
+ PersistentIndex pIdx = persistedIndexesList.get(i);
if (indexNames.contains(pIdx.getName()))
{
@@ -1261,8 +1316,7 @@
}
}
readerList.add(volatileIndex.getReadOnlyIndexReader());
- ReadOnlyIndexReader[] readers =
- (ReadOnlyIndexReader[])readerList.toArray(new ReadOnlyIndexReader[readerList.size()]);
+ ReadOnlyIndexReader[] readers = readerList.toArray(new ReadOnlyIndexReader[readerList.size()]);
multiReader = new CachingMultiIndexReader(readers, cache);
}
multiReader.acquire();
@@ -1330,7 +1384,7 @@
volatileIndex.close();
for (int i = 0; i < indexes.size(); i++)
{
- ((PersistentIndex)indexes.get(i)).close();
+ (indexes.get(i)).close();
}
// close indexing queue
@@ -1476,7 +1530,7 @@
// commit persistent indexes
for (int i = indexes.size() - 1; i >= 0; i--)
{
- PersistentIndex index = (PersistentIndex)indexes.get(i);
+ PersistentIndex index = indexes.get(i);
// only commit indexes we own
// index merger also places PersistentIndex instances in
// indexes,
@@ -1961,9 +2015,9 @@
{
synchronized (deletable)
{
- for (Iterator it = deletable.iterator(); it.hasNext();)
+ for (Iterator<String> it = deletable.iterator(); it.hasNext();)
{
- String indexName = (String)it.next();
+ String indexName = it.next();
if (directoryManager.delete(indexName))
{
it.remove();
@@ -2011,7 +2065,8 @@
*/
private synchronized void checkFlush()
{
- long idleTime = System.currentTimeMillis() - lastFlushTime;
+ // avoid frequent flushes during reindexing;
+ long idleTime = online ? System.currentTimeMillis() - lastFlushTime : 0;
long volatileTime = System.currentTimeMillis() - lastFileSystemFlushTime;
// do not flush if volatileIdleTime is zero or negative
if ((handler.getVolatileIdleTime() > 0 && idleTime > handler.getVolatileIdleTime() * 1000)
@@ -2075,7 +2130,7 @@
private void checkIndexingQueue(boolean transactionPresent)
{
Document[] docs = indexingQueue.getFinishedDocuments();
- Map finished = new HashMap();
+ Map<String, Document> finished = new HashMap<String, Document>();
for (int i = 0; i < docs.length; i++)
{
String uuid = docs[i].get(FieldNames.UUID);
@@ -2088,7 +2143,7 @@
log.info("updating index with {} nodes from indexing queue.", new Long(finished.size()));
// remove documents from the queue
- for (Iterator it = finished.keySet().iterator(); it.hasNext();)
+ for (Iterator<String> it = finished.keySet().iterator(); it.hasNext();)
{
indexingQueue.removeDocument(it.next().toString());
}
@@ -2097,13 +2152,13 @@
{
if (transactionPresent)
{
- for (Iterator it = finished.keySet().iterator(); it.hasNext();)
+ for (Iterator<String> it = finished.keySet().iterator(); it.hasNext();)
{
- executeAndLog(new DeleteNode(getTransactionId(), (String)it.next()));
+ executeAndLog(new DeleteNode(getTransactionId(), it.next()));
}
- for (Iterator it = finished.values().iterator(); it.hasNext();)
+ for (Iterator<Document> it = finished.values().iterator(); it.hasNext();)
{
- executeAndLog(new AddNode(getTransactionId(), (Document)it.next()));
+ executeAndLog(new AddNode(getTransactionId(), it.next()));
}
}
else
@@ -2839,9 +2894,9 @@
public void execute(MultiIndex index) throws IOException
{
// get index if it exists
- for (Iterator it = index.indexes.iterator(); it.hasNext();)
+ for (Iterator<PersistentIndex> it = index.indexes.iterator(); it.hasNext();)
{
- PersistentIndex idx = (PersistentIndex)it.next();
+ PersistentIndex idx = it.next();
if (idx.getName().equals(indexName))
{
idx.close();
@@ -2944,7 +2999,7 @@
for (int i = index.indexes.size() - 1; i >= 0; i--)
{
// only look in registered indexes
- PersistentIndex idx = (PersistentIndex)index.indexes.get(i);
+ PersistentIndex idx = index.indexes.get(i);
if (index.indexNames.contains(idx.getName()))
{
num = idx.removeDocument(idTerm);
@@ -3353,18 +3408,32 @@
log.info("Setting index back online");
offlineIndex.commit(true);
online = true;
+ // cleaning stale indexes
+ for (PersistentIndex staleIndex : staleIndexes)
+ {
+ deleteIndex(staleIndex);
+ }
//invoking offline index
invokeOfflineIndex();
+ staleIndexes.clear();
+ doInitIndexMerger();
+ merger.start();
}
// switching to OFFLINE
else
{
log.info("Setting index offline");
+ merger.dispose();
offlineIndex =
new OfflinePersistentIndex(handler.getTextAnalyzer(), handler.getSimilarity(), cache, indexingQueue,
directoryManager);
+ if (modeHandler.getMode() == IndexerIoMode.READ_WRITE)
+ {
+ flush();
+ }
+ releaseMultiReader();
+ staleIndexes.addAll(indexes);
online = false;
- flush();
}
}
else if (!online)
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/RedoLog.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -125,9 +125,9 @@
* redo log.
* @throws IOException if an error occurs while reading from the redo log.
*/
- List getActions() throws IOException
+ List<MultiIndex.Action> getActions() throws IOException
{
- final List actions = new ArrayList();
+ final List<MultiIndex.Action> actions = new ArrayList<MultiIndex.Action>();
read(new ActionCollector()
{
public void collect(MultiIndex.Action a)
@@ -182,7 +182,7 @@
* Clears the redo log.
* @throws IOException if the redo log cannot be cleared.
*/
- void clear() throws IOException
+ synchronized void clear() throws IOException
{
SecurityHelper.doPrivilegedIOExceptionAction(new PrivilegedExceptionAction<Object>()
{
@@ -193,7 +193,15 @@
out.close();
out = null;
}
- dir.deleteFile(REDO_LOG);
+ try
+ {
+ dir.deleteFile(REDO_LOG);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
entryCount = 0;
return null;
}
@@ -205,20 +213,20 @@
* @throws IOException if an error occurs while creating the
* output stream.
*/
- private void initOut() throws IOException
+ private synchronized void initOut() throws IOException
{
SecurityHelper.doPrivilegedIOExceptionAction(new PrivilegedExceptionAction<Object>()
+ {
+ public Object run() throws Exception
{
- public Object run() throws Exception
- {
if (out == null)
{
OutputStream os = new IndexOutputStream(dir.createOutput(REDO_LOG));
out = new BufferedWriter(new OutputStreamWriter(os));
}
- return null;
- }
- });
+ return null;
+ }
+ });
}
/**
@@ -231,9 +239,9 @@
private void read(final ActionCollector collector) throws IOException
{
SecurityHelper.doPrivilegedIOExceptionAction(new PrivilegedExceptionAction<Object>()
+ {
+ public Object run() throws Exception
{
- public Object run() throws Exception
- {
if (!dir.fileExists(REDO_LOG))
{
return null;
@@ -269,9 +277,9 @@
}
}
}
- return null;
- }
- });
+ return null;
+ }
+ });
}
//-----------------------< internal >---------------------------------------
Modified: jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/main/java/org/exoplatform/services/jcr/impl/core/query/lucene/SearchIndex.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -468,6 +468,11 @@
private boolean closed = false;
/**
+ * Allows or denies queries while index is offline.
+ */
+ private boolean allowQuery = true;
+
+ /**
* Text extractor for extracting text content of binary properties.
*/
private DocumentReaderService extractor;
@@ -486,7 +491,7 @@
* The unique id of the workspace corresponding to current instance of {@link SearchIndex}
*/
private final String wsId;
-
+
private final ConfigurationManager cfm;
/**
@@ -721,8 +726,8 @@
}
modeHandler.addIndexerIoModeListener(this);
- }
-
+ }
+
/**
* @return the wsId
*/
@@ -1299,9 +1304,8 @@
*/
protected IndexReader getIndexReader(boolean includeSystemIndex) throws IOException
{
- // deny query execution if index in offline mode
- // TODO Replace with special Exception Type
- if (!index.isOnline())
+ // deny query execution if index in offline mode and allowQuery is false
+ if (!index.isOnline() && !allowQuery)
{
throw new IndexOfflineIOException("Index is offline");
}
@@ -2133,7 +2137,7 @@
{
try
{
- Class analyzerClass = Class.forName(analyzerClassName);
+ Class<?> analyzerClass = Class.forName(analyzerClassName);
analyzer.setDefaultAnalyzer((Analyzer)analyzerClass.newInstance());
}
catch (Exception e)
@@ -2723,7 +2727,7 @@
{
try
{
- Class similarityClass = Class.forName(className);
+ Class<?> similarityClass = Class.forName(className);
similarity = (Similarity)similarityClass.newInstance();
}
catch (Exception e)
@@ -3072,11 +3076,19 @@
}
/**
- * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#setOnline(boolean)
+ * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#setOnline(boolean, boolean)
*/
- public void setOnline(boolean isOnline) throws IOException
+ public void setOnline(boolean isOnline, boolean allowQuery) throws IOException
{
checkOpen();
+ if (isOnline)
+ {
+ this.allowQuery = true;
+ }
+ else
+ {
+ this.allowQuery = allowQuery;
+ }
index.setOnline(isOnline);
}
Modified: jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java
===================================================================
--- jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java 2011-04-22 09:39:17 UTC (rev 4273)
+++ jcr/trunk/exo.jcr.component.core/src/test/java/org/exoplatform/services/jcr/api/core/query/lucene/SlowQueryHandler.java 2011-04-22 12:19:42 UTC (rev 4274)
@@ -108,19 +108,19 @@
}
/**
- * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#setOnline(boolean)
+ * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#isOnline()
*/
- public void setOnline(boolean isOnline)
+ public boolean isOnline()
{
- // TODO Auto-generated method stub
-
+ return true;
}
/**
- * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#isOnline()
+ * @see org.exoplatform.services.jcr.impl.core.query.QueryHandler#setOnline(boolean, boolean)
*/
- public boolean isOnline()
+ public void setOnline(boolean isOnline, boolean allowQuery) throws IOException
{
- return true;
+ // TODO Auto-generated method stub
+
}
}
More information about the exo-jcr-commits
mailing list