[hibernate-commits] Hibernate SVN: r15316 - in search/trunk/src/java/org/hibernate/search: backend/impl/lucene and 2 other directories.
hibernate-commits at lists.jboss.org
hibernate-commits at lists.jboss.org
Fri Oct 10 04:47:53 EDT 2008
Author: sannegrinovero
Date: 2008-10-10 04:47:53 -0400 (Fri, 10 Oct 2008)
New Revision: 15316
Added:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/IndexInteractionType.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/AddWorkDelegate.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/DeleteWorkDelegate.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkDelegate.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/OptimizeWorkDelegate.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/PurgeAllWorkDelegate.java
Removed:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java
Modified:
search/trunk/src/java/org/hibernate/search/backend/Workspace.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
search/trunk/src/java/org/hibernate/search/store/optimization/IncrementalOptimizerStrategy.java
Log:
backend refactoring, step2; and fix for:
HSEARCH-272 Improve contention on DirectoryProviders in lucene backend
HSEARCH-263 Wrong analyzers used in IndexWriter
HSEARCH-271 Wrong Similarity used when sharing index among entities
(sorry, it's not practical to split them)
Modified: search/trunk/src/java/org/hibernate/search/backend/Workspace.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/Workspace.java 2008-10-10 01:26:16 UTC (rev 15315)
+++ search/trunk/src/java/org/hibernate/search/backend/Workspace.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -2,9 +2,8 @@
package org.hibernate.search.backend;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.SimpleAnalyzer;
@@ -13,6 +12,7 @@
import org.apache.lucene.store.Directory;
import org.hibernate.annotations.common.AssertionFailure;
import org.hibernate.search.SearchException;
+import org.hibernate.search.SearchFactory;
import org.hibernate.search.engine.DocumentBuilder;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.hibernate.search.store.DirectoryProvider;
@@ -21,20 +21,13 @@
import org.slf4j.Logger;
/**
- * Lucene workspace.
- * <p/>
- * <b>This is not intended to be used in a multithreaded environment</b>.
- * <p/>
+ * Lucene workspace for a DirectoryProvider.<p/>
* <ul>
- * <li>One cannot execute modification through an IndexReader when an IndexWriter has been acquired
- * on the same underlying directory
- * </li>
- * <li>One cannot get an IndexWriter when an IndexReader have been acquired and modified on the same
- * underlying directory
- * </li>
- * <li>The recommended approach is to execute all the modifications on the IndexReaders, {@link #clean()}, and acquire the
- * index writers
- * </li>
+ * <li>Before using getIndexWriter or getIndexReader the lock must be acquired, and resources must be closed
+ * before releasing the lock.</li>
+ * <li>One cannot get an IndexWriter when an IndexReader has been acquired and not closed, and vice-versa.</li>
+ * <li>The recommended approach is to execute all the modifications on the IndexReader, and after that on
+ * the IndexWriter</li>
* </ul>
*
* @author Emmanuel Bernard
@@ -42,34 +35,45 @@
* @author Sanne Grinovero
*/
//TODO introduce the notion of read only IndexReader? We cannot enforce it because Lucene uses abstract classes, not interfaces.
+//TODO renaming to "DirectoryWorkspace" would be nice.
public class Workspace {
-
+
private final Logger log = LoggerFactory.getLogger( Workspace.class );
- private final SearchFactoryImplementor searchFactoryImplementor;
private static final Analyzer SIMPLE_ANALYZER = new SimpleAnalyzer();
- private final Map<DirectoryProvider, DPWorkspace> directorySpace = new HashMap<DirectoryProvider, DPWorkspace>() {
+
+ // invariant state:
- @Override
- public DPWorkspace get(Object key) {
- DirectoryProvider dp = (DirectoryProvider) key;
- DPWorkspace directoryWorkSpace = super.get( dp );
- if ( directoryWorkSpace==null ) {
- directoryWorkSpace = new DPWorkspace( dp );
- put( dp, directoryWorkSpace );
- }
- return directoryWorkSpace;
- }
-
- };
+ private final SearchFactoryImplementor searchFactoryImplementor;
+ private final DirectoryProvider directoryProvider;
+ private final OptimizerStrategy optimizerStrategy;
+ private final ReentrantLock lock;
+ private final boolean singleEntityInDirectory;
+ private final LuceneIndexingParameters indexingParams;
+
+ // variable state:
/**
- * Flag indicating if the current work should be executed using
- * the Lucene parameters for batch indexing.
+ * Current open IndexReader, or null when closed. Guarded by synchronization.
*/
- private boolean isBatch = false;
-
- public Workspace(SearchFactoryImplementor searchFactoryImplementor) {
+ private IndexReader reader;
+
+ /**
+ * Current open IndexWriter, or null when closed. Guarded by synchronization.
+ */
+ private IndexWriter writer;
+
+ /**
+ * Keeps a count of modification operations done on the index.
+ */
+ private final AtomicLong operations = new AtomicLong( 0L );
+
+ public Workspace(SearchFactoryImplementor searchFactoryImplementor, DirectoryProvider provider) {
this.searchFactoryImplementor = searchFactoryImplementor;
+ this.directoryProvider = provider;
+ this.optimizerStrategy = searchFactoryImplementor.getOptimizerStrategy( directoryProvider );
+ this.singleEntityInDirectory = searchFactoryImplementor.getClassesInDirectoryProvider( provider ).size() == 1;
+ this.indexingParams = searchFactoryImplementor.getIndexingParameters( directoryProvider );
+ this.lock = searchFactoryImplementor.getDirectoryProviderLock( provider );
}
public DocumentBuilder getDocumentBuilder(Class entity) {
@@ -77,285 +81,183 @@
}
/**
- * Retrieve a read write IndexReader; the purpose should be to
- * modify the index. (mod count will be incremented)
- * For a given DirectoryProvider, An IndexReader must be used before an IndexWriter
+ * If optimization has not been forced give a chance to the configured OptimizerStrategy
+ * to optimize the index.
+ * @throws AssertionFailure if the lock is not owned or if an IndexReader is open.
*/
- public IndexReader getIndexReader(DirectoryProvider provider, Class entity) {
- DPWorkspace space = directorySpace.get( provider );
- return space.getIndexReader( entity );
+ public void optimizerPhase() {
+ assertOwnLock();
+ // used getAndSet(0) because Workspace is going to be reused by next transaction.
+ optimizerStrategy.addTransaction( operations.getAndSet( 0L ) );
+ optimizerStrategy.optimize( this );
}
-
- //for index optimization
- public IndexWriter getIndexWriter(DirectoryProvider provider) {
- return getIndexWriter( provider, null, false );
+
+ /**
+ * Used by OptimizeLuceneWork after index optimization to flag that
+ * optimization has been forced.
+ * @see OptimizeLuceneWork
+ * @see SearchFactory#optimize()
+ * @see SearchFactory#optimize(Class)
+ */
+ public void optimize() {
+ assertOwnLock(); // the DP is not affected, but needs to ensure the optimizerStrategy is accessed in a threadsafe way
+ optimizerStrategy.optimizationForced();
}
/**
- * retrieve a read write IndexWriter
- * For a given DirectoryProvider, An IndexReader must be used before an IndexWriter
+ * Gets an IndexReader to alter the index, opening one if needed.
+ * The caller needs to own the lock relevant to this DirectoryProvider.
+ * @throws AssertionFailure if an IndexWriter is open or if the lock is not owned.
+ * @return a new IndexReader or one already open.
+ * @see #lock()
*/
- public IndexWriter getIndexWriter(DirectoryProvider provider, Class entity, boolean modificationOperation) {
- DPWorkspace space = directorySpace.get( provider );
- return space.getIndexWriter( entity, modificationOperation );
+ public synchronized IndexReader getIndexReader() {
+ assertOwnLock();
+ // one cannot access a reader for update while a writer is in use
+ if ( writer != null )
+ throw new AssertionFailure( "Tries to read for update an index while a writer is in use." );
+ if ( reader != null )
+ return reader;
+ Directory directory = directoryProvider.getDirectory();
+ try {
+ reader = IndexReader.open( directory );
+ log.trace( "IndexReader opened" );
+ }
+ catch ( IOException e ) {
+ reader = null;
+ throw new SearchException( "Unable to open IndexReader on directory " + directory, e );
+ }
+ return reader;
}
- private void cleanUp(SearchException originalException) {
- //release all readers and writers, then release locks
- SearchException raisedException = originalException;
- for ( DPWorkspace space : directorySpace.values() ) {
+ /**
+ * Closes a previously opened IndexReader.
+ * @throws SearchException on IOException during Lucene close operation.
+ * @throws AssertionFailure if the lock is not owned or if there is no IndexReader to close.
+ * @see #getIndexReader()
+ */
+ public synchronized void closeIndexReader() {
+ assertOwnLock();
+ IndexReader toClose = reader;
+ reader = null;
+ if ( toClose != null ) {
try {
- space.closeIndexReader();
+ toClose.close();
+ log.trace( "IndexReader closed" );
}
- catch (IOException e) {
- if ( raisedException != null ) {
- log.error( "Subsequent Exception while closing IndexReader", e );
- }
- else {
- raisedException = new SearchException( "Exception while closing IndexReader", e );
- }
+ catch ( IOException e ) {
+ throw new SearchException( "Exception while closing IndexReader", e );
}
}
- //first release all locks for DirectoryProviders not needing optimization
- for ( DPWorkspace space : directorySpace.values() ) {
- if ( ! space.needsOptimization() ) {
- raisedException = closeWriterAndUnlock( space, raisedException );
- }
+ else {
+ throw new AssertionFailure( "No IndexReader open to close." );
}
- //then for remaining DirectoryProvider
- for ( DPWorkspace space : directorySpace.values() ) {
- if ( space.needsOptimization() ) {
- if ( raisedException == null ) {//avoid optimizations in case of errors or exceptions
- OptimizerStrategy optimizerStrategy = space.getOptimizerStrategy();
- optimizerStrategy.addTransaction( space.countOperations() );
- try {
- optimizerStrategy.optimize( this );
- }
- catch (SearchException e) {
- //this will also cause skipping other optimizations:
- raisedException = new SearchException( "Exception while optimizing directoryProvider: "
- + space.getDirectory().toString(), e );
- }
- }
- raisedException = closeWriterAndUnlock( space, raisedException );
- }
- }
- if ( raisedException != null ) throw raisedException;
}
-
- private SearchException closeWriterAndUnlock( DPWorkspace space, SearchException raisedException ) {
+
+ /**
+ * Gets the IndexWriter, opening one if needed.
+ * @param batchmode when true the indexWriter settings for batch mode will be applied.
+ * Ignored if IndexWriter is open already.
+ * @throws AssertionFailure if an IndexReader is open or the lock is not owned.
+ * @throws SearchException on a IOException during index opening.
+ * @return a new IndexWriter or one already open.
+ */
+ public synchronized IndexWriter getIndexWriter(boolean batchmode) {
+ assertOwnLock();
+ // one has to close a reader for update before a writer is accessed
+ if ( reader != null )
+ throw new AssertionFailure( "Tries to open an IndexWriter while an IndexReader is open in update mode." );
+ if ( writer != null )
+ return writer;
try {
- space.closeIndexWriter();
+ // don't care about the Analyzer as it will be selected during usage of IndexWriter.
+ writer = new IndexWriter( directoryProvider.getDirectory(), SIMPLE_ANALYZER, false ); // has been created at init time
+ indexingParams.applyToWriter( writer, batchmode );
+ log.trace( "IndexWriter opened" );
}
- catch (IOException e) {
- if ( raisedException != null ) {
- log.error( "Subsequent Exception while closing IndexWriter", e );
+ catch ( IOException e ) {
+ writer = null;
+ throw new SearchException( "Unable to open IndexWriter", e );
+ }
+ return writer;
+ }
+
+ /**
+ * Closes a previously opened IndexWriter.
+ * @throws SearchException on IOException during Lucene close operation.
+ * @throws AssertionFailure if there is no IndexWriter to close, or if the lock is not owned.
+ */
+ public synchronized void closeIndexWriter() {
+ assertOwnLock();
+ IndexWriter toClose = writer;
+ writer = null;
+ if ( toClose != null ) {
+ try {
+ toClose.close();
+ log.trace( "IndexWriter closed" );
}
- else {
- raisedException = new SearchException( "Exception while closing IndexWriter", e );
+ catch ( IOException e ) {
+ throw new SearchException( "Exception while closing IndexWriter", e );
}
}
- space.unLock();
- return raisedException;
+ else {
+ throw new AssertionFailure( "No open IndexWriter to close" );
+ }
}
/**
- * release resources consumed in the workspace if any
+ * Increment the counter of modification operations done on the index.
+ * Used (currently only) by the OptimizerStrategy.
+ * @param modCount the increment to add to the counter.
*/
- public void clean() {
- cleanUp( null );
+ public void incrementModificationCounter(int modCount) {
+ operations.addAndGet( modCount );
}
- public void optimize(DirectoryProvider provider) {
- DPWorkspace space = directorySpace.get( provider );
- OptimizerStrategy optimizerStrategy = space.getOptimizerStrategy();
- space.setOptimizationForced();
- optimizerStrategy.optimizationForced();
+ /**
+ * Some optimizations can be enabled only when the same Directory is not shared
+ * among multiple entities.
+ * @return true iff only one entity type is using this Directory.
+ */
+ public boolean isSingleEntityInDirectory() {
+ return singleEntityInDirectory;
}
-
- public boolean isBatch() {
- return isBatch;
- }
-
- public void setBatch(boolean isBatch) {
- this.isBatch = isBatch;
- }
- //TODO this should have it's own source file but currently needs the container-Workspace cleanup()
- //for exceptions. Best to wait for move to parallel batchWorkers (one per Directory)?
/**
- * A per-DirectoryProvider Workspace
+ * Acquires a lock on the DirectoryProvider backing this Workspace;
+ * this is required to use getIndexWriter(boolean), closeIndexWriter(),
+ * getIndexReader(), closeIndexReader().
+ * @see #getIndexWriter(boolean)
+ * @see #closeIndexWriter()
+ * @see #getIndexReader()
+ * @see #closeIndexReader()
*/
- private class DPWorkspace {
-
- private final DirectoryProvider directoryProvider;
- private final Lock lock;
-
- private IndexReader reader;
- private IndexWriter writer;
- private boolean locked = false;
- private boolean optimizationForced = false;
- private long operations = 0L;
-
- DPWorkspace(DirectoryProvider dp) {
- this.directoryProvider = dp;
- this.lock = searchFactoryImplementor.getDirectoryProviderLock( dp );
- }
-
- public boolean needsOptimization() {
- return isLocked() && !isOptimizationForced();
- }
+ public void lock() {
+ lock.lock();
+ }
- /**
- * Sets a flag to signal optimization has been forced.
- * Cannot be undone.
- */
- void setOptimizationForced() {
- optimizationForced = true;
- }
-
- /**
- * @return the Directory from the DirectoryProvider
- */
- Directory getDirectory() {
- return directoryProvider.getDirectory();
- }
-
- /**
- * @return A count of performed modification operations
- */
- long countOperations() {
- return operations;
- }
-
- /**
- * @return The OptimizerStrategy configured for this DirectoryProvider
- */
- OptimizerStrategy getOptimizerStrategy() {
- return searchFactoryImplementor.getOptimizerStrategy( directoryProvider );
- }
-
- /**
- * @return true if optimization has been forced
- */
- boolean isOptimizationForced() {
- return optimizationForced;
- }
-
- /**
- * @return true if underlying DirectoryProvider is locked
- */
- boolean isLocked() {
- return locked;
- }
-
- /**
- * Gets the currently open IndexWriter, or creates one.
- * If a IndexReader was open, it will be closed first.
- * @param entity The entity for which the IndexWriter is needed. entity can be null when calling for Optimize
- * @param modificationOperation set to true if needed to perform modifications to index
- * @return created or existing IndexWriter
- */
- IndexWriter getIndexWriter(Class entity, boolean modificationOperation) {
- //one has to close a reader for update before a writer is accessed
- try {
- closeIndexReader();
+ /**
+ * Releases the lock obtained by calling lock()
+ * @throws AssertionFailure when unlocking without having closed IndexWriter or IndexReader.
+ * @see #lock()
+ */
+ public synchronized void unlock() {
+ try {
+ if ( this.reader != null ) {
+ throw new AssertionFailure( "Unlocking Workspace without having closed the IndexReader" );
}
- catch (IOException e) {
- throw new SearchException( "Exception while closing IndexReader", e );
+ if ( this.writer != null ) {
+ throw new AssertionFailure( "Unlocking Workspace without having closed the IndexWriter" );
}
- if ( modificationOperation ) {
- operations++; //estimate the number of modifications done on this index
- }
- if ( writer != null ) return writer;
- lock();
- try {
- DocumentBuilder documentBuilder = searchFactoryImplementor.getDocumentBuilders().get( entity );
- Analyzer analyzer = entity != null ?
- documentBuilder.getAnalyzer() :
- SIMPLE_ANALYZER; //never used
- writer = new IndexWriter( directoryProvider.getDirectory(), analyzer, false ); //has been created at init time
- if ( entity != null ) {
- writer.setSimilarity( documentBuilder.getSimilarity() );
- }
- LuceneIndexingParameters indexingParams = searchFactoryImplementor.getIndexingParameters( directoryProvider );
- indexingParams.applyToWriter( writer, isBatch );
- }
- catch (IOException e) {
- cleanUp(
- new SearchException( "Unable to open IndexWriter" + ( entity != null ? " for " + entity : "" ), e )
- );
- }
- return writer;
}
-
- /**
- * Gets an IndexReader to alter the index;
- * (operations count will be incremented)
- * @throws AssertionFailure if an IndexWriter is open
- * @param entity The entity for which the IndexWriter is needed
- * @return a new or existing IndexReader
- */
- IndexReader getIndexReader(Class entity) {
- //one cannot access a reader for update after a writer has been accessed
- if ( writer != null )
- throw new AssertionFailure( "Tries to read for update an index while a writer is accessed for " + entity );
- operations++; //estimate the number of modifications done on this index
- if ( reader != null ) return reader;
- lock();
- try {
- reader = IndexReader.open( directoryProvider.getDirectory() );
- }
- catch (IOException e) {
- cleanUp( new SearchException( "Unable to open IndexReader for " + entity, e ) );
- }
- return reader;
+ finally {
+ lock.unlock();
}
+ }
- /**
- * Unlocks underlying DirectoryProvider iff locked.
- */
- void unLock() {
- if ( locked ) {
- lock.unlock();
- locked = false;
- }
- }
-
- /**
- * Locks underlying DirectoryProvider iff not locked already.
- */
- void lock() {
- if ( !locked ) {
- lock.lock();
- locked = true;
- }
- }
-
- /**
- * Closes the IndexReader, if open.
- * @throws IOException
- */
- void closeIndexReader() throws IOException {
- IndexReader toClose = reader;
- reader = null;
- if ( toClose!=null ) {
- toClose.close();
- }
- }
-
- /**
- * Closes the IndexWriter, if open.
- * @throws IOException
- */
- void closeIndexWriter() throws IOException {
- IndexWriter toClose = writer;
- writer = null;
- if ( toClose!=null ) {
- toClose.close();
- }
- }
-
+ private final void assertOwnLock() {
+ if ( ! lock.isHeldByCurrentThread() )
+ throw new AssertionFailure( "Not owning DirectoryProvider Lock" );
}
-
+
}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,20 @@
+package org.hibernate.search.backend.impl.lucene;
+
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.store.IndexShardingStrategy;
+
+/**
+ * @author Sanne Grinovero
+ */
+interface DpSelectionDelegate {
+
+ /**
+ * The LuceneWork must be applied to different indexes.
+ * @param work the work to split.
+ * @param queues the target queues to add work to.
+ * @param shardingStrategy the Sharding strategy is usually needed to identify affected Directories.
+ */
+ void addAsPayLoadsToQueue(LuceneWork work,
+ IndexShardingStrategy shardingStrategy, QueueProcessors queues);
+
+}
Property changes on: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java
___________________________________________________________________
Name: svn:mergeinfo
+
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,102 @@
+package org.hibernate.search.backend.impl.lucene;
+
+import org.hibernate.search.backend.AddLuceneWork;
+import org.hibernate.search.backend.DeleteLuceneWork;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.OptimizeLuceneWork;
+import org.hibernate.search.backend.PurgeAllLuceneWork;
+import org.hibernate.search.backend.WorkVisitor;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.store.IndexShardingStrategy;
+
+/**
+ * This is the main client for IndexShardingStrategies.
+ * The only implementation of {@code WorkVisitor<DpSelectionDelegate>},
+ * using a visitor/selector pattern for different implementations of addAsPayLoadsToQueue
+ * depending on the type of LuceneWork.
+ *
+ * @author Sanne Grinovero
+ */
+class DpSelectionVisitor implements WorkVisitor<DpSelectionDelegate> {
+
+ private final AddSelectionDelegate addDelegate = new AddSelectionDelegate();
+ private final DeleteSelectionDelegate deleteDelegate = new DeleteSelectionDelegate();
+ private final OptimizeSelectionDelegate optimizeDelegate = new OptimizeSelectionDelegate();
+ private final PurgeAllSelectionDelegate purgeDelegate = new PurgeAllSelectionDelegate();
+
+ public DpSelectionDelegate getDelegate(AddLuceneWork addLuceneWork) {
+ return addDelegate;
+ }
+
+ public DpSelectionDelegate getDelegate(DeleteLuceneWork deleteLuceneWork) {
+ return deleteDelegate;
+ }
+
+ public DpSelectionDelegate getDelegate(OptimizeLuceneWork optimizeLuceneWork) {
+ return optimizeDelegate;
+ }
+
+ public DpSelectionDelegate getDelegate(PurgeAllLuceneWork purgeAllLuceneWork) {
+ return purgeDelegate;
+ }
+
+ private static class AddSelectionDelegate implements DpSelectionDelegate {
+
+ public void addAsPayLoadsToQueue(LuceneWork work,
+ IndexShardingStrategy shardingStrategy, QueueProcessors queues) {
+ DirectoryProvider provider = shardingStrategy.getDirectoryProviderForAddition(
+ work.getEntityClass(),
+ work.getId(),
+ work.getIdInString(),
+ work.getDocument()
+ );
+ queues.addWorkToDpProcessor( provider, work );
+ }
+
+ }
+
+ private static class DeleteSelectionDelegate implements DpSelectionDelegate {
+
+ public void addAsPayLoadsToQueue(LuceneWork work,
+ IndexShardingStrategy shardingStrategy, QueueProcessors queues) {
+ DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForDeletion(
+ work.getEntityClass(),
+ work.getId(),
+ work.getIdInString()
+ );
+ for (DirectoryProvider provider : providers) {
+ queues.addWorkToDpProcessor( provider, work );
+ }
+ }
+
+ }
+
+ private static class OptimizeSelectionDelegate implements DpSelectionDelegate {
+
+ public void addAsPayLoadsToQueue(LuceneWork work,
+ IndexShardingStrategy shardingStrategy, QueueProcessors queues) {
+ DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForAllShards();
+ for (DirectoryProvider provider : providers) {
+ queues.addWorkToDpProcessor( provider, work );
+ }
+ }
+
+ }
+
+ private static class PurgeAllSelectionDelegate implements DpSelectionDelegate {
+
+ public void addAsPayLoadsToQueue(LuceneWork work,
+ IndexShardingStrategy shardingStrategy, QueueProcessors queues) {
+ DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForDeletion(
+ work.getEntityClass(),
+ work.getId(),
+ work.getIdInString()
+ );
+ for (DirectoryProvider provider : providers) {
+ queues.addWorkToDpProcessor( provider, work );
+ }
+ }
+
+ }
+
+}
Property changes on: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java
___________________________________________________________________
Name: svn:mergeinfo
+
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/IndexInteractionType.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/IndexInteractionType.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/IndexInteractionType.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,27 @@
+package org.hibernate.search.backend.impl.lucene;
+
+/**
+ * @author Sanne Grinovero
+ */
+public enum IndexInteractionType {
+
+ /**
+ * means the workType needs an IndexWriter.
+ */
+ NEEDS_INDEXWRITER,
+ /**
+ * means the workType needs an IndexReader.
+ */
+ NEEDS_INDEXREADER,
+ /**
+ * means an IndexWriter should work best but it's possible
+ * to use an IndexReader instead.
+ */
+ PREFER_INDEXWRITER,
+ /**
+ * means an IndexReader should work best but it's possible
+ * to use an IndexWriter instead.
+ */
+ PREFER_INDEXREADER
+
+}
Modified: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java 2008-10-10 01:26:16 UTC (rev 15315)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -1,146 +1,52 @@
//$Id$
package org.hibernate.search.backend.impl.lucene;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
-import java.util.ArrayList;
+import java.util.Map;
-import org.hibernate.search.backend.AddLuceneWork;
import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.Workspace;
-import org.hibernate.search.backend.OptimizeLuceneWork;
-import org.hibernate.search.backend.DeleteLuceneWork;
-import org.hibernate.search.backend.PurgeAllLuceneWork;
-import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.engine.SearchFactoryImplementor;
import org.hibernate.search.store.DirectoryProvider;
import org.hibernate.search.store.IndexShardingStrategy;
-import org.hibernate.annotations.common.AssertionFailure;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Apply the operations to Lucene directories avoiding deadlocks.
+ * Apply the operations to Lucene directories.
*
* @author Emmanuel Bernard
* @author Hardy Ferentschik
* @author John Griffin
+ * @author Sanne Grinovero
*/
-public class LuceneBackendQueueProcessor implements Runnable {
+class LuceneBackendQueueProcessor implements Runnable {
- /**
- * Class logger.
- */
- private static final Logger log = LoggerFactory.getLogger( LuceneBackendQueueProcessor.class );
-
private final List<LuceneWork> queue;
private final SearchFactoryImplementor searchFactoryImplementor;
+ private final Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap;
+ private static final DpSelectionVisitor providerSelectionVisitor = new DpSelectionVisitor();
- public LuceneBackendQueueProcessor(List<LuceneWork> queue, SearchFactoryImplementor searchFactoryImplementor) {
+ LuceneBackendQueueProcessor(List<LuceneWork> queue,
+ SearchFactoryImplementor searchFactoryImplementor,
+ Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap) {
this.queue = queue;
this.searchFactoryImplementor = searchFactoryImplementor;
+ this.visitorsMap = visitorsMap;
}
public void run() {
- Workspace workspace;
- LuceneWorker worker;
- workspace = new Workspace( searchFactoryImplementor );
- worker = new LuceneWorker( workspace );
- try {
- List<LuceneWorker.WorkWithPayload> queueWithFlatDPs = new ArrayList<LuceneWorker.WorkWithPayload>( queue.size()*2 );
- for ( LuceneWork work : queue ) {
- DocumentBuilder documentBuilder = searchFactoryImplementor.getDocumentBuilders().get( work.getEntityClass() );
- IndexShardingStrategy shardingStrategy = documentBuilder.getDirectoryProviderSelectionStrategy();
-
- if ( PurgeAllLuceneWork.class.isAssignableFrom( work.getClass() ) ) {
- DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForDeletion(
- work.getEntityClass(),
- work.getId(),
- work.getIdInString()
- );
- for (DirectoryProvider provider : providers) {
- queueWithFlatDPs.add( new LuceneWorker.WorkWithPayload( work, provider ) );
- }
- }
- else if ( AddLuceneWork.class.isAssignableFrom( work.getClass() ) ) {
- DirectoryProvider provider = shardingStrategy.getDirectoryProviderForAddition(
- work.getEntityClass(),
- work.getId(),
- work.getIdInString(),
- work.getDocument()
- );
- queueWithFlatDPs.add( new LuceneWorker.WorkWithPayload( work, provider ) );
- }
- else if ( DeleteLuceneWork.class.isAssignableFrom( work.getClass() ) ) {
- DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForDeletion(
- work.getEntityClass(),
- work.getId(),
- work.getIdInString()
- );
- for (DirectoryProvider provider : providers) {
- queueWithFlatDPs.add( new LuceneWorker.WorkWithPayload( work, provider ) );
- }
- }
- else if ( OptimizeLuceneWork.class.isAssignableFrom( work.getClass() ) ) {
- DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForAllShards();
- for (DirectoryProvider provider : providers) {
- queueWithFlatDPs.add( new LuceneWorker.WorkWithPayload( work, provider ) );
- }
- }
- else {
- throw new AssertionFailure( "Unknown work type: " + work.getClass() );
- }
- }
- deadlockFreeQueue( queueWithFlatDPs, searchFactoryImplementor );
- checkForBatchIndexing(workspace);
- for ( LuceneWorker.WorkWithPayload luceneWork : queueWithFlatDPs ) {
- worker.performWork( luceneWork );
- }
+ QueueProcessors processors = new QueueProcessors( visitorsMap );
+ // divide tasks in parts, adding to QueueProcessors by affected Directory.
+ for ( LuceneWork work : queue ) {
+ DocumentBuilder documentBuilder = searchFactoryImplementor.getDocumentBuilders().get( work.getEntityClass() );
+ IndexShardingStrategy shardingStrategy = documentBuilder.getDirectoryProviderSelectionStrategy();
+ work.getWorkDelegate( providerSelectionVisitor ).addAsPayLoadsToQueue( work, shardingStrategy, processors );
}
- finally {
- workspace.clean();
- queue.clear();
+ // TODO next cycle could be performed in parallel
+ for ( PerDPQueueProcessor processor : processors.getQueueProcessors() ) {
+ // perform the work on indexes
+ processor.performWorks();
}
}
- private void checkForBatchIndexing(Workspace workspace) {
- for ( LuceneWork luceneWork : queue ) {
- // if there is at least a single batch index job we put the work space into batch indexing mode.
- if( luceneWork.isBatch() ){
- log.trace( "Setting batch indexing mode." );
- workspace.setBatch( true );
- break;
- }
- }
- }
-
- /**
- * one must lock the directory providers in the exact same order to avoid
- * dead lock between concurrent threads or processes
- * To achieve that, the work will be done per directory provider
- */
- private void deadlockFreeQueue(List<LuceneWorker.WorkWithPayload> queue, final SearchFactoryImplementor searchFactoryImplementor) {
- Collections.sort( queue, new Comparator<LuceneWorker.WorkWithPayload>() {
- public int compare(LuceneWorker.WorkWithPayload o1, LuceneWorker.WorkWithPayload o2) {
- long h1 = getWorkHashCode( o1, searchFactoryImplementor );
- long h2 = getWorkHashCode( o2, searchFactoryImplementor );
- return h1 < h2 ?
- -1 :
- h1 == h2 ?
- 0 :
- 1;
- }
- } );
- }
-
- private long getWorkHashCode(LuceneWorker.WorkWithPayload luceneWork, SearchFactoryImplementor searchFactoryImplementor) {
- DirectoryProvider provider = luceneWork.getProvider();
- int h = provider.getClass().hashCode();
- h = 31 * h + provider.hashCode();
- long extendedHash = h; //to be sure extendedHash + 1 < extendedHash + 2 is always true
- if ( luceneWork.getWork() instanceof AddLuceneWork ) extendedHash+=1; //addwork after deleteWork
- if ( luceneWork.getWork() instanceof OptimizeLuceneWork ) extendedHash+=2; //optimize after everything
- return extendedHash;
- }
}
Modified: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java 2008-10-10 01:26:16 UTC (rev 15315)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -1,24 +1,52 @@
//$Id$
package org.hibernate.search.backend.impl.lucene;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.List;
import org.hibernate.search.backend.BackendQueueProcessorFactory;
import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.store.DirectoryProvider;
/**
+ * This will actually contain the Workspace and LuceneWork visitor implementation,
+ * reused per-DirectoryProvider.
+ * Both Workspace(s) and LuceneWorkVisitor(s) lifecycle are linked to the backend
+ * lifecycle (reused and shared by all transactions).
+ * The LuceneWorkVisitor(s) are stateless, the Workspace(s) are threadsafe.
+ *
* @author Emmanuel Bernard
+ * @author Sanne Grinovero
*/
public class LuceneBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
- private SearchFactoryImplementor searchFactoryImplementor;
+ private SearchFactoryImplementor searchFactoryImp;
+
+ /**
+ * Contains the Workspace and LuceneWork visitor implementation,
+ * reused per-DirectoryProvider.
+ * Both Workspace(s) and LuceneWorkVisitor(s) lifecycle are linked to the backend
+ * lifecycle (reused and shared by all transactions);
+ * the LuceneWorkVisitor(s) are stateless, the Workspace(s) are threadsafe.
+ */
+ private final Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap = new HashMap<DirectoryProvider,LuceneWorkVisitor>();
+
public void initialize(Properties props, SearchFactoryImplementor searchFactoryImplementor) {
- this.searchFactoryImplementor = searchFactoryImplementor;
+ this.searchFactoryImp = searchFactoryImplementor;
+ for (DirectoryProvider dp : searchFactoryImplementor.getDirectoryProviders() ) {
+ Workspace w = new Workspace( searchFactoryImplementor, dp );
+ LuceneWorkVisitor visitor = new LuceneWorkVisitor( w );
+ visitorsMap.put( dp, visitor );
+ }
}
public Runnable getProcessor(List<LuceneWork> queue) {
- return new LuceneBackendQueueProcessor( queue, searchFactoryImplementor );
+ return new LuceneBackendQueueProcessor( queue, searchFactoryImp, visitorsMap );
}
+
}
Deleted: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java 2008-10-10 01:26:16 UTC (rev 15315)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -1,164 +0,0 @@
-//$Id$
-package org.hibernate.search.backend.impl.lucene;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermDocs;
-import org.hibernate.annotations.common.AssertionFailure;
-import org.hibernate.search.SearchException;
-import org.hibernate.search.backend.AddLuceneWork;
-import org.hibernate.search.backend.DeleteLuceneWork;
-import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.OptimizeLuceneWork;
-import org.hibernate.search.backend.Workspace;
-import org.hibernate.search.backend.PurgeAllLuceneWork;
-import org.hibernate.search.engine.DocumentBuilder;
-import org.hibernate.search.store.DirectoryProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Stateless implementation that performs a unit of work.
- *
- * @author Emmanuel Bernard
- * @author Hardy Ferentschik
- * @author John Griffin
- */
-public class LuceneWorker {
- private final Workspace workspace;
- private static final Logger log = LoggerFactory.getLogger( LuceneWorker.class );
-
- public LuceneWorker(Workspace workspace) {
- this.workspace = workspace;
- }
-
- public void performWork(WorkWithPayload luceneWork) {
- Class workClass = luceneWork.getWork().getClass();
- if ( AddLuceneWork.class.isAssignableFrom( workClass ) ) {
- performWork( (AddLuceneWork) luceneWork.getWork(), luceneWork.getProvider() );
- }
- else if ( DeleteLuceneWork.class.isAssignableFrom( workClass ) ) {
- performWork( (DeleteLuceneWork) luceneWork.getWork(), luceneWork.getProvider() );
- }
- else if ( OptimizeLuceneWork.class.isAssignableFrom( workClass ) ) {
- performWork( (OptimizeLuceneWork) luceneWork.getWork(), luceneWork.getProvider() );
- }else if ( PurgeAllLuceneWork.class.isAssignableFrom( workClass ) ) {
- performWork( (PurgeAllLuceneWork) luceneWork.getWork(), luceneWork.getProvider() );
- }
- else {
- throw new AssertionFailure( "Unknown work type: " + workClass );
- }
- }
-
- public void performWork(AddLuceneWork work, DirectoryProvider provider) {
- Class entity = work.getEntityClass();
- Serializable id = work.getId();
- Document document = work.getDocument();
- add( entity, id, document, provider );
- }
-
- private void add(Class entity, Serializable id, Document document, DirectoryProvider provider) {
- log.trace( "add to Lucene index: {}#{}:{}", new Object[] { entity, id, document } );
- IndexWriter writer = workspace.getIndexWriter( provider, entity, true );
- try {
- writer.addDocument( document );
- }
- catch (IOException e) {
- throw new SearchException( "Unable to add to Lucene index: " + entity + "#" + id, e );
- }
- }
-
- public void performWork(DeleteLuceneWork work, DirectoryProvider provider) {
- Class entity = work.getEntityClass();
- Serializable id = work.getId();
- remove( entity, id, provider );
- }
-
- private void remove(Class entity, Serializable id, DirectoryProvider provider) {
- /**
- * even with Lucene 2.1, use of indexWriter to delete is not an option
- * We can only delete by term, and the index doesn't have a termt that
- * uniquely identify the entry. See logic below
- */
- log.trace( "remove from Lucene index: {}#{}", entity, id );
- DocumentBuilder builder = workspace.getDocumentBuilder( entity );
- Term term = builder.getTerm( id );
- IndexReader reader = workspace.getIndexReader( provider, entity );
- TermDocs termDocs = null;
- try {
- //TODO is there a faster way?
- //TODO include TermDocs into the workspace?
- termDocs = reader.termDocs( term );
- String entityName = entity.getName();
- while ( termDocs.next() ) {
- int docIndex = termDocs.doc();
- if ( entityName.equals( reader.document( docIndex ).get( DocumentBuilder.CLASS_FIELDNAME ) ) ) {
- //remove only the one of the right class
- //loop all to remove all the matches (defensive code)
- reader.deleteDocument( docIndex );
- }
- }
- }
- catch (Exception e) {
- throw new SearchException( "Unable to remove from Lucene index: " + entity + "#" + id, e );
- }
- finally {
- if ( termDocs != null ) try {
- termDocs.close();
- }
- catch (IOException e) {
- log.warn( "Unable to close termDocs properly", e );
- }
- }
-
- }
-
- public void performWork(OptimizeLuceneWork work, DirectoryProvider provider) {
- Class entity = work.getEntityClass();
- log.trace( "optimize Lucene index: {}", entity );
- IndexWriter writer = workspace.getIndexWriter( provider, entity, false );
- try {
- writer.optimize();
- workspace.optimize( provider );
- }
- catch (IOException e) {
- throw new SearchException( "Unable to optimize Lucene index: " + entity, e );
- }
- }
-
- public void performWork(PurgeAllLuceneWork work, DirectoryProvider provider) {
- Class entity = work.getEntityClass();
- log.trace( "purgeAll Lucene index: {}", entity );
- IndexReader reader = workspace.getIndexReader( provider, entity );
- try {
- Term term = new Term( DocumentBuilder.CLASS_FIELDNAME, entity.getName() );
- reader.deleteDocuments( term );
- }
- catch (Exception e) {
- throw new SearchException( "Unable to purge all from Lucene index: " + entity, e );
- }
- }
-
- public static class WorkWithPayload {
- private final LuceneWork work;
- private final DirectoryProvider provider;
-
- public WorkWithPayload(LuceneWork work, DirectoryProvider provider) {
- this.work = work;
- this.provider = provider;
- }
-
- public LuceneWork getWork() {
- return work;
- }
-
- public DirectoryProvider getProvider() {
- return provider;
- }
- }
-}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,158 @@
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.hibernate.annotations.common.AssertionFailure;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author Sanne Grinovero
+ */
+class PerDPQueueProcessor {
+
+ private final Logger log = LoggerFactory.getLogger( PerDPQueueProcessor.class );
+ private final Workspace workspace;
+ private final LuceneWorkVisitor worker;
+ private final List<LuceneWork> workOnWriter = new ArrayList<LuceneWork>();
+ private final List<LuceneWork> workOnReader= new ArrayList<LuceneWork>();
+
+ // if any work passed to addWork needs one, set corresponding flag to true:
+ private boolean batchmode = false;
+ private boolean needsReader = false;
+ private boolean needsWriter = false;
+ private boolean preferReader = false;
+
+ public PerDPQueueProcessor(LuceneWorkVisitor worker) {
+ this.worker = worker;
+ this.workspace = worker.getWorkspace();
+ }
+
+ public void addWork(LuceneWork work) {
+ if ( work.isBatch() ) {
+ batchmode = true;
+ log.debug( "Batch mode enabled" );
+ }
+ IndexInteractionType type = work.getWorkDelegate( worker ).getIndexInteractionType();
+ switch ( type ) {
+ case NEEDS_INDEXREADER :
+ needsReader = true;
+ //fall through:
+ case PREFER_INDEXREADER :
+ preferReader = true;
+ workOnReader.add( work );
+ break;
+ case NEEDS_INDEXWRITER :
+ needsWriter = true;
+ //fall through:
+ case PREFER_INDEXWRITER :
+ workOnWriter.add( work );
+ break;
+ default :
+ throw new AssertionFailure( "Uncovered switch case for type " + type );
+ }
+ }
+
+ public void performWorks() {
+ // skip "resource optimization mode" when in batch to have all tasks use preferred (optimal) mode.
+ if ( ! batchmode ) {
+ // see if we can skip using some resource
+ if ( ! needsReader && ! needsWriter ) { // no specific need:
+ if ( preferReader ) {
+ useReaderOnly();
+ }
+ else {
+ useWriterOnly();
+ }
+ }
+ else if ( needsReader && !needsWriter ) {
+ useReaderOnly();
+ }
+ else if ( !needsReader && needsWriter ) {
+ useWriterOnly();
+ }
+ }
+ // apply changes to index:
+ log.trace( "Locking Workspace (or waiting to...)" );
+ workspace.lock();
+ log.trace( "Workspace lock acquired." );
+ try {
+ performReaderWorks();
+ performWriterWorks();
+ }
+ finally {
+ workspace.unlock();
+ log.trace( "Unlocking Workspace" );
+ }
+ }
+
+ /**
+ * Do all workOnWriter on an IndexWriter.
+ */
+ private void performWriterWorks() {
+ if ( workOnWriter.isEmpty() ) {
+ return;
+ }
+ log.debug( "Opening an IndexWriter for update" );
+ IndexWriter indexWriter = workspace.getIndexWriter( batchmode );
+ try {
+ for (LuceneWork lw : workOnWriter) {
+ lw.getWorkDelegate( worker ).performWork( lw, indexWriter );
+ }
+ //TODO next line is assuming the OptimizerStrategy will need an IndexWriter;
+ // would be nicer to have the strategy put an OptimizeWork on the queue,
+ // or just return "yes please" (true) to some method?
+ //FIXME will not have a chance to trigger when no writer activity is done.
+ // this is currently ok, until we enable mod.counts for deletions too.
+ workspace.optimizerPhase();
+ }
+ finally {
+ workspace.closeIndexWriter();
+ }
+
+ }
+
+ /**
+ * Do all workOnReader on an IndexReader.
+ */
+ private void performReaderWorks() {
+ if ( workOnReader.isEmpty() ) {
+ return;
+ }
+ log.debug( "Opening an IndexReader for update" );
+ IndexReader indexReader = workspace.getIndexReader();
+ try {
+ for (LuceneWork lw : workOnReader) {
+ lw.getWorkDelegate( worker ).performWork( lw, indexReader );
+ }
+ }
+ finally {
+ workspace.closeIndexReader();
+ }
+ }
+
+ /**
+ * forces all work to be done using only an IndexReader
+ */
+ private void useReaderOnly() {
+ log.debug( "Skipping usage of an IndexWriter for updates" );
+ workOnReader.addAll( workOnWriter );
+ workOnWriter.clear();
+ }
+
+ /**
+ * forces all work to be done using only an IndexWriter
+ */
+ private void useWriterOnly() {
+ log.debug( "Skipping usage of an IndexReader for updates" );
+ workOnWriter.addAll( workOnReader );
+ workOnReader.clear();
+ }
+
+}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,36 @@
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
+import org.hibernate.search.store.DirectoryProvider;
+
+/**
+ * Container used to split work by DirectoryProviders.
+ * @author Sanne Grinovero
+ */
+class QueueProcessors {
+
+ private final Map<DirectoryProvider, LuceneWorkVisitor> visitorsMap;
+ private final Map<DirectoryProvider, PerDPQueueProcessor> dpProcessors = new HashMap<DirectoryProvider, PerDPQueueProcessor>();
+
+ QueueProcessors(Map<DirectoryProvider, LuceneWorkVisitor> visitorsMap) {
+ this.visitorsMap = visitorsMap;
+ }
+
+ void addWorkToDpProcessor(DirectoryProvider dp, LuceneWork work) {
+ if ( ! dpProcessors.containsKey( dp ) ) {
+ dpProcessors.put( dp, new PerDPQueueProcessor( visitorsMap.get( dp ) ) );
+ }
+ PerDPQueueProcessor processor = dpProcessors.get( dp );
+ processor.addWork ( work );
+ }
+
+ Collection<PerDPQueueProcessor> getQueueProcessors(){
+ return dpProcessors.values();
+ }
+
+}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/AddWorkDelegate.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/AddWorkDelegate.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/AddWorkDelegate.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,64 @@
+package org.hibernate.search.backend.impl.lucene.works;
+
+import java.io.IOException;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.Similarity;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.impl.lucene.IndexInteractionType;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+* Stateless implementation that performs an AddLuceneWork.
+* @see LuceneWorkVisitor
+* @see LuceneWorkDelegate
+* @author Emmanuel Bernard
+* @author Hardy Ferentschik
+* @author John Griffin
+* @author Sanne Grinovero
+*/
+class AddWorkDelegate implements LuceneWorkDelegate {
+
+ private final Workspace workspace;
+ private final Logger log = LoggerFactory.getLogger( AddWorkDelegate.class );
+
+ AddWorkDelegate(Workspace workspace) {
+ this.workspace = workspace;
+ }
+
+ public IndexInteractionType getIndexInteractionType() {
+ return IndexInteractionType.NEEDS_INDEXWRITER;
+ }
+
+ public void performWork(LuceneWork work, IndexWriter writer) {
+ DocumentBuilder documentBuilder = workspace.getDocumentBuilder( work.getEntityClass() );
+ Analyzer analyzer = documentBuilder.getAnalyzer();
+ Similarity similarity = documentBuilder.getSimilarity();
+ if ( log.isTraceEnabled() ) {
+ log.trace( "add to Lucene index: {}#{}:{}",
+ new Object[] { work.getEntityClass(), work.getId(), work.getDocument() } );
+ }
+ try {
+ //TODO the next two operations should be atomic to enable concurrent usage of IndexWriter
+ // make a wrapping Similarity based on ThreadLocals? or having it autoselect implementation basing on entity?
+ writer.setSimilarity( similarity );
+ writer.addDocument( work.getDocument() , analyzer );
+ workspace.incrementModificationCounter( 1 );
+ }
+ catch (IOException e) {
+ throw new SearchException( "Unable to add to Lucene index: "
+ + work.getEntityClass() + "#" + work.getId(), e );
+ }
+ }
+
+ public void performWork(LuceneWork work, IndexReader reader) {
+ throw new UnsupportedOperationException();
+ }
+
+}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/DeleteWorkDelegate.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/DeleteWorkDelegate.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/DeleteWorkDelegate.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,82 @@
+package org.hibernate.search.backend.impl.lucene.works;
+
+import java.io.IOException;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.impl.lucene.IndexInteractionType;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+* Stateless implementation that performs a DeleteLuceneWork.
+* @see LuceneWorkVisitor
+* @see LuceneWorkDelegate
+* @author Emmanuel Bernard
+* @author Hardy Ferentschik
+* @author John Griffin
+* @author Sanne Grinovero
+*/
+class DeleteWorkDelegate implements LuceneWorkDelegate {
+
+ private final Workspace workspace;
+ private final Logger log = LoggerFactory.getLogger( DeleteWorkDelegate.class );
+
+ DeleteWorkDelegate(Workspace workspace) {
+ this.workspace = workspace;
+ }
+
+ public IndexInteractionType getIndexInteractionType() {
+ return IndexInteractionType.NEEDS_INDEXREADER;
+ }
+
+ public void performWork(LuceneWork work, IndexWriter writer) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void performWork(LuceneWork work, IndexReader reader) {
+ /**
+ * even with Lucene 2.1, use of indexWriter to delete is not an option
+ * We can only delete by term, and the index doesn't have a term that
+ * uniquely identify the entry. See logic below
+ */
+ log.trace( "remove from Lucene index: {}#{}", work.getEntityClass(), work.getId() );
+ DocumentBuilder builder = workspace.getDocumentBuilder( work.getEntityClass() );
+ Term term = builder.getTerm( work.getId() );
+ TermDocs termDocs = null;
+ try {
+ //TODO is there a faster way?
+ //TODO include TermDocs into the workspace?
+ termDocs = reader.termDocs( term );
+ String entityName = work.getEntityClass().getName();
+ while ( termDocs.next() ) {
+ int docIndex = termDocs.doc();
+ if ( entityName.equals( reader.document( docIndex ).get( DocumentBuilder.CLASS_FIELDNAME ) ) ) {
+ //remove only the one of the right class
+ //loop all to remove all the matches (defensive code)
+ reader.deleteDocument( docIndex );
+ }
+ }
+ //TODO shouldn't this use workspace.incrementModificationCounter( 1 ) ?
+ }
+ catch (Exception e) {
+ throw new SearchException( "Unable to remove from Lucene index: "
+ + work.getEntityClass() + "#" + work.getId(), e );
+ }
+ finally {
+ if ( termDocs != null ) try {
+ termDocs.close();
+ }
+ catch (IOException e) {
+ log.warn( "Unable to close termDocs properly", e );
+ }
+ }
+ }
+
+}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkDelegate.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkDelegate.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkDelegate.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,35 @@
+package org.hibernate.search.backend.impl.lucene.works;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.impl.lucene.IndexInteractionType;
+
+/**
+ * @author Sanne Grinovero
+ */
+public interface LuceneWorkDelegate {
+
+ /**
+ * @return the IndexInteractionType needed to accomplish this work (reader or writer)
+ * or have a chance to express any preference for performance optimizations.
+ */
+ IndexInteractionType getIndexInteractionType();
+
+ /**
+ * Will perform work on an IndexWriter.
+ * @param work the LuceneWork to apply to the IndexWriter.
+ * @param writer the IndexWriter to use.
+ * @throws UnsupportedOperationException when the work is not compatible with an IndexWriter.
+ */
+ void performWork(LuceneWork work, IndexWriter writer);
+
+ /**
+ * Will perform this work on an IndexReader.
+ * @param work the LuceneWork to apply to the IndexReader.
+ * @param reader the IndexReader to use.
+ * @throws UnsupportedOperationException when the work is not compatible with an IndexReader.
+ */
+ void performWork(LuceneWork work, IndexReader reader);
+
+}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,55 @@
+package org.hibernate.search.backend.impl.lucene.works;
+
+import org.hibernate.search.backend.AddLuceneWork;
+import org.hibernate.search.backend.DeleteLuceneWork;
+import org.hibernate.search.backend.OptimizeLuceneWork;
+import org.hibernate.search.backend.PurgeAllLuceneWork;
+import org.hibernate.search.backend.WorkVisitor;
+import org.hibernate.search.backend.Workspace;
+
+/**
+ * @author Sanne Grinovero
+ */
+public class LuceneWorkVisitor implements WorkVisitor<LuceneWorkDelegate> {
+
+ private final AddWorkDelegate addDelegate;
+ private final DeleteWorkDelegate deleteDelegate;
+ private final OptimizeWorkDelegate optimizeDelegate;
+ private final PurgeAllWorkDelegate purgeAllDelegate;
+
+ /**
+ * The Workspace this visitor has been created for;
+ * different workspaces could use different Delegates for specific
+ * needs basing on workspace or DirectoryProvider configuration.
+ */
+ private final Workspace linkedWorkspace;
+
+ public LuceneWorkVisitor(Workspace workspace) {
+ this.addDelegate = new AddWorkDelegate( workspace );
+ this.deleteDelegate = new DeleteWorkDelegate( workspace );
+ this.optimizeDelegate = new OptimizeWorkDelegate( workspace );
+ this.purgeAllDelegate = new PurgeAllWorkDelegate();
+ this.linkedWorkspace = workspace;
+ }
+
+ public LuceneWorkDelegate getDelegate(AddLuceneWork addLuceneWork) {
+ return addDelegate;
+ }
+
+ public LuceneWorkDelegate getDelegate(DeleteLuceneWork deleteLuceneWork) {
+ return deleteDelegate;
+ }
+
+ public LuceneWorkDelegate getDelegate(OptimizeLuceneWork optimizeLuceneWork) {
+ return optimizeDelegate;
+ }
+
+ public LuceneWorkDelegate getDelegate(PurgeAllLuceneWork purgeAllLuceneWork) {
+ return purgeAllDelegate;
+ }
+
+ public Workspace getWorkspace(){
+ return linkedWorkspace;
+ }
+
+}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/OptimizeWorkDelegate.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/OptimizeWorkDelegate.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/OptimizeWorkDelegate.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,51 @@
+package org.hibernate.search.backend.impl.lucene.works;
+
+import java.io.IOException;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.impl.lucene.IndexInteractionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+* Stateless implementation that performs an OptimizeLuceneWork.
+* @see LuceneWorkVisitor
+* @see LuceneWorkDelegate
+* @author Emmanuel Bernard
+* @author Hardy Ferentschik
+* @author John Griffin
+* @author Sanne Grinovero
+*/
+class OptimizeWorkDelegate implements LuceneWorkDelegate {
+
+ private final Workspace workspace;
+ private final Logger log = LoggerFactory.getLogger( OptimizeWorkDelegate.class );
+
+ OptimizeWorkDelegate(Workspace workspace) {
+ this.workspace = workspace;
+ }
+
+ public IndexInteractionType getIndexInteractionType() {
+ return IndexInteractionType.NEEDS_INDEXWRITER;
+ }
+
+ public void performWork(LuceneWork work, IndexWriter writer) {
+ log.trace( "optimize Lucene index: {}", work.getEntityClass() );
+ try {
+ writer.optimize();
+ workspace.optimize();
+ }
+ catch (IOException e) {
+ throw new SearchException( "Unable to optimize Lucene index: " + work.getEntityClass(), e );
+ }
+ }
+
+ public void performWork(LuceneWork work, IndexReader reader) {
+ throw new UnsupportedOperationException();
+ }
+
+}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/PurgeAllWorkDelegate.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/PurgeAllWorkDelegate.java (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/PurgeAllWorkDelegate.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -0,0 +1,48 @@
+package org.hibernate.search.backend.impl.lucene.works;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.impl.lucene.IndexInteractionType;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+* Stateless implementation that performs a PurgeAllLuceneWork.
+* @see LuceneWorkVisitor
+* @see LuceneWorkDelegate
+* @author Emmanuel Bernard
+* @author Hardy Ferentschik
+* @author John Griffin
+* @author Sanne Grinovero
+*/
+class PurgeAllWorkDelegate implements LuceneWorkDelegate {
+
+ private final Logger log = LoggerFactory.getLogger( PurgeAllWorkDelegate.class );
+
+ PurgeAllWorkDelegate() {
+ }
+
+ public IndexInteractionType getIndexInteractionType() {
+ return IndexInteractionType.NEEDS_INDEXREADER;
+ }
+
+ public void performWork(LuceneWork work, IndexWriter writer) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void performWork(LuceneWork work, IndexReader reader) {
+ log.trace( "purgeAll Lucene index using IndexReader: {}", work.getEntityClass() );
+ try {
+ Term term = new Term( DocumentBuilder.CLASS_FIELDNAME, work.getEntityClass().getName() );
+ reader.deleteDocuments( term );
+ }
+ catch (Exception e) {
+ throw new SearchException( "Unable to purge all from Lucene index: " + work.getEntityClass(), e );
+ }
+ }
+
+}
Modified: search/trunk/src/java/org/hibernate/search/store/optimization/IncrementalOptimizerStrategy.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/store/optimization/IncrementalOptimizerStrategy.java 2008-10-10 01:26:16 UTC (rev 15315)
+++ search/trunk/src/java/org/hibernate/search/store/optimization/IncrementalOptimizerStrategy.java 2008-10-10 08:47:53 UTC (rev 15316)
@@ -51,7 +51,7 @@
if ( needOptimization() ) {
log.debug( "Optimize {} after {} operations and {} transactions",
new Object[] { directoryProvider.getDirectory(), operations, transactions });
- IndexWriter writer = workspace.getIndexWriter( directoryProvider );
+ IndexWriter writer = workspace.getIndexWriter( false ); //TODO true or false?
try {
writer.optimize();
}
More information about the hibernate-commits
mailing list