[hibernate-commits] Hibernate SVN: r16967 - in search/trunk/src: main/java/org/hibernate/search/backend and 10 other directories.

hibernate-commits at lists.jboss.org
Mon Jun 29 18:25:44 EDT 2009


Author: sannegrinovero
Date: 2009-06-29 18:25:43 -0400 (Mon, 29 Jun 2009)
New Revision: 16967

Added:
   search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/
   search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/BatchBackend.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/DirectoryProviderWorkspace.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/LuceneBatchBackend.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/PerDirectoryWorkProcessor.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchCoordinator.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/IndexingGeneratedCorpusTest.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/TitleAble.java
   search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/
   search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/SentenceInventor.java
   search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/TextProductionTest.java
   search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/WordDictionary.java
Removed:
   search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java
Modified:
   search/trunk/src/main/java/org/hibernate/search/Environment.java
   search/trunk/src/main/java/org/hibernate/search/Indexer.java
   search/trunk/src/main/java/org/hibernate/search/backend/Workspace.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/AddWorkDelegate.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/DeleteWorkDelegate.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkDelegate.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/OptimizeWorkDelegate.java
   search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/PurgeAllWorkDelegate.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java
   search/trunk/src/main/java/org/hibernate/search/engine/SearchFactoryImplementor.java
   search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java
   search/trunk/src/main/java/org/hibernate/search/impl/SearchFactoryImpl.java
   search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java
Log:
HSEARCH-218 [add indexAll( Class type ) to rebuild indexes from all data]
Adding batch backend and coupling the batch API to the new backend.

Modified: search/trunk/src/main/java/org/hibernate/search/Environment.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/Environment.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/Environment.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -64,12 +64,19 @@
 	public static final String READER_STRATEGY = READER_PREFIX + "strategy";
 	
 	/**
-	 * filter caching strategy class (must have a no-arg constructor and implements FilterCachingStrategy)
+	 * filter caching strategy class (must have a no-arg constructor and implement FilterCachingStrategy)
 	 */
 	public static final String FILTER_CACHING_STRATEGY = "hibernate.search.filter.cache_strategy";
 	
 	/**
 	 * number of docidresults cached in hard reference.
 	 */
-	public static final String CACHE_DOCIDRESULTS_SIZE = "hibernate.search.filter.cache_docidresults.size";	
+	public static final String CACHE_DOCIDRESULTS_SIZE = "hibernate.search.filter.cache_docidresults.size";
+	
+	/**
+	 * batch backend implementation class (must have a no-arg constructor and implement BatchBackend);
+	 * also the prefix for configuration settings of the batch backend
+	 */
+	public static final String BATCH_BACKEND = "hibernate.search.batchbackend";
+	
 }

Modified: search/trunk/src/main/java/org/hibernate/search/Indexer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/Indexer.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/Indexer.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -1,6 +1,6 @@
 package org.hibernate.search;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 
 import org.hibernate.CacheMode;
 
@@ -29,7 +29,13 @@
 	 */
 	Indexer documentBuilderThreads(int numberOfThreads);
 	
-	//not supported yet
+	/**
+	 * Sets the number of threads to be used to analyze the documents
+	 * and write to the index.
+	 * @param numberOfThreads the number of threads
+	 * @return <tt>this</tt> for method chaining
+	 */
+	//TODO not yet implemented
 	//Indexer indexWriterThreads(int numberOfThreads);
 	
 	/**
@@ -68,21 +74,28 @@
 	Indexer purgeAllAtStart(boolean purgeAll);
 	
 	/**
+	 * Stops indexing after the given number of objects has been indexed.
+	 * As a result the index will not be consistent
+	 * with the database: use only for testing.
+	 * @param maximum the maximum number of objects to index
+	 * @return <tt>this</tt> for method chaining
+	 */
+	Indexer limitObjects(int maximum);
+
+	/**
 	 * Starts the indexing process in the background (asynchronously).
 	 * Can be called only once.
+	 * @return a Future allowing cancellation of the task; get() will always return null,
+	 * blocking until completion.
 	 */
-	void start();
+	Future<?> start();
 	
 	/**
 	 * Starts the indexing process, and then blocks until it's finished.
 	 * Can be called only once.
-	 * @param timeout the maximum time to wait
-	 * @param unit the time unit of the <tt>timeout</tt> argument.
-	 * @return <tt>true</tt> if the process finished and <tt>false</tt>
-     * if the waiting time elapsed before the process was finished.
 	 * @throws InterruptedException if the current thread is interrupted
      * while waiting.
 	 */
-	boolean startAndWait( long timeout, TimeUnit unit ) throws InterruptedException;
+	void startAndWait() throws InterruptedException;
 
 }
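
A minimal usage sketch of the revised Indexer contract (how the Indexer instance is obtained is left out here, as it depends on the SearchFactory wiring): start() now returns a Future<?> whose get() blocks until indexing completes, while startAndWait() replaces the old timeout-based variant.

	import java.util.concurrent.Future;
	import org.hibernate.search.Indexer;

	public class RebuildIndexExample {

		// Fire-and-forget: returns immediately; future.get() blocks until done.
		public static Future<?> rebuildAsync(Indexer indexer) {
			return indexer
					.purgeAllAtStart( true )      // wipe the index before re-adding
					.documentBuilderThreads( 4 )  // threads transforming entities into Documents
					.start();
		}

		// Blocking variant: returns only when indexing has finished.
		public static void rebuildBlocking(Indexer indexer) throws InterruptedException {
			indexer.startAndWait();
		}
	}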

Modified: search/trunk/src/main/java/org/hibernate/search/backend/Workspace.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/Workspace.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/Workspace.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -11,7 +11,6 @@
 import org.apache.lucene.index.IndexWriter;
 import org.slf4j.Logger;
 
-import org.hibernate.annotations.common.AssertionFailure;
 import org.hibernate.search.SearchException;
 import org.hibernate.search.SearchFactory;
 import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
@@ -78,16 +77,19 @@
 	/**
 	 * If optimization has not been forced give a chance to configured OptimizerStrategy
 	 * to optimize the index.
-	 * To enter the optimization phase you need to acquire the lock first.
-	 * @throws AssertionFailure if the lock is not owned.
 	 */
 	public void optimizerPhase() {
-		assertOwnLock();
-		// used getAndSet(0) because Workspace is going to be reused by next transaction.
-		synchronized (optimizerStrategy) {
-			optimizerStrategy.addTransaction( operations.getAndSet( 0L ) );
-			optimizerStrategy.optimize( this );
+		lock.lock();
+		try {
+			// used getAndSet(0) because Workspace is going to be reused by next transaction.
+			synchronized ( optimizerStrategy ) {
+				optimizerStrategy.addTransaction( operations.getAndSet( 0L ) );
+				optimizerStrategy.optimize( this );
+			}
 		}
+		finally {
+			lock.unlock();
+		}
 	}
 	
 	/**
@@ -98,17 +100,22 @@
 	 * @see SearchFactory#optimize(Class)
 	 */
 	public void optimize() {
-		// Needs to ensure the optimizerStrategy is accessed in threadsafe way
-		synchronized (optimizerStrategy) {
-			optimizerStrategy.optimizationForced();
+		lock.lock();
+		try {
+			//Needs to ensure the optimizerStrategy is accessed in threadsafe way
+			synchronized (optimizerStrategy) {
+				optimizerStrategy.optimizationForced();
+			}
 		}
+		finally {
+			lock.unlock();
+		}
 	}
 
 	/**
 	 * Gets the IndexWriter, opening one if needed.
 	 * @param batchmode when true the indexWriter settings for batch mode will be applied.
 	 * Ignored if IndexWriter is open already.
-	 * @throws AssertionFailure if the lock is not owned.
 	 * @throws SearchException on a IOException during index opening.
 	 * @return a new IndexWriter or one already open.
 	 */
@@ -130,8 +137,8 @@
 	/**
 	 * Commits changes to a previously opened IndexWriter.
 	 *
-	 * @throws SearchException on IOException during Lucene close operation.
-	 * @throws AssertionFailure if there is no IndexWriter to close.
+	 * @throws SearchException on IOException during Lucene close operation,
+	 * or if there is no IndexWriter to close.
 	 */
 	public synchronized void commitIndexWriter() {
 		if ( writer != null ) {
@@ -143,15 +150,11 @@
 				throw new SearchException( "Exception while committing index changes", e );
 			}
 		}
-		else {
-			throw new AssertionFailure( "No open IndexWriter to commit changes." );
-		}
 	}
 
 	/**
 	 * Closes a previously opened IndexWriter.
-	 * @throws SearchException on IOException during Lucene close operation.
-	 * @throws AssertionFailure if there is no IndexWriter to close.
+	 * @throws SearchException on IOException during Lucene close operation
 	 */
 	public synchronized void closeIndexWriter() {
 		IndexWriter toClose = writer;
@@ -165,9 +168,6 @@
 				throw new SearchException( "Exception while closing IndexWriter", e );
 			}
 		}
-		else {
-			throw new AssertionFailure( "No open IndexWriter to close" );
-		}
 	}
 
 	/**
@@ -186,27 +186,5 @@
 	public Set<Class<?>> getEntitiesInDirectory() {
 		return entitiesInDirectory;
 	}
-	
-	/**
-	 * Acquires a lock on the DirectoryProvider backing this Workspace;
-	 * this is required to use optimizerPhase()
-	 * @see #optimizerPhase()
-	 */
-	public void lock() {
-		lock.lock();
-	}
 
-	/**
-	 * Releases the lock obtained by calling lock(). The caller must own the lock.
-	 * @see #lock()
-	 */
-	public void unlock() {
-		lock.unlock();
-	}
-
-	private void assertOwnLock() {
-		if ( ! lock.isHeldByCurrentThread() )
-			throw new AssertionFailure( "Not owning DirectoryProvider Lock" );
-	}
-
 }
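
Since lock()/unlock() were removed from the public surface and optimizerPhase() now takes the DirectoryProvider lock itself, callers can drop their locking boilerplate. A two-line sketch, where workspace is a Workspace instance (see the matching PerDPQueueProcessor change below):

	// before this commit: workspace.lock(); try { workspace.optimizerPhase(); } finally { workspace.unlock(); }
	workspace.optimizerPhase(); // now acquires and releases the lock internally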

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -89,7 +89,7 @@
 		}
 		else {
 			try {
-				Class processorFactoryClass = ReflectHelper.classForName( backend, BatchedQueueingProcessor.class );
+				Class<?> processorFactoryClass = ReflectHelper.classForName( backend, BatchedQueueingProcessor.class );
 				backendQueueProcessorFactory = ( BackendQueueProcessorFactory ) processorFactoryClass.newInstance();
 			}
 			catch ( ClassNotFoundException e ) {

Added: search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/BatchBackend.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/BatchBackend.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/BatchBackend.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,56 @@
+package org.hibernate.search.backend.impl.batchlucene;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+
+/**
+ * Implementors of this interface are not drop-in replacements for the standard BackendQueueProcessorFactory
+ * but are meant to be used only during batch processing.
+ * The order of LuceneWork(s) processed is not guaranteed as the queue is consumed by several
+ * concurrent workers. 
+ *  
+ * @author Sanne Grinovero
+ */
+public interface BatchBackend {
+	
+	/**
+	 * Used at startup; called once, before any other method.
+	 * @param props all configuration properties
+	 * @param monitor the progress monitor to keep track of indexing activity
+	 * @param searchFactory the client
+	 */
+	void initialize(Properties props, IndexerProgressMonitor monitor, SearchFactoryImplementor searchFactory);
+
+	/**
+	 * Enqueues one work to be processed asynchronously.
+	 * @param work the work to enqueue
+	 * @throws InterruptedException if the current thread is interrupted while
+	 * waiting for the work queue to have enough space.
+	 */
+	void enqueueAsyncWork(LuceneWork work) throws InterruptedException;
+	
+	/**
+	 * Performs one work synchronously, in the caller's thread.
+	 * @param work the work to perform
+	 */
+	void doWorkInSync(LuceneWork work);
+	
+	/**
+	 * Waits until all work is done and terminates the executors.
+	 * The IndexWriter is not closed yet: synchronous work can still be processed.
+	 * @param timeout the maximum time to wait (applied to each index)
+	 * @param unit the time unit of the <tt>timeout</tt> argument
+	 * @throws InterruptedException if the current thread is interrupted
+	 * while waiting for the enqueued tasks to be finished.
+	 */
+	void stopAndFlush(long timeout, TimeUnit unit) throws InterruptedException;
+
+	/**
+	 * Used to shutdown and release resources.
+	 * No other method should be used after this one.
+	 */
+	void close();
+
+}
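
The expected lifecycle, as read from the javadoc above, is initialize, then enqueue or perform work, then stopAndFlush, then close. A hedged sketch of a caller driving it (the runBatch helper and its arguments are illustrative, not part of this commit):

	import java.util.List;
	import java.util.Properties;
	import java.util.concurrent.TimeUnit;

	import org.hibernate.search.backend.LuceneWork;
	import org.hibernate.search.backend.impl.batchlucene.BatchBackend;
	import org.hibernate.search.batchindexing.IndexerProgressMonitor;
	import org.hibernate.search.engine.SearchFactoryImplementor;

	public class BatchBackendLifecycle {

		// Hypothetical driver: initialize once, enqueue work, flush, then close.
		public static void runBatch(BatchBackend backend, Properties props,
				IndexerProgressMonitor monitor, SearchFactoryImplementor searchFactory,
				List<LuceneWork> queue) throws InterruptedException {
			backend.initialize( props, monitor, searchFactory );
			try {
				for ( LuceneWork work : queue ) {
					backend.enqueueAsyncWork( work ); // ordering across works is not guaranteed
				}
				backend.stopAndFlush( 1, TimeUnit.HOURS ); // wait for the async workers, then commit
			}
			finally {
				backend.close(); // releases the IndexWriters even on failure
			}
		}
	}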

Added: search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/DirectoryProviderWorkspace.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/DirectoryProviderWorkspace.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/DirectoryProviderWorkspace.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,124 @@
+package org.hibernate.search.backend.impl.batchlucene;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.hibernate.search.SearchException;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.impl.lucene.works.LuceneWorkDelegate;
+import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
+import org.hibernate.search.batchindexing.Executors;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Collects all resources needed to apply changes to one index.
+ * They are reused across the processing of all LuceneWork.
+ * 
+ * !! Be careful to ensure the IndexWriter is eventually closed,
+ * or the index will stay locked.
+ * @see #close()
+ *
+ * @author Sanne Grinovero
+ */
+class DirectoryProviderWorkspace {
+	
+	private static final Logger log = LoggerFactory.make();
+	
+	private final ExecutorService executor;
+	private final LuceneWorkVisitor visitor;
+	private final Workspace workspace;
+	private final IndexerProgressMonitor monitor;
+	
+	private final AtomicBoolean closed = new AtomicBoolean( false );
+	
+	DirectoryProviderWorkspace(SearchFactoryImplementor searchFactoryImp, DirectoryProvider<?> dp, IndexerProgressMonitor monitor, int maxThreads) {
+		if ( maxThreads < 1 ) {
+			throw new IllegalArgumentException( "maxThreads needs to be at least 1" );
+		}
+		this.monitor = monitor;
+		workspace = new Workspace( searchFactoryImp, dp );
+		visitor = new LuceneWorkVisitor( workspace );
+		executor = Executors.newFixedThreadPool( maxThreads, "indexwriter" );
+	}
+
+	/**
+	 * Notifies the index-writing threads that they should quit at the end of the enqueued
+	 * tasks, waits for the end of the current queue, then commits changes.
+	 * @throws InterruptedException if interrupted while waiting for the executor to terminate
+	 */
+	public void stopAndFlush(long timeout, TimeUnit unit) throws InterruptedException {
+		checkIsNotClosed();
+		executor.shutdown(); //it becomes illegal to add more work
+		executor.awaitTermination( timeout, unit );
+		workspace.commitIndexWriter(); //commits changes if any
+		//does not yet close the IndexWriter !
+	}
+
+	/**
+	 * Used to perform some tasks at the beginning and/or at the end of the main batch
+	 * operations. This work is not done asynchronously.
+	 * @param work the work to perform in the caller's thread
+	 */
+	public void doWorkInSync(LuceneWork work) {
+		checkIsNotClosed();
+		LuceneWorkDelegate delegate = work.getWorkDelegate( visitor );
+		delegate.performWork( work, workspace.getIndexWriter( true ) );
+		delegate.logWorkDone( work , monitor );
+		//if the IndexWriter was opened, it's not closed now.
+	}
+
+	public void enqueueAsyncWork(LuceneWork work) {
+		//no need to check if we are closed here, better check inside the async method
+		executor.execute( new AsyncIndexRunnable( work ) );
+	}
+
+	/**
+	 * Makes sure the executor is closed and closes the IndexWriter.
+	 */
+	public void close() {
+		if ( closed.compareAndSet( false, true ) ) {
+			try {
+				if ( ! executor.isShutdown() ) {
+					log.error( "Terminating batch work! Index might end up in inconsistent state." );
+					executor.shutdownNow();
+				}
+			}
+			finally {
+				workspace.closeIndexWriter();
+			}	
+		}
+		else {
+			checkIsNotClosed(); //will throw an appropriate exception
+		}
+	}
+	
+	/**
+	 * Verifies this is not closed yet, or throws an exception.
+	 */
+	private void checkIsNotClosed() {
+		if ( closed.get() ) {
+			throw new SearchException( "Batch DirectoryProviderWorkspace is closed already" );
+		}
+	}
+
+	private class AsyncIndexRunnable implements Runnable {
+		
+		private final LuceneWork work;
+
+		AsyncIndexRunnable(LuceneWork work) {
+			this.work = work;
+		}
+
+		public void run() {
+			doWorkInSync( work );
+		}
+		
+	}
+	
+}


Property changes on: search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/DirectoryProviderWorkspace.java
___________________________________________________________________
Name: svn:executable
   + *

Added: search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/LuceneBatchBackend.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/LuceneBatchBackend.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/batchlucene/LuceneBatchBackend.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,120 @@
+package org.hibernate.search.backend.impl.batchlucene;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.search.Environment;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.configuration.ConfigurationParseHelper;
+import org.hibernate.search.backend.impl.lucene.DpSelectionVisitor;
+import org.hibernate.search.backend.impl.lucene.PerDirectoryWorkProcessor;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
+import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.store.IndexShardingStrategy;
+
+/**
+ * First EXPERIMENTAL BatchBackend; this is not meant to be used as a regular
+ * backend, only to apply batch changes to the index. Several threads
+ * are used to make changes to each index, so the order of Work processing is not guaranteed.
+ * 
+ * @author Sanne Grinovero
+ */
+public class LuceneBatchBackend implements BatchBackend {
+	
+	public static final String CONCURRENT_WRITERS = Environment.BATCH_BACKEND + ".concurrent_writers";
+
+	private static final DpSelectionVisitor providerSelectionVisitor = new DpSelectionVisitor();
+
+	private SearchFactoryImplementor searchFactoryImplementor;
+	private final Map<DirectoryProvider<?>,DirectoryProviderWorkspace> resourcesMap = new HashMap<DirectoryProvider<?>,DirectoryProviderWorkspace>();
+	private final PerDirectoryWorkProcessor asyncWorker = new AsyncBatchPerDirectoryWorkProcessor();
+	private final PerDirectoryWorkProcessor syncWorker = new SyncBatchPerDirectoryWorkProcessor();
+
+	public void initialize(Properties cfg, IndexerProgressMonitor monitor, SearchFactoryImplementor searchFactoryImplementor) {
+		this.searchFactoryImplementor = searchFactoryImplementor;
+		int maxThreadsPerIndex = ConfigurationParseHelper.getIntValue( cfg, "concurrent_writers", 2 );
+		if ( maxThreadsPerIndex < 1 ) {
+			throw new SearchException( "concurrent_writers for batch backend must be at least 1." );
+		}
+		for ( DirectoryProvider<?> dp : searchFactoryImplementor.getDirectoryProviders() ) {
+			DirectoryProviderWorkspace resources = new DirectoryProviderWorkspace( searchFactoryImplementor, dp, monitor, maxThreadsPerIndex );
+			resourcesMap.put( dp, resources );
+		}
+	}
+
+	public void enqueueAsyncWork(LuceneWork work) throws InterruptedException {
+		sendWorkToShards( work, asyncWorker );
+	}
+
+	public void doWorkInSync(LuceneWork work) {
+		try {
+			sendWorkToShards( work, syncWorker );
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			//should not happen: SyncBatchPerDirectoryWorkProcessor below doesn't declare the throws.
+			throw new SearchException( "AssertionFailure" );
+		}
+	}
+
+	/**
+	 * Stops the background threads and flushes changes.
+	 * Note that the timeout is applied to each index in
+	 * sequence, so the total wait can be up to timeout * number of DirectoryProviders.
+	 */
+	public void stopAndFlush(long timeout, TimeUnit unit) throws InterruptedException {
+		for ( DirectoryProviderWorkspace res : resourcesMap.values() ) {
+			res.stopAndFlush( timeout, unit );
+		}
+	}
+	
+	public void close() {
+		Throwable error = null;
+		for ( DirectoryProviderWorkspace res : resourcesMap.values() ) {
+			try {
+				res.close();
+			}
+			catch (Throwable t) {
+				//make sure to try closing all IndexWriters
+				error = t;
+			}
+		}
+		if ( error != null ) {
+			throw new SearchException( "Error while closing batch indexer", error );
+		}
+	}
+	
+	private void sendWorkToShards(LuceneWork work, PerDirectoryWorkProcessor worker) throws InterruptedException {
+		final Class<?> entityType = work.getEntityClass();
+		DocumentBuilderIndexedEntity<?> documentBuilder = searchFactoryImplementor.getDocumentBuilderIndexedEntity( entityType );
+		IndexShardingStrategy shardingStrategy = documentBuilder.getDirectoryProviderSelectionStrategy();
+		work.getWorkDelegate( providerSelectionVisitor ).addAsPayLoadsToQueue( work, shardingStrategy, worker );
+	}
+
+	/**
+	 * Implements a PerDirectoryWorkProcessor to enqueue work asynchronously.
+	 */
+	private class AsyncBatchPerDirectoryWorkProcessor implements PerDirectoryWorkProcessor {
+
+		public void addWorkToDpProcessor(DirectoryProvider<?> dp, LuceneWork work) throws InterruptedException {
+			resourcesMap.get( dp ).enqueueAsyncWork( work );
+		}
+		
+	}
+	
+	/**
+	 * Implements a PerDirectoryWorkProcessor to enqueue work synchronously.
+	 */
+	private class SyncBatchPerDirectoryWorkProcessor implements PerDirectoryWorkProcessor {
+
+		public void addWorkToDpProcessor(DirectoryProvider<?> dp, LuceneWork work) {
+			resourcesMap.get( dp ).doWorkInSync( work );
+		}
+		
+	}
+
+}
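
A hedged configuration sketch, using only the keys defined in this commit (Environment.BATCH_BACKEND and LuceneBatchBackend.CONCURRENT_WRITERS); how the Properties reach initialize() depends on the SearchFactory configuration path and is not shown here:

	import java.util.Properties;

	import org.hibernate.search.Environment;
	import org.hibernate.search.backend.impl.batchlucene.LuceneBatchBackend;

	public class BatchBackendConfigExample {
		public static Properties batchBackendSettings() {
			Properties cfg = new Properties();
			// selects the experimental Lucene batch backend
			cfg.setProperty( Environment.BATCH_BACKEND, LuceneBatchBackend.class.getName() );
			// "hibernate.search.batchbackend.concurrent_writers": writer threads per index
			cfg.setProperty( LuceneBatchBackend.CONCURRENT_WRITERS, "2" );
			return cfg;
		}
	}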

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/DpSelectionDelegate.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -6,15 +6,16 @@
 /**
  * @author Sanne Grinovero
  */
-interface DpSelectionDelegate {
+public interface DpSelectionDelegate {
 	
 	/**
 	 * The LuceneWork must be applied to different indexes.
 	 * @param work the work to split.
 	 * @param queues the target queue to add work to.
 	 * @param shardingStrategy the Sharding strategy is usually needed to identify affected Directories. 
+	 * @throws InterruptedException if interrupted while enqueueing the work
 	 */
 	void addAsPayLoadsToQueue(LuceneWork work,
-			IndexShardingStrategy shardingStrategy, QueueProcessors queues);
+			IndexShardingStrategy shardingStrategy, PerDirectoryWorkProcessor queues) throws InterruptedException;
 
 }

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/DpSelectionVisitor.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -17,7 +17,7 @@
  * 
  * @author Sanne Grinovero
  */
-class DpSelectionVisitor implements WorkVisitor<DpSelectionDelegate> {
+public class DpSelectionVisitor implements WorkVisitor<DpSelectionDelegate> {
 	
 	private final AddSelectionDelegate addDelegate = new AddSelectionDelegate();
 	private final DeleteSelectionDelegate deleteDelegate = new DeleteSelectionDelegate();
@@ -43,8 +43,8 @@
 	private static class AddSelectionDelegate implements DpSelectionDelegate {
 
 		public void addAsPayLoadsToQueue(LuceneWork work,
-				IndexShardingStrategy shardingStrategy, QueueProcessors queues) {
-			DirectoryProvider provider = shardingStrategy.getDirectoryProviderForAddition(
+				IndexShardingStrategy shardingStrategy, PerDirectoryWorkProcessor queues) throws InterruptedException {
+			DirectoryProvider<?> provider = shardingStrategy.getDirectoryProviderForAddition(
 					work.getEntityClass(),
 					work.getId(),
 					work.getIdInString(),
@@ -58,13 +58,13 @@
 	private static class DeleteSelectionDelegate implements DpSelectionDelegate {
 
 		public void addAsPayLoadsToQueue(LuceneWork work,
-				IndexShardingStrategy shardingStrategy, QueueProcessors queues) {
-			DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForDeletion(
+				IndexShardingStrategy shardingStrategy, PerDirectoryWorkProcessor queues) throws InterruptedException {
+			DirectoryProvider<?>[] providers = shardingStrategy.getDirectoryProvidersForDeletion(
 					work.getEntityClass(),
 					work.getId(),
 					work.getIdInString()
 			);
-			for (DirectoryProvider provider : providers) {
+			for (DirectoryProvider<?> provider : providers) {
 				queues.addWorkToDpProcessor( provider, work );
 			}
 		}
@@ -74,9 +74,9 @@
 	private static class OptimizeSelectionDelegate implements DpSelectionDelegate {
 
 		public void addAsPayLoadsToQueue(LuceneWork work,
-				IndexShardingStrategy shardingStrategy, QueueProcessors queues) {
-			DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForAllShards();
-			for (DirectoryProvider provider : providers) {
+				IndexShardingStrategy shardingStrategy, PerDirectoryWorkProcessor queues) throws InterruptedException {
+			DirectoryProvider<?>[] providers = shardingStrategy.getDirectoryProvidersForAllShards();
+			for (DirectoryProvider<?> provider : providers) {
 				queues.addWorkToDpProcessor( provider, work );
 			}
 		}
@@ -86,13 +86,13 @@
 	private static class PurgeAllSelectionDelegate implements DpSelectionDelegate {
 
 		public void addAsPayLoadsToQueue(LuceneWork work,
-				IndexShardingStrategy shardingStrategy, QueueProcessors queues) {
-			DirectoryProvider[] providers = shardingStrategy.getDirectoryProvidersForDeletion(
+				IndexShardingStrategy shardingStrategy, PerDirectoryWorkProcessor queues) throws InterruptedException {
+			DirectoryProvider<?>[] providers = shardingStrategy.getDirectoryProvidersForDeletion(
 					work.getEntityClass(),
 					work.getId(),
 					work.getIdInString()
 			);
-			for (DirectoryProvider provider : providers) {
+			for (DirectoryProvider<?> provider : providers) {
 				queues.addWorkToDpProcessor( provider, work );
 			}
 		}

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -43,13 +43,13 @@
 	public void run() {
 		QueueProcessors processors = new QueueProcessors( resourcesMap );
 		// divide the queue in tasks, adding to QueueProcessors by affected Directory.
-		for ( LuceneWork work : queue ) {
-			final Class<?> entityType = work.getEntityClass();
-			DocumentBuilderIndexedEntity<?> documentBuilder = searchFactoryImplementor.getDocumentBuilderIndexedEntity( entityType );
-			IndexShardingStrategy shardingStrategy = documentBuilder.getDirectoryProviderSelectionStrategy();
-			work.getWorkDelegate( providerSelectionVisitor ).addAsPayLoadsToQueue( work, shardingStrategy, processors );
-		}
 		try {
+			for ( LuceneWork work : queue ) {
+				final Class<?> entityType = work.getEntityClass();
+				DocumentBuilderIndexedEntity<?> documentBuilder = searchFactoryImplementor.getDocumentBuilderIndexedEntity( entityType );
+				IndexShardingStrategy shardingStrategy = documentBuilder.getDirectoryProviderSelectionStrategy();
+				work.getWorkDelegate( providerSelectionVisitor ).addAsPayLoadsToQueue( work, shardingStrategy, processors );
+			}
 			//this Runnable splits tasks in more runnables and then runs them:
 			processors.runAll( sync );
 		} catch (InterruptedException e) {

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -76,21 +76,12 @@
 	}
 	
 	private void performOptimizations() {
-		log.trace( "Locking Workspace (or waiting to...)" );
-		workspace.lock();
-		try {
-			log.trace( "Workspace lock aquired." );
-			//TODO next line is assuming the OptimizerStrategy will need an IndexWriter;
-			// would be nicer to have the strategy put an OptimizeWork on the queue,
-			// or just return "yes please" (true) to some method?
-			//FIXME will not have a chance to trigger when no "add" activity is done.
-			// this is correct until we enable modification counts for deletions too.
-			workspace.optimizerPhase();
-		}
-		finally {
-			workspace.unlock();
-			log.trace( "Unlocked Workspace" );
-		}
+		//TODO next line is assuming the OptimizerStrategy will need an IndexWriter;
+		// would be nicer to have the strategy put an OptimizeWork on the queue,
+		// or just return "yes please" (true) to some method?
+		//FIXME will not have a chance to trigger when no "add" activity is done.
+		// this is correct until we enable modification counts for deletions too.
+		workspace.optimizerPhase();
 	}
 
 	/**

Added: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/PerDirectoryWorkProcessor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/PerDirectoryWorkProcessor.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/PerDirectoryWorkProcessor.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,18 @@
+package org.hibernate.search.backend.impl.lucene;
+
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.store.DirectoryProvider;
+
+/**
+ * Interface to implement visitor pattern in combination
+ * with DpSelectionVisitor and DpSelectionDelegate to
+ * send LuceneWork to the appropriate queues, as defined
+ * by an IndexShardingStrategy.
+ * 
+ * @author Sanne Grinovero
+ */
+public interface PerDirectoryWorkProcessor {
+	
+	public void addWorkToDpProcessor(DirectoryProvider<?> dp, LuceneWork work) throws InterruptedException;
+
+}

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -18,7 +18,7 @@
  * them concurrently.
  * @author Sanne Grinovero
  */
-class QueueProcessors {
+class QueueProcessors implements PerDirectoryWorkProcessor {
 	
 	private final Map<DirectoryProvider, PerDPResources> resourcesMap;
 	private final Map<DirectoryProvider, PerDPQueueProcessor> dpProcessors = new HashMap<DirectoryProvider, PerDPQueueProcessor>();
@@ -27,7 +27,7 @@
 		this.resourcesMap = resourcesMap;
 	}
 
-	void addWorkToDpProcessor(DirectoryProvider dp, LuceneWork work) {
+	public void addWorkToDpProcessor(DirectoryProvider dp, LuceneWork work) {
 		if ( ! dpProcessors.containsKey( dp ) ) {
 			dpProcessors.put( dp, new PerDPQueueProcessor( resourcesMap.get( dp ) ) );
 		}

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/AddWorkDelegate.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/AddWorkDelegate.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/AddWorkDelegate.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -12,6 +12,7 @@
 import org.hibernate.search.backend.AddLuceneWork;
 import org.hibernate.search.backend.LuceneWork;
 import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
 import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
 import org.hibernate.search.util.LoggerFactory;
 import org.hibernate.search.util.ScopedAnalyzer;
@@ -95,4 +96,8 @@
 		return analyzerClone;
 	}
 
+	public void logWorkDone(LuceneWork work, IndexerProgressMonitor monitor) {
+		monitor.documentsAdded( 1 );
+	}
+
 }

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/DeleteWorkDelegate.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/DeleteWorkDelegate.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/DeleteWorkDelegate.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -12,6 +12,7 @@
 import org.hibernate.search.SearchException;
 import org.hibernate.search.backend.LuceneWork;
 import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
 import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
 import org.hibernate.search.engine.DocumentBuilder;
 import org.hibernate.search.util.LoggerFactory;
@@ -59,4 +60,8 @@
 		}
 	}
 
+	public void logWorkDone(LuceneWork work, IndexerProgressMonitor monitor) {
+		// TODO: no-op for now; deletions are not yet reported to the monitor
+	}
+
 }

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkDelegate.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkDelegate.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkDelegate.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -2,6 +2,7 @@
 
 import org.apache.lucene.index.IndexWriter;
 import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
 
 /**
  * @author Sanne Grinovero
@@ -16,4 +17,12 @@
 	 */
 	void performWork(LuceneWork work, IndexWriter writer);
 	
+	/**
+	 * Used for stats and performance counters, use the monitor
+	 * to keep track of activity done on the index.
+	 * @param work the work which was done.
+	 * @param monitor the monitor tracking activity.
+	 */
+	void logWorkDone(LuceneWork work, IndexerProgressMonitor monitor);
+	
 }

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/OptimizeWorkDelegate.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/OptimizeWorkDelegate.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/OptimizeWorkDelegate.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -8,6 +8,7 @@
 import org.hibernate.search.SearchException;
 import org.hibernate.search.backend.LuceneWork;
 import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
 import org.hibernate.search.util.LoggerFactory;
 
 /**
@@ -42,4 +43,9 @@
 		}
 	}
 
+	public void logWorkDone(LuceneWork work, IndexerProgressMonitor monitor) {
+		// TODO: no-op for now; optimize operations are not yet reported to the monitor
+	}
+
 }

Modified: search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/PurgeAllWorkDelegate.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/PurgeAllWorkDelegate.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/backend/impl/lucene/works/PurgeAllWorkDelegate.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -6,6 +6,7 @@
 
 import org.hibernate.search.SearchException;
 import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
 import org.hibernate.search.engine.DocumentBuilder;
 import org.hibernate.search.util.LoggerFactory;
 
@@ -37,4 +38,9 @@
 		}
 	}
 
+	public void logWorkDone(LuceneWork work, IndexerProgressMonitor monitor) {
+		// TODO: no-op for now; purge operations are not yet reported to the monitor
+	}
+
 }

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchCoordinator.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchCoordinator.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchCoordinator.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,125 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.CacheMode;
+import org.hibernate.SessionFactory;
+import org.hibernate.search.backend.OptimizeLuceneWork;
+import org.hibernate.search.backend.PurgeAllLuceneWork;
+import org.hibernate.search.backend.impl.batchlucene.BatchBackend;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Makes sure that several different BatchIndexingWorkspace(s)
+ * can be started concurrently, sharing the same batch-backend
+ * and IndexWriters.
+ * 
+ * @author Sanne Grinovero
+ */
+public class BatchCoordinator implements Runnable {
+	
+	private static final Logger log = LoggerFactory.make();
+	
+	private final Class<?>[] rootEntities;
+	private final SearchFactoryImplementor searchFactoryImplementor;
+	private final SessionFactory sessionFactory;
+	private final int objectLoadingThreads;
+	private final int collectionLoadingThreads;
+	private final CacheMode cacheMode;
+	private final int objectLoadingBatchSize;
+	private final boolean optimizeAtEnd;
+	private final boolean purgeAtStart;
+	private final boolean optimizeAfterPurge;
+	private final CountDownLatch endAllSignal;
+	private final IndexerProgressMonitor monitor;
+	private final int objectsLimit;
+	
+	private BatchBackend backend;
+
+	public BatchCoordinator(Set<Class<?>> rootEntities,
+			SearchFactoryImplementor searchFactoryImplementor,
+			SessionFactory sessionFactory, int objectLoadingThreads,
+			int collectionLoadingThreads, CacheMode cacheMode,
+			int objectLoadingBatchSize, int objectsLimit,
+			boolean optimizeAtEnd,
+			boolean purgeAtStart, boolean optimizeAfterPurge,
+			IndexerProgressMonitor monitor) {
+				this.rootEntities = rootEntities.toArray( new Class<?>[ rootEntities.size() ] );
+				this.searchFactoryImplementor = searchFactoryImplementor;
+				this.sessionFactory = sessionFactory;
+				this.objectLoadingThreads = objectLoadingThreads;
+				this.collectionLoadingThreads = collectionLoadingThreads;
+				this.cacheMode = cacheMode;
+				this.objectLoadingBatchSize = objectLoadingBatchSize;
+				this.optimizeAtEnd = optimizeAtEnd;
+				this.purgeAtStart = purgeAtStart;
+				this.optimizeAfterPurge = optimizeAfterPurge;
+				this.monitor = monitor;
+				this.objectsLimit = objectsLimit;
+				this.endAllSignal = new CountDownLatch( rootEntities.size() );
+	}
+
+	public void run() {
+		backend = searchFactoryImplementor.makeBatchBackend( monitor );
+		try {
+			beforeBatch(); // purgeAll and pre-optimize activities
+			doBatchWork();
+			backend.stopAndFlush( 60L*60*24, TimeUnit.SECONDS ); //1 day : enough to flush to indexes?
+//			backend.stopAndFlush( 10, TimeUnit.SECONDS );
+			afterBatch();
+		} catch (InterruptedException e) {
+			log.error( "Batch indexing was interrupted" );
+			Thread.currentThread().interrupt();
+		}
+		finally {
+			backend.close();
+		}
+	}
+
+	private void doBatchWork() throws InterruptedException {
+		ExecutorService executor = Executors.newFixedThreadPool( rootEntities.length, "BatchIndexingWorkspace" );
+		for ( Class<?> type : rootEntities ) {
+			executor.execute( new BatchIndexingWorkspace(
+					searchFactoryImplementor, sessionFactory, type,
+					objectLoadingThreads, collectionLoadingThreads,
+					cacheMode, objectLoadingBatchSize,
+					endAllSignal, monitor, backend, objectsLimit ) );
+		}
+		executor.shutdown();
+		endAllSignal.await(); //waits for the executor to finish
+	}
+
+	private void afterBatch() {
+		if ( this.optimizeAtEnd ) {
+			Set<Class<?>> targetedClasses = searchFactoryImplementor.getIndexedTypesPolymorphic( rootEntities );
+			optimize( targetedClasses );
+		}
+	}
+
+	private void beforeBatch() {
+		if ( this.purgeAtStart ) {
+			//purgeAll for affected entities
+			Set<Class<?>> targetedClasses = searchFactoryImplementor.getIndexedTypesPolymorphic( rootEntities );
+			for ( Class<?> clazz : targetedClasses ) {
+				//needs to be in-sync work to make sure we wait for the end of it.
+				backend.doWorkInSync( new PurgeAllLuceneWork( clazz ) ); 
+			}
+			if ( this.optimizeAfterPurge ) {
+				optimize( targetedClasses );
+			}
+		}
+	}
+
+	private void optimize(Set<Class<?>> targetedClasses) {
+		for ( Class<?> clazz : targetedClasses ) {
+			//TODO the backend should remove duplicate optimize work to the same DP (as entities might share indexes)
+			backend.doWorkInSync( new OptimizeLuceneWork( clazz ) );
+		}
+	}
+	
+}
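
A hedged sketch of how a caller (plausibly IndexerImpl) could drive the coordinator; the class, method, and argument values are illustrative, not part of this commit. Wrapping the Runnable in a FutureTask matches the Future<?> now returned by Indexer.start():

	import java.util.Set;
	import java.util.concurrent.Future;
	import java.util.concurrent.FutureTask;

	import org.hibernate.CacheMode;
	import org.hibernate.SessionFactory;
	import org.hibernate.search.batchindexing.BatchCoordinator;
	import org.hibernate.search.batchindexing.IndexerProgressMonitor;
	import org.hibernate.search.engine.SearchFactoryImplementor;

	public class BatchCoordinatorExample {

		public static Future<?> startCoordinator(Set<Class<?>> rootEntities,
				SearchFactoryImplementor searchFactory, SessionFactory sessionFactory,
				IndexerProgressMonitor monitor) {
			BatchCoordinator coordinator = new BatchCoordinator(
					rootEntities, searchFactory, sessionFactory,
					4, 4,             // object / collection loading threads
					CacheMode.IGNORE, // avoid polluting the second level cache
					100, 0,           // loading batch size; objectsLimit (0 assumed: no limit)
					true, true, true, // optimizeAtEnd, purgeAtStart, optimizeAfterPurge
					monitor );
			FutureTask<Void> task = new FutureTask<Void>( coordinator, null );
			new Thread( task, "batch coordinator" ).start();
			return task;
		}
	}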

Modified: search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -1,19 +1,14 @@
 package org.hibernate.search.batchindexing;
 
-import java.util.ArrayList;
+import java.io.Serializable;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.hibernate.CacheMode;
 import org.hibernate.SessionFactory;
 import org.hibernate.search.SearchException;
-import org.hibernate.search.backend.BackendQueueProcessorFactory;
-import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.OptimizeLuceneWork;
-import org.hibernate.search.backend.PurgeAllLuceneWork;
+import org.hibernate.search.backend.impl.batchlucene.BatchBackend;
 import org.hibernate.search.engine.SearchFactoryImplementor;
 import org.hibernate.search.util.LoggerFactory;
 import org.slf4j.Logger;
@@ -33,21 +28,17 @@
 	
 	//following order shows the 4 stages of an entity flowing to the index:
 	private final ThreadPoolExecutor 		execIdentifiersLoader;
-	private final ProducerConsumerQueue 	fromIdentifierListToEntities;
+	private final ProducerConsumerQueue<List<Serializable>> 	fromIdentifierListToEntities;
 	private final ThreadPoolExecutor 		execFirstLoader;
-	private final ProducerConsumerQueue 	fromEntityToAddwork;
+	private final ProducerConsumerQueue<Object> 	fromEntityToAddwork;
 	private final ThreadPoolExecutor		execDocBuilding;
-	private final ProducerConsumerQueue		fromAddworkToIndex;
-	private final ThreadPoolExecutor 		execWriteIndex;
 	
 	private final int objectLoadingThreadNum;
 	private final int luceneworkerBuildingThreadNum;
-	private final int indexWritingThreadNum;
 	private final Class<?> indexedType;
 	
 	// status control
-	private final AtomicBoolean started = new AtomicBoolean( false );
-	private final CountDownLatch endWritersSignal; //released when we stop adding Documents to Index 
+	private final CountDownLatch producerEndSignal; //released when we stop adding Documents to Index 
 	private final CountDownLatch endAllSignal; //released when we release all locks and IndexWriter
 	
 	// progress monitor
@@ -57,16 +48,17 @@
 	private final CacheMode cacheMode;
 	private final int objectLoadingBatchSize;
 
-	private final boolean purgeAtStart;
-	private final boolean optimizeAfterPurge;
-	private final boolean optimizeAtEnd;
+	private final BatchBackend backend;
+	
+	private final int objectsLimit;
 
 	public BatchIndexingWorkspace(SearchFactoryImplementor searchFactoryImplementor, SessionFactory sessionFactory,
 			Class<?> entityType,
-			int objectLoadingThreads, int collectionLoadingThreads, int writerThreads,
+			int objectLoadingThreads, int collectionLoadingThreads,
 			CacheMode cacheMode, int objectLoadingBatchSize,
-			boolean optimizeAtEnd, boolean purgeAtStart, boolean optimizeAfterPurge, CountDownLatch endAllSignal,
-			IndexerProgressMonitor monitor) {
+			CountDownLatch endAllSignal,
+			IndexerProgressMonitor monitor, BatchBackend backend,
+			int objectsLimit) {
 		
 		this.indexedType = entityType;
 		this.searchFactory = searchFactoryImplementor;
@@ -75,116 +67,69 @@
 		//thread pool sizing:
 		this.objectLoadingThreadNum = objectLoadingThreads;
 		this.luceneworkerBuildingThreadNum = collectionLoadingThreads;//collections are loaded as needed by building the document
-//		this.indexWritingThreadNum = writerThreads; //FIXME enable this line and remove the next line after solving HSEARCH-367
-		this.indexWritingThreadNum = 1;
 		
 		//loading options:
 		this.cacheMode = cacheMode;
 		this.objectLoadingBatchSize = objectLoadingBatchSize;
+		this.backend = backend;
 		
 		//executors: (quite expensive constructor)
 		//execIdentifiersLoader has size 1 and is not configurable: ensures the list is consistent as produced by one transaction
 		this.execIdentifiersLoader = Executors.newFixedThreadPool( 1, "identifierloader" );
 		this.execFirstLoader = Executors.newFixedThreadPool( objectLoadingThreadNum, "entityloader" );
 		this.execDocBuilding = Executors.newFixedThreadPool( luceneworkerBuildingThreadNum, "collectionsloader" );
-		this.execWriteIndex = Executors.newFixedThreadPool( indexWritingThreadNum, "analyzers" );
 		
 		//pipelining queues:
-		this.fromIdentifierListToEntities = new ProducerConsumerQueue( 1 );
-		this.fromEntityToAddwork = new ProducerConsumerQueue( objectLoadingThreadNum );
-		this.fromAddworkToIndex = new ProducerConsumerQueue( luceneworkerBuildingThreadNum );
+		this.fromIdentifierListToEntities = new ProducerConsumerQueue<List<Serializable>>( 1 );
+		this.fromEntityToAddwork = new ProducerConsumerQueue<Object>( objectLoadingThreadNum );
 		
 		//end signal shared with other instances:
 		this.endAllSignal = endAllSignal;
-		this.endWritersSignal = new CountDownLatch( indexWritingThreadNum );
+		this.producerEndSignal = new CountDownLatch( luceneworkerBuildingThreadNum );
 		
-		//behaviour options:
-		this.optimizeAtEnd = optimizeAtEnd;
-		this.optimizeAfterPurge = optimizeAfterPurge;
-		this.purgeAtStart = purgeAtStart;
-		
 		this.monitor = monitor;
+		this.objectsLimit = objectsLimit;
 	}
 
 	public void run() {
-		if ( ! started.compareAndSet( false, true ) )
-			throw new IllegalStateException( "BatchIndexingWorkspace can be started only once." );
-		boolean interrupted = false;
 		try {
-			//TODO switch to batch mode in backend
-			if ( purgeAtStart ) {
-				purgeAll();
-				if ( optimizeAfterPurge ) {
-					optimize();
-				}
-			}
 			
-			BackendQueueProcessorFactory backendQueueProcessorFactory = searchFactory.getBackendQueueProcessorFactory();
 			//first start the consumers, then the producers (reverse order):
-			for ( int i=0; i < indexWritingThreadNum; i++ ) {
-				//from LuceneWork to IndexWriters:
-				execWriteIndex.execute( new IndexWritingJob(
-						fromAddworkToIndex, endWritersSignal, monitor, backendQueueProcessorFactory ) );
-			}
 			for ( int i=0; i < luceneworkerBuildingThreadNum; i++ ) {
-				//from entity to LuceneWork:
+			//from entity to LuceneWork:
 				execDocBuilding.execute( new EntityConsumerLuceneworkProducer(
-						fromEntityToAddwork, fromAddworkToIndex, monitor,
-						sessionFactory, searchFactory, cacheMode) );
+						fromEntityToAddwork, monitor,
+						sessionFactory, producerEndSignal, searchFactory,
+						cacheMode, backend) );
 			}
 			for ( int i=0; i < objectLoadingThreadNum; i++ ) {
-				//from primary key to loaded entity:
+			//from primary key to loaded entity:
 				execFirstLoader.execute( new IdentifierConsumerEntityProducer(
 						fromIdentifierListToEntities, fromEntityToAddwork, monitor,
 						sessionFactory, cacheMode, indexedType) );
 			}
+			//from class definition to all primary keys:
+			execIdentifiersLoader.execute( new IdentifierProducer(
+					fromIdentifierListToEntities, sessionFactory,
+					objectLoadingBatchSize, indexedType, monitor,
+					objectsLimit ) );
 			
-			execIdentifiersLoader.execute( new IdentifierProducer(fromIdentifierListToEntities, sessionFactory, objectLoadingBatchSize, indexedType, monitor) );
 			//shutdown all executors:
 			execIdentifiersLoader.shutdown();
 			execFirstLoader.shutdown();
 			execDocBuilding.shutdown();
-			execWriteIndex.shutdown();
-			log.debug( "helper executors are shutting down" );
 			try {
-				endWritersSignal.await(); //await all indexing is done.
+				producerEndSignal.await(); //await for all work being sent to the backend
+				log.debug( "All work for type {} has been produced", indexedType.getName() );
 			} catch (InterruptedException e) {
-				interrupted = true;
-				throw new SearchException( "Interrupted on batch Indexing; index will be left in unknown state!", e);
+				//restore interruption signal:
+				Thread.currentThread().interrupt();
+				throw new SearchException( "Interrupted on batch Indexing; index will be left in unknown state!", e );
 			}
-			log.debug( "index writing finished" );
-			if ( optimizeAtEnd ) {
-				log.debug( "starting optimization" );
-				optimize();
-			}
 		}
 		finally {
 			endAllSignal.countDown();
-			if ( interrupted ) {
-				//restore interruption signal:
-				Thread.currentThread().interrupt();
-			}
 		}
 	}
 
-	private void optimize() {
-		Set<Class<?>> targetedClasses = searchFactory.getIndexedTypesPolymorphic( new Class[] {indexedType} );
-		List<LuceneWork> queue = new ArrayList<LuceneWork>( targetedClasses.size() );
-		for ( Class<?> clazz : targetedClasses ) {
-			queue.add( new OptimizeLuceneWork( clazz ) );
-		}
-		//TODO use the batch backend
-		searchFactory.getBackendQueueProcessorFactory().getProcessor( queue ).run();
-	}
-
-	private void purgeAll() {
-		Set<Class<?>> targetedClasses = searchFactory.getIndexedTypesPolymorphic( new Class[] {indexedType} );
-		List<LuceneWork> queue = new ArrayList<LuceneWork>( targetedClasses.size() );
-		for ( Class<?> clazz : targetedClasses ) {
-			queue.add( new PurgeAllLuceneWork( clazz ) );
-		}
-		//TODO use the batch backend
-		searchFactory.getBackendQueueProcessorFactory().getProcessor( queue ).run();
-	}
-
 }

Modified: search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -2,6 +2,7 @@
 
 import java.io.Serializable;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
 import org.hibernate.CacheMode;
 import org.hibernate.FlushMode;
@@ -11,6 +12,7 @@
 import org.hibernate.SessionFactory;
 import org.hibernate.Transaction;
 import org.hibernate.search.backend.AddLuceneWork;
+import org.hibernate.search.backend.impl.batchlucene.BatchBackend;
 import org.hibernate.search.bridge.TwoWayFieldBridge;
 import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
 import org.hibernate.search.engine.SearchFactoryImplementor;
@@ -28,26 +30,30 @@
 	
 	private static final Logger log = LoggerFactory.make();
 	
-	private final ProducerConsumerQueue source;
-	private final ProducerConsumerQueue destination;
+	private final ProducerConsumerQueue<Object> source;
 	private final SessionFactory sessionFactory;
 	private final Map<Class<?>, DocumentBuilderIndexedEntity<?>> documentBuilders;
 	private final IndexerProgressMonitor monitor;
 	
 	private static final int CLEAR_PERIOD = 50;
 	private final CacheMode cacheMode;
+
+	private final CountDownLatch producerEndSignal;
+
+	private final BatchBackend backend;
 	
 	public EntityConsumerLuceneworkProducer(
-			ProducerConsumerQueue entitySource,
-			ProducerConsumerQueue fromAddworkToIndex,
+			ProducerConsumerQueue<Object> entitySource,
 			IndexerProgressMonitor monitor,
 			SessionFactory sessionFactory,
-			SearchFactoryImplementor searchFactory, CacheMode cacheMode) {
+			CountDownLatch producerEndSignal,
+			SearchFactoryImplementor searchFactory, CacheMode cacheMode, BatchBackend backend) {
 		this.source = entitySource;
-		this.destination = fromAddworkToIndex;
 		this.monitor = monitor;
 		this.sessionFactory = sessionFactory;
+		this.producerEndSignal = producerEndSignal;
 		this.cacheMode = cacheMode;
+		this.backend = backend;
 		this.documentBuilders = searchFactory.getDocumentBuildersIndexedEntities();
 	}
 
@@ -61,6 +67,7 @@
 			transaction.commit();
 		}
 		finally {
+			producerEndSignal.countDown();
 			session.close();
 		}
 		log.debug( "finished" );
@@ -89,10 +96,8 @@
 		}
 		catch (InterruptedException e) {
 			// just quit
+			Thread.currentThread().interrupt();
 		}
-		finally {
-			destination.producerStopping();
-		}
 	}
 
 	@SuppressWarnings("unchecked")
@@ -105,7 +110,7 @@
 		//depending on the complexity of the object graph going to be indexed it's possible
 		//that we hit the database several times during work construction.
 		AddLuceneWork addWork = docBuilder.createAddWork( clazz, entity, id, idInString, true );
-		destination.put( addWork );
+		backend.enqueueAsyncWork( addWork );
 	}
 	
 }

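Note on the change above: the second ProducerConsumerQueue is gone; produced works now go straight to the BatchBackend, and producer lifecycle is tracked with a CountDownLatch instead. A minimal standalone sketch of the latch pattern, not part of this commit (thread bodies are placeholders):

import java.util.concurrent.CountDownLatch;

public class EndSignalSketch {

	public static void main(String[] args) throws InterruptedException {
		final int producers = 3;
		final CountDownLatch producerEndSignal = new CountDownLatch( producers );
		for ( int i = 0; i < producers; i++ ) {
			new Thread() {
				public void run() {
					try {
						// ... transform entities into LuceneWork ...
					}
					finally {
						//always signal, even on failure:
						producerEndSignal.countDown();
					}
				}
			}.start();
		}
		//blocks until every producer has counted down:
		producerEndSignal.await();
		System.out.println( "all producers done" );
	}
}
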
Modified: search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -8,27 +8,34 @@
 
 /**
  * Helper class to create threads;
- * these threads are grouped and named to be readily identified in a profiler.
+ * these threads are grouped and named to be identified in a profiler.
  * 
  * @author Sanne Grinovero
  */
 public class Executors {
 	
-	private static final String THREAD_GROUP_PREFIX = "Hibernate Search indexer: ";
+	private static final String THREAD_GROUP_PREFIX = "Hibernate Search: ";
+	private static final int QUEUE_MAX_LENGTH = 1000; //TODO have it configurable?
 	
 	/**
-	 * Creates a new fixed size ThreadPoolExecutor
-	 * @param threads
-	 * @param groupname
-	 * @return
+	 * Creates a new fixed size ThreadPoolExecutor.
+	 * It uses a blocking queue holding at most 1000 elements, and the
+	 * rejection policy is set to CallerRunsPolicy for the case in which
+	 * the queue is full. These settings cap the queue, keeping timeouts
+	 * reasonable for most jobs.
+	 * 
+	 * @param threads the number of threads
+	 * @param groupname a label to identify the threadpool; useful for profiling.
+	 * @return the new ExecutorService
 	 */
 	public static ThreadPoolExecutor newFixedThreadPool(int threads, String groupname) {
 		return new ThreadPoolExecutor(
 				threads,
 				threads,
 	            0L, TimeUnit.MILLISECONDS,
-	            new LinkedBlockingQueue<Runnable>(),
-	            new SearchThreadFactory( groupname ) );
+	            new LinkedBlockingQueue<Runnable>( QUEUE_MAX_LENGTH ),
+	            new SearchThreadFactory( groupname ),
+	            new ThreadPoolExecutor.CallerRunsPolicy() );
 	}
 	
 	/**
@@ -42,15 +49,15 @@
 
         SearchThreadFactory(String groupname) {
             SecurityManager s = System.getSecurityManager();
-            group = (s != null)? s.getThreadGroup() :
+            group = ( s != null ) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
             namePrefix = THREAD_GROUP_PREFIX + groupname + "-";
         }
 
         public Thread newThread(Runnable r) {
-            Thread t = new Thread(group, r, 
+            Thread t = new Thread( group, r, 
                                   namePrefix + threadNumber.getAndIncrement(),
-                                  0);
+                                  0 );
             return t;
         }
         

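Note on the new pool settings: with an unbounded queue the producers could enqueue work much faster than the writer threads drain it; capping the queue and using CallerRunsPolicy makes a saturated pool execute tasks in the submitting thread, throttling producers instead of growing the backlog or rejecting work. A standalone sketch of that behaviour, not part of this commit (class name and sizes are illustrative):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

	public static void main(String[] args) throws InterruptedException {
		//same shape as Executors.newFixedThreadPool above:
		//fixed size, bounded queue, caller-runs on saturation
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				2, 2, 0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>( 10 ),
				new ThreadPoolExecutor.CallerRunsPolicy() );
		for ( int i = 0; i < 100; i++ ) {
			final int task = i;
			//once the queue holds 10 tasks, execute() runs the task in the
			//submitting thread, slowing the producer instead of failing:
			pool.execute( new Runnable() {
				public void run() {
					System.out.println( "task " + task + " ran on " + Thread.currentThread().getName() );
				}
			} );
		}
		pool.shutdown();
		pool.awaitTermination( 1, TimeUnit.MINUTES );
	}
}
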
Modified: search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -26,16 +26,16 @@
 	
 	private static final Logger log = LoggerFactory.make();
 
-	private final ProducerConsumerQueue source;
-	private final ProducerConsumerQueue destination;
+	private final ProducerConsumerQueue<List<Serializable>> source;
+	private final ProducerConsumerQueue<Object> destination;
 	private final SessionFactory sessionFactory;
 	private final CacheMode cacheMode;
 	private final Class<?> type;
 	private final IndexerProgressMonitor monitor;
 
 	public IdentifierConsumerEntityProducer(
-			ProducerConsumerQueue fromIdentifierListToEntities,
-			ProducerConsumerQueue fromEntityToAddwork,
+			ProducerConsumerQueue<List<Serializable>> fromIdentifierListToEntities,
+			ProducerConsumerQueue<Object> fromEntityToAddwork,
 			IndexerProgressMonitor monitor,
 			SessionFactory sessionFactory,
 			CacheMode cacheMode, Class<?> type) {
@@ -70,6 +70,7 @@
 			do {
 				take = source.take();
 				if ( take != null ) {
+					@SuppressWarnings("unchecked")
 					List<Serializable> listIds = (List<Serializable>) take;
 					log.trace( "received list of ids {}", listIds );
 					loadList( listIds, session );
@@ -79,6 +80,7 @@
 		}
 		catch (InterruptedException e) {
 			// just quit
+			Thread.currentThread().interrupt();
 		}
 		finally {
 			destination.producerStopping();

Modified: search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -30,29 +30,34 @@
 	
 	private static final Logger log = LoggerFactory.make();
 
-	private final ProducerConsumerQueue destination;
+	private final ProducerConsumerQueue<List<Serializable>> destination;
 	private final SessionFactory sessionFactory;
 	private final int batchSize;
 	private final Class<?> indexedType;
 	private final IndexerProgressMonitor monitor;
 
+	private final int objectsLimit;
+
 	/**
 	 * @param fromIdentifierListToEntities the target queue where the produced identifiers are sent to
 	 * @param sessionFactory the Hibernate SessionFactory to use to load entities
 	 * @param objectLoadingBatchSize affects mostly the next consumer: IdentifierConsumerEntityProducer
 	 * @param indexedType the entity type to be loaded
 	 * @param monitor to monitor indexing progress
+	 * @param objectsLimit if not zero, limits the number of identifiers to be loaded to this maximum
 	 */
 	public IdentifierProducer(
-			ProducerConsumerQueue fromIdentifierListToEntities,
+			ProducerConsumerQueue<List<Serializable>> fromIdentifierListToEntities,
 			SessionFactory sessionFactory,
 			int objectLoadingBatchSize,
-			Class<?> indexedType, IndexerProgressMonitor monitor) {
+			Class<?> indexedType, IndexerProgressMonitor monitor,
+			int objectsLimit) {
 				this.destination = fromIdentifierListToEntities;
 				this.sessionFactory = sessionFactory;
 				this.batchSize = objectLoadingBatchSize;
 				this.indexedType = indexedType;
 				this.monitor = monitor;
+				this.objectsLimit = objectsLimit;
 				log.trace( "created" );
 	}
 	
@@ -82,15 +87,18 @@
 	}
 
 	private void loadAllIdentifiers(final StatelessSession session) throws InterruptedException {
-		Integer count = (Integer) session
+		Integer totalCount = (Integer) session
 			.createCriteria( indexedType )
 			.setProjection( Projections.count( "id" ) )
 			.setCacheable( false )
 			.uniqueResult();
-
-		log.debug( "going to fetch {} primary keys", count);
-		monitor.addToTotalCount( count );
 		
+		if ( objectsLimit != 0 && objectsLimit < totalCount.intValue() ) {
+			totalCount = objectsLimit;
+		}
+		log.debug( "going to fetch {} primary keys", totalCount);
+		monitor.addToTotalCount( totalCount );
+		
 		Criteria criteria = session
 			.createCriteria( indexedType )
 			.setProjection( Projections.id() )
@@ -99,6 +107,7 @@
 		
 		ScrollableResults results = criteria.scroll( ScrollMode.FORWARD_ONLY );
 		ArrayList<Serializable> destinationList = new ArrayList<Serializable>( batchSize );
+		int counter = 0;
 		try {
 			while ( results.next() ) {
 				Serializable id = (Serializable) results.get( 0 );
@@ -107,6 +116,10 @@
 					enqueueList( destinationList );
 					destinationList = new ArrayList<Serializable>( batchSize ); 
 				}
+				counter++;
+				if ( counter == totalCount ) {
+					break;
+				}
 			}
 		}
 		finally {

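The new objectsLimit both caps the total reported to the monitor and stops the identifier scroll early. A tiny sketch of the capping rule as implemented above (hypothetical helper, not in the commit):

public class ObjectsLimitSketch {

	//0 means "no limit"; otherwise the smaller of limit and total wins
	static int effectiveTotal(int totalCount, int objectsLimit) {
		if ( objectsLimit != 0 && objectsLimit < totalCount ) {
			return objectsLimit;
		}
		return totalCount;
	}

	public static void main(String[] args) {
		System.out.println( effectiveTotal( 5000, 0 ) );   //5000: unlimited
		System.out.println( effectiveTotal( 5000, 100 ) ); //100: capped
		System.out.println( effectiveTotal( 50, 100 ) );   //50: a limit above the total has no effect
	}
}
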
Deleted: search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -1,76 +0,0 @@
-package org.hibernate.search.batchindexing;
-
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-
-import org.hibernate.search.backend.BackendQueueProcessorFactory;
-import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.util.LoggerFactory;
-import org.slf4j.Logger;
-
-/**
- * This Runnable is meant to push to the backend
- * all LuceneWork it can take from the ProducerConsumerQueue.
- * 
- * FIXME:
- * Is currently more an adaptor to bridge the queue approach
- * to the current backend implementations. This is not by any means
- * the most efficient way to index documents, it has been built only
- * to test the approach.
- * 
- */
-class IndexWritingJob implements Runnable {
-	
-	private static final Logger log = LoggerFactory.make();
-	
-	private final ProducerConsumerQueue pleq;
-	private final BackendQueueProcessorFactory backend;
-	private final CountDownLatch endSignal;
-	private final IndexerProgressMonitor monitor;
-	
-	/**
-	 * @param pleq This queue contains the LuceneWork to be send to the backend
-	 * @param endSignal
-	 * @param monitor
-	 * @param backendQueueProcessorFactory
-	 */
-	IndexWritingJob(ProducerConsumerQueue pleq, CountDownLatch endSignal, IndexerProgressMonitor monitor, BackendQueueProcessorFactory backendQueueProcessorFactory) {
-		this.pleq = pleq;
-		this.monitor = monitor;
-		this.backend = backendQueueProcessorFactory;
-		this.endSignal = endSignal;
-		log.trace( "created" );
-	}
-
-	public void run() {
-		log.debug( "Start" );
-		try {
-			while ( true ) {
-				Object take = pleq.take();
-				if ( take == null ) {
-					break;
-				}
-				else {
-					LuceneWork work = (LuceneWork) take;
-					log.trace( "received lucenework {}", work );
-					//TODO group work in bigger lists of size #batch and introduce the CommitLuceneWork to avoid waiting more work
-					ArrayList<LuceneWork> list = new ArrayList<LuceneWork>( 1 );
-					list.add( work );
-					Runnable processor = backend.getProcessor( list );
-					processor.run();
-					monitor.documentsAdded( 1L );
-				}
-			}
-			log.debug( "Finished" );
-		}
-		catch (InterruptedException e) {
-			// normal quit: no need to propagate interruption.
-			log.debug( "Interrupted" );
-		}
-		finally {
-			//notify everybody we have finished.
-			endSignal.countDown();
-		}
-	}
-
-}
\ No newline at end of file

Modified: search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -11,11 +11,13 @@
  * 
  * @author Sanne Grinovero
  */
-public class ProducerConsumerQueue {
+public class ProducerConsumerQueue<T> {
 	
 	private static final int DEFAULT_BUFF_LENGHT = 1000;
 	private static final Object exitToken = new Object();
 	
+	//doesn't use generics here as the exitToken needs to be put in the queue too:
+	@SuppressWarnings("unchecked")
 	private final BlockingQueue queue;
 	private final AtomicInteger producersToWaitFor;
 	
@@ -26,6 +28,7 @@
 		this( DEFAULT_BUFF_LENGHT, producersToWaitFor );
 	}
 	
+	@SuppressWarnings("unchecked")
 	public ProducerConsumerQueue( int queueLenght, int producersToWaitFor ) {
 		queue = new ArrayBlockingQueue( queueLenght );
 		this.producersToWaitFor = new AtomicInteger( producersToWaitFor );
@@ -37,7 +40,8 @@
 	 * @return the next object in the queue, or null to exit
 	 * @throws InterruptedException
 	 */
-	public Object take() throws InterruptedException {
+	@SuppressWarnings("unchecked")
+	public T take() throws InterruptedException {
 		Object obj = queue.take();
 		if ( obj == exitToken ) {
 			//restore exit signal for other threads
@@ -45,7 +49,7 @@
 			return null;
 		}
 		else {
-			return obj;
+			return (T)obj;
 		}
 	}
 	
@@ -55,7 +59,8 @@
 	 * @param obj
 	 * @throws InterruptedException
 	 */
-	public void put(Object obj) throws InterruptedException {
+	@SuppressWarnings("unchecked")
+	public void put(T obj) throws InterruptedException {
 		queue.put( obj );
 	}
 
@@ -64,9 +69,10 @@
 	 * finished. After doing it can safely terminate.
 	 * After all producer threads have called producerStopping()
 	 * a token will be inserted in the blocking queue to eventually
-	 * awake sleaping consumers and have them quit, after the
+	 * awake sleeping consumers and have them quit, after the
 	 * queue has been processed.
 	 */
+	@SuppressWarnings("unchecked")
 	public void producerStopping() {
 		int activeProducers = producersToWaitFor.decrementAndGet();
 		//last producer must close consumers

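Usage sketch for the now-generic queue, not part of this commit (producer and consumer bodies are illustrative): each producer calls producerStopping() when done; after the last one does, take() returns null and consumers can quit:

import org.hibernate.search.batchindexing.ProducerConsumerQueue;

public class QueueHandshakeSketch {

	public static void main(String[] args) {
		//one producer to wait for:
		final ProducerConsumerQueue<String> queue = new ProducerConsumerQueue<String>( 1 );
		Thread producer = new Thread() {
			public void run() {
				try {
					for ( int i = 0; i < 5; i++ ) {
						queue.put( "item-" + i );
					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
				finally {
					//the last producer to stop inserts the exit token:
					queue.producerStopping();
				}
			}
		};
		Thread consumer = new Thread() {
			public void run() {
				try {
					String item;
					//take() returns null only after all producers called producerStopping():
					while ( ( item = queue.take() ) != null ) {
						System.out.println( item );
					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		};
		producer.start();
		consumer.start();
	}
}
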
Modified: search/trunk/src/main/java/org/hibernate/search/engine/SearchFactoryImplementor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/engine/SearchFactoryImplementor.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/engine/SearchFactoryImplementor.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -9,6 +9,8 @@
 import org.hibernate.search.backend.BackendQueueProcessorFactory;
 import org.hibernate.search.backend.LuceneIndexingParameters;
 import org.hibernate.search.backend.Worker;
+import org.hibernate.search.backend.impl.batchlucene.BatchBackend;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
 import org.hibernate.search.filter.FilterCachingStrategy;
 import org.hibernate.search.store.DirectoryProvider;
 import org.hibernate.search.store.optimization.OptimizerStrategy;
@@ -61,4 +63,6 @@
 	int getFilterCacheBitResultsSize();
 
 	Set<Class<?>> getIndexedTypesPolymorphic(Class<?>[] classes);
+	
+	BatchBackend makeBatchBackend(IndexerProgressMonitor progressMonitor);
 }

Modified: search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -1,16 +1,15 @@
 package org.hibernate.search.impl;
 
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.hibernate.CacheMode;
 import org.hibernate.SessionFactory;
 import org.hibernate.search.Indexer;
-import org.hibernate.search.batchindexing.BatchIndexingWorkspace;
+import org.hibernate.search.batchindexing.BatchCoordinator;
+import org.hibernate.search.batchindexing.Executors;
 import org.hibernate.search.batchindexing.IndexerProgressMonitor;
 import org.hibernate.search.engine.SearchFactoryImplementor;
 import org.hibernate.search.util.LoggerFactory;
@@ -30,15 +29,13 @@
 	private final SessionFactory sessionFactory;
 
 	protected Set<Class<?>> rootEntities = new HashSet<Class<?>>();
-	protected List<BatchIndexingWorkspace> indexers = new LinkedList<BatchIndexingWorkspace>();
-	boolean started = false; 
-	private CountDownLatch endAllSignal;
 	
 	// default settings defined here:
 	private int objectLoadingThreads = 2; //loading the main entity
 	private int collectionLoadingThreads = 4; //also responsible for loading of lazy @IndexedEmbedded collections
 	private int writerThreads = 1; //also running the Analyzers
 	private int objectLoadingBatchSize = 10;
+	private int objectsLimit = 0; //means no limit at all
 	private CacheMode cacheMode = CacheMode.IGNORE;
 	private boolean optimizeAtEnd = true;
 	private boolean purgeAtStart = true;
@@ -87,9 +84,7 @@
 			}
 		}
 		cleaned.removeAll( toRemove );
-		if ( log.isDebugEnabled() ) {
-			log.debug( "Targets for indexing job: {}", cleaned );
-		}
+		log.debug( "Targets for indexing job: {}", cleaned );
 		return cleaned;
 	}
 
@@ -143,30 +138,37 @@
 		return this;
 	}
 	
-	public void start() {
-		if ( started ) {
-			throw new IllegalStateException( "Can be started only once" );
+	public Future<?> start() {
+		BatchCoordinator coordinator = createCoordinator();
+		ExecutorService executor = Executors.newFixedThreadPool( 1, "batch coordinator" );
+		try {
+			Future<?> submit = executor.submit( coordinator );
+			return submit;
 		}
-		else {
-			started = true;
-			endAllSignal = new CountDownLatch( rootEntities.size() );
-			for ( Class<?> type : rootEntities ) {
-				indexers.add( new BatchIndexingWorkspace(
-						searchFactoryImplementor, sessionFactory, type,
-						objectLoadingThreads, collectionLoadingThreads, writerThreads,
-						cacheMode, objectLoadingBatchSize,
-						optimizeAtEnd, purgeAtStart, optimizeAfterPurge,
-						endAllSignal, monitor) );
-			}
-			for ( BatchIndexingWorkspace batcher : indexers ) {
-				new Thread( batcher ).start();
-			}
+		finally {
+			executor.shutdown();
 		}
 	}
+	
+	public void startAndWait() throws InterruptedException {
+		BatchCoordinator coordinator = createCoordinator();
+		coordinator.run();
+		if ( Thread.currentThread().isInterrupted() ) {
+			throw new InterruptedException();
+		}
+	}
+	
+	protected BatchCoordinator createCoordinator() {
+		return new BatchCoordinator( rootEntities, searchFactoryImplementor, sessionFactory,
+				objectLoadingThreads, collectionLoadingThreads,
+				cacheMode, objectLoadingBatchSize, objectsLimit,
+				optimizeAtEnd, purgeAtStart, optimizeAfterPurge,
+				monitor );
+	}
 
-	public boolean startAndWait(long timeout, TimeUnit unit) throws InterruptedException {
-		start();
-		return endAllSignal.await( timeout, unit );
+	public Indexer limitObjects(int maximum) {
+		this.objectsLimit = maximum;
+		return this;
 	}
 
 }

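With this class the Indexer contract changes: start() is asynchronous and returns a Future, startAndWait() blocks with no timeout, and limitObjects() caps the number of entities indexed. A usage sketch, not part of this commit (the entity class and option values are illustrative):

import java.util.concurrent.Future;

import org.hibernate.Session;
import org.hibernate.search.FullTextSession;
import org.hibernate.search.Search;

public class IndexerUsageSketch {

	//asynchronous: the BatchCoordinator runs on a dedicated thread
	public static Future<?> rebuildAsync(Session session) {
		FullTextSession fullTextSession = Search.getFullTextSession( session );
		return fullTextSession.createIndexer( Book.class ).start();
	}

	//synchronous: runs the coordinator in the calling thread and
	//rethrows interruption as InterruptedException
	public static void rebuildAndWait(Session session) throws InterruptedException {
		FullTextSession fullTextSession = Search.getFullTextSession( session );
		fullTextSession.createIndexer( Book.class )
				.objectLoadingBatchSize( 30 )
				.limitObjects( 1000 ) //new in this commit: cap the number of entities indexed
				.startAndWait();
	}
}
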
Modified: search/trunk/src/main/java/org/hibernate/search/impl/SearchFactoryImpl.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/SearchFactoryImpl.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/impl/SearchFactoryImpl.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -43,6 +43,10 @@
 import org.hibernate.search.backend.Worker;
 import org.hibernate.search.backend.WorkerFactory;
 import org.hibernate.search.backend.configuration.ConfigurationParseHelper;
+import org.hibernate.search.backend.configuration.MaskedProperty;
+import org.hibernate.search.backend.impl.batchlucene.BatchBackend;
+import org.hibernate.search.backend.impl.batchlucene.LuceneBatchBackend;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
 import org.hibernate.search.cfg.SearchConfiguration;
 import org.hibernate.search.cfg.SearchMapping;
 import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
@@ -84,6 +88,7 @@
 	private Map<String, Analyzer> analyzers;
 	private final AtomicBoolean stopped = new AtomicBoolean( false );
 	private final int cacheBitResultsSize;
+	private final Properties configurationProperties;
 
 	private final PolymorphicIndexHierarchy indexHierarchy = new PolymorphicIndexHierarchy();
 
@@ -137,6 +142,7 @@
 		this.cacheBitResultsSize = ConfigurationParseHelper.getIntValue(
 				cfg.getProperties(), Environment.CACHE_DOCIDRESULTS_SIZE, CachingWrapperFilter.DEFAULT_SIZE
 		);
+		this.configurationProperties = cfg.getProperties();
 		this.barrier = 1; //write barrier
 	}
 
@@ -542,6 +548,34 @@
 		} //read barrier
 		return indexHierarchy.getIndexedClasses( classes );
 	}
+	
+	public BatchBackend makeBatchBackend(IndexerProgressMonitor progressMonitor) {
+		BatchBackend batchBackend;
+		String impl = configurationProperties.getProperty( Environment.BATCH_BACKEND );
+		if ( StringHelper.isEmpty( impl ) || "LuceneBatch".equalsIgnoreCase( impl ) ) {
+			batchBackend = new LuceneBatchBackend();
+		}
+		else {
+			try {
+				Class batchBackendClass = ReflectHelper
+						.classForName( impl, SearchFactoryImpl.class );
+				batchBackend = ( BatchBackend ) batchBackendClass.newInstance();
+			}
+			catch ( ClassNotFoundException e ) {
+				throw new SearchException( "Unable to find batchbackend implementation class: " + impl, e );
+			}
+			catch ( IllegalAccessException e ) {
+				throw new SearchException( "Unable to instantiate batchbackend class: " + impl, e );
+			}
+			catch ( InstantiationException e ) {
+				throw new SearchException( "Unable to instantiate batchbackend class: " + impl, e );
+			}
+		}
+		Properties batchBackendConfiguration = new MaskedProperty(
+				this.configurationProperties, Environment.BATCH_BACKEND );
+		batchBackend.initialize( batchBackendConfiguration, progressMonitor, this );
+		return batchBackend;
+	}
 
 	/**
 	 * Helper class which keeps track of all super classes and interfaces of the indexed entities.

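makeBatchBackend resolves the implementation from configuration: an empty value or "LuceneBatch" selects the bundled LuceneBatchBackend, any other value is loaded reflectively as a class name, and the properties masked by the Environment.BATCH_BACKEND prefix are handed to initialize(). The literal property keys are constants not spelled out in this hunk, so the sketch below (not part of the commit) references them symbolically:

import java.util.Properties;

import org.hibernate.search.Environment;
import org.hibernate.search.backend.impl.batchlucene.LuceneBatchBackend;

public class BatchBackendConfigSketch {

	public static Properties batchConfiguration() {
		Properties cfg = new Properties();
		//default implementation; same effect as leaving the property unset:
		cfg.setProperty( Environment.BATCH_BACKEND, "LuceneBatch" );
		//or the fully qualified name of a custom BatchBackend implementation:
		//cfg.setProperty( Environment.BATCH_BACKEND, "com.acme.search.MyBatchBackend" );

		//backend-scoped option, as used by the new IndexingGeneratedCorpusTest:
		cfg.setProperty( LuceneBatchBackend.CONCURRENT_WRITERS, "4" );
		return cfg;
	}
}
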
Modified: search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -42,14 +42,14 @@
 	}
 	
 	protected int getStatusMessagePeriod() {
-		return 1000;
+		return 50;
 	}
 	
 	protected void printStatusMessage(long starttimems, long totalTodoCount, long doneCount) {
 		long elapsedMs = System.currentTimeMillis() - starttimems;
 		log.info( "{} documents indexed in {} ms", doneCount, elapsedMs );
-		double estimateSpeed = ( (double) doneCount * 1000f ) / elapsedMs ;
-		double estimatePercentileComplete = ( (double) doneCount*100 ) / (double)totalTodoCount ;
+		float estimateSpeed = (float) doneCount * 1000f / elapsedMs ;
+		float estimatePercentileComplete = (float) doneCount * 100f / (float) totalTodoCount ;
 		log.info( "Indexing speed: {} documents/second; progress: {}%", estimateSpeed, estimatePercentileComplete );
 	}
 

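The estimates themselves are unchanged by the switch to float; a worked example of the two formulas, not part of this commit:

public class ProgressMathSketch {

	public static void main(String[] args) {
		long doneCount = 500, elapsedMs = 2000, totalTodoCount = 10000;
		float estimateSpeed = (float) doneCount * 1000f / elapsedMs;
		float estimatePercentileComplete = (float) doneCount * 100f / (float) totalTodoCount;
		//prints: 250.0 documents/second, 5.0% complete
		System.out.println( estimateSpeed + " documents/second, " + estimatePercentileComplete + "% complete" );
	}
}
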
Modified: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -10,4 +10,12 @@
 	
 	public String catalogueGroupName = "";
 
+	public String getCatalogueGroupName() {
+		return catalogueGroupName;
+	}
+
+	public void setCatalogueGroupName(String catalogueGroupName) {
+		this.catalogueGroupName = catalogueGroupName;
+	}
+	
 }

Modified: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -1,15 +1,36 @@
 package org.hibernate.search.test.batchindexing;
 
 import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
 import javax.persistence.Id;
 
+import org.hibernate.search.annotations.Field;
 import org.hibernate.search.annotations.Indexed;
 
 @Indexed
 @Entity
-public class Book {
+public class Book implements TitleAble {
 	
-	@Id
-	public long id;
+	private long id;
+	
+	private String title;
 
+	@Id @GeneratedValue
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	@Field
+	public String getTitle() {
+		return title;
+	}
+
+	public void setTitle(String title) {
+		this.title = title;
+	}
+
 }

Modified: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -9,7 +9,7 @@
 
 @Indexed
 @Entity
-public class Dvd {
+public class Dvd implements TitleAble {
 	
 	public long unusuallyNamedIdentifier;
 	public String title;

Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/IndexingGeneratedCorpusTest.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/IndexingGeneratedCorpusTest.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/IndexingGeneratedCorpusTest.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,147 @@
+package org.hibernate.search.test.batchindexing;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.hibernate.Transaction;
+import org.hibernate.search.Environment;
+import org.hibernate.search.FullTextQuery;
+import org.hibernate.search.FullTextSession;
+import org.hibernate.search.backend.impl.batchlucene.LuceneBatchBackend;
+import org.hibernate.search.test.util.FullTextSessionBuilder;
+import org.hibernate.search.test.util.textbuilder.SentenceInventor;
+
+/**
+ * Tests the fullTextSession.createIndexer() API
+ * for basic functionality.
+ * 
+ * @author Sanne Grinovero
+ */
+public class IndexingGeneratedCorpusTest extends TestCase {
+	
+	private final int BOOK_NUM = 3000;
+	private final int ANCIENTBOOK_NUM = 200;
+	private final int DVD_NUM = 2000;
+	
+	private SentenceInventor sentenceInventor = new SentenceInventor( 7L, 10000 );
+	private FullTextSessionBuilder builder;
+	private int totalEntitiesInDB = 0;
+
+	@Override
+	public void setUp() throws Exception {
+		super.setUp();
+		builder = new FullTextSessionBuilder();
+		builder
+			.addAnnotatedClass( Book.class )
+			.addAnnotatedClass( Dvd.class )
+			.addAnnotatedClass( AncientBook.class )
+			.setProperty( "hibernate.show_sql", "false" ) //too verbose for this test
+			.setProperty( LuceneBatchBackend.CONCURRENT_WRITERS, "4" )
+			.build();
+		createMany( Book.class, BOOK_NUM );
+		createMany( Dvd.class, DVD_NUM );
+		createMany( AncientBook.class, ANCIENTBOOK_NUM );
+	}
+
+	private void createMany(Class<? extends TitleAble> entityType, int amount ) throws InstantiationException, IllegalAccessException {
+		FullTextSession fullTextSession = builder.openFullTextSession();
+		try {
+			Transaction tx = fullTextSession.beginTransaction();
+			for ( int i = 0; i < amount; i++ ) {
+				TitleAble instance = entityType.newInstance();
+				instance.setTitle( sentenceInventor.nextSentence() );
+				fullTextSession.persist( instance );
+				totalEntitiesInDB++;
+				if ( i % 250 == 249 ) {
+					tx.commit();
+					fullTextSession.clear();
+					System.out.println( "Test preparation: " + totalEntitiesInDB + " entities persisted" );
+				tx = fullTextSession.beginTransaction();
+				}
+			}
+			tx.commit();
+		}
+		finally {
+			fullTextSession.close();
+		}
+	}
+	
+	public void testBatchIndexing() throws InterruptedException {
+		verifyResultNumbers(); //initial count of entities should match expectations
+		purgeAll(); // empty indexes
+		verifyIsEmpty();
+		reindexAll(); // rebuild the indexes
+		verifyResultNumbers(); // verify the count match again
+		reindexAll(); //tests that purgeAll is automatic:
+		verifyResultNumbers(); //..same numbers again
+	}
+	
+	private void reindexAll() throws InterruptedException {
+		FullTextSession fullTextSession = builder.openFullTextSession();
+		try {
+			fullTextSession.createIndexer( Object.class )
+				.documentBuilderThreads( 8 )
+				.objectLoadingThreads( 4 )
+				.objectLoadingBatchSize( 30 )
+				.startAndWait();
+		}	
+		finally {
+			fullTextSession.close();
+		}
+	}
+
+	private void purgeAll() {
+		FullTextSession fullTextSession = builder.openFullTextSession();
+		try {
+			Transaction tx = fullTextSession.beginTransaction();
+			fullTextSession.purgeAll( Object.class );
+			tx.commit();
+		}
+		finally {
+			fullTextSession.close();
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private void verifyResultNumbers() {
+		assertEquals( DVD_NUM,
+				countByFT( Dvd.class ) );
+		assertEquals( ANCIENTBOOK_NUM + BOOK_NUM,
+				countByFT( Book.class ) );
+		assertEquals( ANCIENTBOOK_NUM,
+				countByFT( AncientBook.class ) );
+		assertEquals( DVD_NUM + ANCIENTBOOK_NUM + BOOK_NUM,
+				countByFT( AncientBook.class, Book.class, Dvd.class ) );
+		assertEquals( DVD_NUM + ANCIENTBOOK_NUM,
+				countByFT( AncientBook.class, Dvd.class ) );
+	}
+	
+	@SuppressWarnings("unchecked")
+	private void verifyIsEmpty() {
+		assertEquals( 0, countByFT( Dvd.class ) );
+		assertEquals( 0, countByFT( Book.class ) );
+		assertEquals( 0, countByFT( AncientBook.class ) );
+		assertEquals( 0, countByFT( AncientBook.class, Book.class, Dvd.class ) );
+	}
+
+	private int countByFT(Class<? extends TitleAble>... types) {
+		Query findAll = new MatchAllDocsQuery();
+		int bySize = 0;
+		int byResultSize = 0;
+		FullTextSession fullTextSession = builder.openFullTextSession();
+		try {
+			Transaction tx = fullTextSession.beginTransaction();
+			FullTextQuery fullTextQuery = fullTextSession.createFullTextQuery( findAll, types );
+			bySize = fullTextQuery.list().size();
+			byResultSize = fullTextQuery.getResultSize();
+			tx.commit();
+		}
+		finally {
+			fullTextSession.close();
+		}
+		assertEquals( bySize, byResultSize );
+		return bySize;
+	}
+
+}


Property changes on: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/IndexingGeneratedCorpusTest.java
___________________________________________________________________
Name: svn:keywords
   + Id

Modified: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java	2009-06-29 20:18:57 UTC (rev 16966)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -1,7 +1,6 @@
 package org.hibernate.search.test.batchindexing;
 
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
@@ -18,6 +17,10 @@
 
 public class SearchIndexerTest extends TestCase {
 	
+	/**
+	 * test that the Indexer is properly identifying the root entities
+	 * from the selection of classes to be indexed.
+	 */
 	public void testEntityHierarchy() {
 		FullTextSessionBuilder ftsb = new FullTextSessionBuilder()
 			.addAnnotatedClass( ModernBook.class )
@@ -56,7 +59,7 @@
 		}
 	}
 	
-	private class TestableSearchIndexerImpl extends IndexerImpl {
+	private static class TestableSearchIndexerImpl extends IndexerImpl {
 		
 		protected TestableSearchIndexerImpl(SearchFactoryImplementor searchFactory, Class<?>... types) {
 			super( searchFactory, null, types );
@@ -68,6 +71,10 @@
 		
 	}
 	
+	/**
+	 * Test to verify that the identifier loading works even when
+	 * the property is not called "id" 
+	 */
 	public void testIdentifierNaming() throws InterruptedException {
 		//disable automatic indexing, to test manual index creation.
 		FullTextSessionBuilder ftsb = new FullTextSessionBuilder()
@@ -95,7 +102,7 @@
 		{
 			FullTextSession fullTextSession = ftsb.openFullTextSession();
 			fullTextSession.createIndexer( Dvd.class )
-				.startAndWait( 10, TimeUnit.SECONDS );
+				.startAndWait();
 			fullTextSession.close();
 		}
 		{	
@@ -104,8 +111,9 @@
 		}
 	}
 	
-	public int countResults( Term termForQuery, FullTextSessionBuilder ftSessionBuilder, Class<?> type ) {
-		TermQuery fullTextQuery = new TermQuery( new Term( "title", "trek" ) );
+	//helper method
+	private int countResults( Term termForQuery, FullTextSessionBuilder ftSessionBuilder, Class<?> type ) {
+		TermQuery fullTextQuery = new TermQuery( termForQuery );
 		FullTextSession fullTextSession = ftSessionBuilder.openFullTextSession();
 		Transaction transaction = fullTextSession.beginTransaction();
 		FullTextQuery query = fullTextSession.createFullTextQuery( fullTextQuery, type );

Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/TitleAble.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/TitleAble.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/TitleAble.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,9 @@
+package org.hibernate.search.test.batchindexing;
+
+public interface TitleAble {
+
+	public String getTitle();
+
+	public void setTitle(String title);
+
+}


Property changes on: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/TitleAble.java
___________________________________________________________________
Name: svn:keywords
   + Id

Added: search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/SentenceInventor.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/SentenceInventor.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/SentenceInventor.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,154 @@
+package org.hibernate.search.test.util.textbuilder;
+
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Test utility meant to produce sentences of a randomly generated language,
+ * having some properties of natural languages.
+ * The goal is to produce sentences which look like a western text,
+ * but are not.
+ * All sentences from the same SentenceInventor will share
+ * a limited dictionary, making the frequencies suitable to test
+ * with Lucene.
+ * Sentences produced depend on the constructor arguments,
+ * making the output predictable for testing purposes.
+ * 
+ * @author Sanne Grinovero
+ */
+public class SentenceInventor {
+	
+	private final Random r;
+	private final WordDictionary dictionary;
+	//array contains repeated object for probability distribution (more chance for a ",")
+	private final char[] sentenceSeparators = new char[] { ',', ',', ',' , ';', ':', ':' };
+	
+	/**
+	 * @param randomSeed the seed to use for random generator
+	 * @param dictionarySize the number of terms to insert in the dictionary used to build sentences
+	 */
+	public SentenceInventor(long randomSeed, int dictionarySize) {
+		r = new Random( randomSeed );
+		dictionary = randomDictionary( dictionarySize );
+	}
+	
+	/**
+	 * @return a random uppercase character from the A-Z ASCII range
+	 */
+	public char randomCharacter() {
+		return (char) (r.nextInt( 26 ) + 65);
+	}
+	
+	/**
+	 * @param length the desired length
+	 * @return a randomly generated String
+	 */
+	public String randomString(int length) {
+		char[] chars = new char[length];
+		for ( int i=0; i<length; i++ ) {
+			chars[i] = randomCharacter();
+		}
+		return new String( chars );
+	}
+	
+	/**
+	 * Produces a randomly generated String, using
+	 * only western alphabet characters and drawing
+	 * the length from a normal distribution, as in natural languages.
+	 * @return the generated String
+	 */
+	public String randomString() {
+		double d = r.nextGaussian() * 6.3d;
+		int l = (int) d + 6;
+		if ( l > 0 )
+			return randomString( l );
+		else
+			return randomString();
+	}
+	
+	/**
+	 * Produces a random String, which might be lowercase,
+	 * completely uppercase, or uppercasing the first char
+	 * (randomly selected)
+	 * @return produced String
+	 */
+	public String randomTerm() {
+		int i = r.nextInt( 200 );
+		String term = randomString();
+		if ( i > 10 )
+			//completely lowercase in 189/200 cases
+			return term.toLowerCase();
+		else if ( i < 2 )
+			//completely uppercase in 2/200 cases
+			return term;
+		else
+			//first letter uppercase in 9/200 cases
+			return term.substring( 0, 1 ) + term.substring( 1 ).toLowerCase();
+	}
+	
+	private WordDictionary randomDictionary(int size) {
+		Set<String> tree = new TreeSet<String>();
+		while ( tree.size() != size ) {
+			tree.add( randomTerm() );
+		}
+		return new WordDictionary( tree );
+	}
+	
+	/**
+	 * Builds a sentence by concatenating terms from the generated dictionary with spaces
+	 * @return a sentence
+	 */
+	public String nextSentence() {
+		int sentenceLength = r.nextInt( 3 ) + r.nextInt( 10 ) + 1;
+		String[] sentence = new String[sentenceLength];
+		for ( int i=0; i<sentenceLength; i++ ) {
+			sentence[i] = dictionary.randomWord();
+		}
+		if ( sentenceLength == 1 ) {
+			return sentence[0];
+		}
+		else {
+			StringBuilder sb = new StringBuilder( sentence[0]);
+			for ( int i=1; i<sentenceLength; i++) {
+				sb.append( " " );
+				sb.append( sentence[i] );
+			}
+			return sb.toString();
+		}
+	}
+	
+	/**
+	 * Combines a random (gaussian) number of sentences into a period,
+	 * using some punctuation symbols,
+	 * capitalizing the first char and terminating with a dot and newline.
+	 * @return the generated period
+	 */
+	public String nextPeriod() {
+		int periodLengthSentences = r.nextInt( 7 ) - 2;
+		periodLengthSentences = ( periodLengthSentences < 1 ) ? 1 : periodLengthSentences;
+		String firstsentence = nextSentence();
+		StringBuilder sb = new StringBuilder()
+			.append( firstsentence.substring( 0,1 ).toUpperCase() )
+			.append( firstsentence.substring( 1 ) );
+		for ( int i=1; i<periodLengthSentences; i++ ) {
+			int separatorCharIndex = r.nextInt( sentenceSeparators.length );
+			sb
+				.append( sentenceSeparators[separatorCharIndex] )
+				.append( ' ' )
+				.append( nextSentence() );
+		}
+		sb.append( ".\n" );
+		return sb.toString();
+	}
+	
+	//run it to get an idea of what this class is going to produce
+	public static void main(String[] args) {
+		SentenceInventor wi = new SentenceInventor( 7L, 10000 );
+		for (int i=0; i<30; i++) {
+			System.out.print( wi.nextPeriod() );
+		}
+	}
+
+}
+


Property changes on: search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/SentenceInventor.java
___________________________________________________________________
Name: svn:keywords
   + Id

Added: search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/TextProductionTest.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/TextProductionTest.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/TextProductionTest.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,23 @@
+package org.hibernate.search.test.util.textbuilder;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests WordDictionary and SentenceInventor;
+ * these are test utilities, not part of the Search distribution.
+ * The test exists to detect changes in the text they produce, so
+ * that other tests can rely on working test utilities.
+ * 
+ * @see WordDictionary
+ * @see SentenceInventor
+ * 
+ * @author Sanne Grinovero
+ */
+public class TextProductionTest extends TestCase {
+	
+	public void testSomeWordsGetBuilt() {
+		SentenceInventor wi = new SentenceInventor( 7L, 10000 );
+		assertEquals( "Qoswo, orrmi ag ybwp bbtb kw qgtqaon lyhk nbv: qrqm flyui hyshm jmpqyb qmolml fjxw gnumocv Twwg.\n", wi.nextPeriod() );
+	}
+
+}


Property changes on: search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/TextProductionTest.java
___________________________________________________________________
Name: svn:keywords
   + Id

Added: search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/WordDictionary.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/WordDictionary.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/WordDictionary.java	2009-06-29 22:25:43 UTC (rev 16967)
@@ -0,0 +1,50 @@
+package org.hibernate.search.test.util.textbuilder;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Test utility meant to build a consistent dictionary of words.
+ * This is not just a random generator: as in natural
+ * languages, shorter terms have a higher frequency in a text corpus,
+ * and the dictionary size is limited.
+ * 
+ * @author Sanne Grinovero
+ */
+public class WordDictionary {
+	
+	private final String[] positionalWords;
+	private final int maxSize;
+	private final double gaussFactor;
+	
+	private static final Random r = new Random( 12L );
+	
+	public WordDictionary(Set<String> words) {
+		this.positionalWords = words.toArray( new String[0] );
+		//sort by String length. Languages use shorter terms more often.
+		Arrays.sort( positionalWords, new StringLengthComparator() );
+		maxSize = positionalWords.length;
+		gaussFactor = ( (double) maxSize + 1 ) / 4d;
+	}
+	
+	private static class StringLengthComparator implements Comparator<String> {
+
+		public int compare(String o1, String o2) {
+			return o1.length()-o2.length();
+		}
+		
+	}
+	
+	public String randomWord() {
+		int position = Math.abs((int) ( r.nextGaussian() * gaussFactor ) );
+		if ( position < maxSize ) {
+			return positionalWords[position];
+		}
+		else {
+			return randomWord();
+		}
+	}
+
+}


Property changes on: search/trunk/src/test/java/org/hibernate/search/test/util/textbuilder/WordDictionary.java
___________________________________________________________________
Name: svn:keywords
   + Id



