[hibernate-commits] Hibernate SVN: r16580 - in search/trunk/src: main/java/org/hibernate/search/batchindexing and 6 other directories.

hibernate-commits@lists.jboss.org
Sun May 17 21:59:11 EDT 2009


Author: sannegrinovero
Date: 2009-05-17 21:59:10 -0400 (Sun, 17 May 2009)
New Revision: 16580

Added:
   search/trunk/src/main/java/org/hibernate/search/Indexer.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/
   search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexerProgressMonitor.java
   search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java
   search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java
   search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/ModernBook.java
   search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java
Modified:
   search/trunk/src/main/java/org/hibernate/search/FullTextSession.java
   search/trunk/src/main/java/org/hibernate/search/engine/DocumentBuilderIndexedEntity.java
   search/trunk/src/main/java/org/hibernate/search/impl/FullTextSessionImpl.java
   search/trunk/src/main/java/org/hibernate/search/jpa/FullTextEntityManager.java
   search/trunk/src/main/java/org/hibernate/search/jpa/impl/FullTextEntityManagerImpl.java
Log:
HSEARCH-218: load entities in parallel sessions (multithreaded) to improve index rebuilding performance. Not useful until integrated with a new ad-hoc backend.
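
For reference, a minimal sketch of how the new API is meant to be used (assuming an open Session, the Book entity from the new tests, and omitting imports and InterruptedException handling):

	FullTextSession fullTextSession = Search.getFullTextSession( session );
	fullTextSession.createIndexer( Book.class )
		.objectLoadingThreads( 4 )
		.objectLoadingBatchSize( 25 )
		.cacheMode( CacheMode.IGNORE )
		.optimizeAtEnd( true )
		.startAndWait( 30, TimeUnit.MINUTES );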

Modified: search/trunk/src/main/java/org/hibernate/search/FullTextSession.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/FullTextSession.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/FullTextSession.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -67,4 +67,14 @@
 	 * Flush all index changes forcing Hibernate Search to apply all changes to the index not waiting for the batch limit.
 	 */
 	public void flushToIndexes();
+	
+	/**
+	 * Creates an Indexer to rebuild the indexes of some
+	 * or all indexed entity types.
+	 * Instances cannot be reused.
+	 * @param types optionally restrict the operation to selected types
+	 * @return a new Indexer covering the given types
+	 */
+	public Indexer createIndexer(Class<?>... types);
+	
 }

Added: search/trunk/src/main/java/org/hibernate/search/Indexer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/Indexer.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/Indexer.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,88 @@
+package org.hibernate.search;
+
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.CacheMode;
+
+public interface Indexer {
+	
+	/**
+	 * Sets the number of threads to be used to load
+	 * the root entities.
+	 * @param numberOfThreads the number of threads to use
+	 * @return <tt>this</tt> for method chaining
+	 */
+	Indexer objectLoadingThreads(int numberOfThreads);
+	
+	/**
+	 * Sets the batch size used to load the root entities.
+	 * @param batchSize the number of entities to load per batch
+	 * @return <tt>this</tt> for method chaining
+	 */
+	Indexer objectLoadingBatchSize(int batchSize);
+	
+	/**
+	 * Sets the number of threads used to load the lazy collections
+	 * related to the indexed entities.
+	 * @param numberOfThreads the number of threads to use
+	 * @return <tt>this</tt> for method chaining
+	 */
+	Indexer documentBuilderThreads(int numberOfThreads);
+	
+	//not supported yet
+	//Indexer indexWriterThreads(int numberOfThreads);
+	
+	/**
+	 * Sets the cache interaction mode for the data loading tasks.
+	 * Defaults to <tt>CacheMode.IGNORE</tt>.
+	 * @param cacheMode the cache interaction mode to use
+	 * @return <tt>this</tt> for method chaining
+	 */
+	Indexer cacheMode(CacheMode cacheMode);
+	
+	/**
+	 * Whether index optimization should be run at the end
+	 * of the indexing process.
+	 * Defaults to <tt>true</tt>.
+	 * @param optimize <tt>true</tt> to enable optimization at the end
+	 * @return <tt>this</tt> for method chaining
+	 */
+	Indexer optimizeAtEnd(boolean optimize);
+	
+	/**
+	 * Whether index optimization should be run before indexing starts,
+	 * right after the purgeAll. Has no effect if <tt>purgeAll</tt> is set to false.
+	 * Defaults to <tt>true</tt>.
+	 * @param optimize <tt>true</tt> to enable optimization after purge
+	 * @return <tt>this</tt> for method chaining
+	 */
+	Indexer optimizeAfterPurge(boolean optimize);
+	
+	/**
+	 * Whether all entities should be removed from the index before
+	 * indexing starts, using purgeAll. Set it to false only if you know
+	 * the index contains no entities; otherwise search results may be duplicated.
+	 * Defaults to <tt>true</tt>.
+	 * @param purgeAll <tt>true</tt> to purge the index before indexing
+	 * @return <tt>this</tt> for method chaining
+	 */
+	Indexer purgeAllAtStart(boolean purgeAll);
+	
+	/**
+	 * Starts the indexing process in the background (asynchronously).
+	 * Can be called only once.
+	 */
+	void start();
+	
+	/**
+	 * Starts the indexing process and blocks until it's finished.
+	 * Can be called only once.
+	 * @param timeout the maximum time to wait
+	 * @param unit the time unit of the <tt>timeout</tt> argument.
+	 * @return <tt>true</tt> if the process finished and <tt>false</tt>
+	 * if the waiting time elapsed before the process was finished.
+	 * @throws InterruptedException if the current thread is interrupted
+	 * while waiting.
+	 */
+	boolean startAndWait( long timeout, TimeUnit unit ) throws InterruptedException;
+
+}

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,190 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.hibernate.CacheMode;
+import org.hibernate.SessionFactory;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.OptimizeLuceneWork;
+import org.hibernate.search.backend.PurgeAllLuceneWork;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This runnable will prepare a pipeline for batch indexing
+ * of entities, managing the lifecycle of several ThreadPools.
+ * 
+ * @author Sanne Grinovero
+ */
+public class BatchIndexingWorkspace implements Runnable {
+	
+	private static final Logger log = LoggerFactory.make();
+	
+	private final SearchFactoryImplementor searchFactory;
+	private final SessionFactory sessionFactory;
+	
+	//following order shows the 4 stages of an entity flowing to the index:
+	private final ThreadPoolExecutor 		execIdentifiersLoader;
+	private final ProducerConsumerQueue 	fromIdentifierListToEntities;
+	private final ThreadPoolExecutor 		execFirstLoader;
+	private final ProducerConsumerQueue 	fromEntityToAddwork;
+	private final ThreadPoolExecutor		execDocBuilding;
+	private final ProducerConsumerQueue		fromAddworkToIndex;
+	private final ThreadPoolExecutor 		execWriteIndex;
+	
+	private final int objectLoadingThreadNum;
+	private final int luceneworkerBuildingThreadNum;
+	private final int indexWritingThreadNum;
+	private final Class<?> indexedType;
+	
+	// status control
+	private final AtomicBoolean started = new AtomicBoolean( false );
+	private final CountDownLatch endWritersSignal; //released when we stop adding Documents to Index 
+	private final CountDownLatch endAllSignal; //released when we release all locks and IndexWriter
+	
+	// progress monitor
+	private final IndexerProgressMonitor monitor;
+
+	// loading options
+	private final CacheMode cacheMode;
+	private final int objectLoadingBatchSize;
+
+	private final boolean purgeAtStart;
+	private final boolean optimizeAfterPurge;
+	private final boolean optimizeAtEnd;
+
+	public BatchIndexingWorkspace(SearchFactoryImplementor searchFactoryImplementor, SessionFactory sessionFactory,
+			Class<?> entityType,
+			int objectLoadingThreads, int collectionLoadingThreads, int writerThreads,
+			CacheMode cacheMode, int objectLoadingBatchSize,
+			boolean optimizeAtEnd, boolean purgeAtStart, boolean optimizeAfterPurge, CountDownLatch endAllSignal,
+			IndexerProgressMonitor monitor) {
+		
+		this.indexedType = entityType;
+		this.searchFactory = searchFactoryImplementor;
+		this.sessionFactory = sessionFactory;
+		
+		//thread pool sizing:
+		this.objectLoadingThreadNum = objectLoadingThreads;
+		this.luceneworkerBuildingThreadNum = collectionLoadingThreads;//collections are loaded as needed by building the document
+//		this.indexWritingThreadNum = writerThreads; //FIXME enable this line and remove the next line after solving HSEARCH-367
+		this.indexWritingThreadNum = 1;
+		
+		//loading options:
+		this.cacheMode = cacheMode;
+		this.objectLoadingBatchSize = objectLoadingBatchSize;
+		
+		//executors: (quite expensive constructor)
+		//execIdentifiersLoader has size 1 and is not configurable: ensures the list is consistent as produced by one transaction
+		this.execIdentifiersLoader = Executors.newFixedThreadPool( 1, "identifierloader" );
+		this.execFirstLoader = Executors.newFixedThreadPool( objectLoadingThreadNum, "entityloader" );
+		this.execDocBuilding = Executors.newFixedThreadPool( luceneworkerBuildingThreadNum, "collectionsloader" );
+		this.execWriteIndex = Executors.newFixedThreadPool( indexWritingThreadNum, "analyzers" );
+		
+		//pipelining queues:
+		this.fromIdentifierListToEntities = new ProducerConsumerQueue( 1 );
+		this.fromEntityToAddwork = new ProducerConsumerQueue( objectLoadingThreadNum );
+		this.fromAddworkToIndex = new ProducerConsumerQueue( luceneworkerBuildingThreadNum );
+		
+		//end signal shared with other instances:
+		this.endAllSignal = endAllSignal;
+		this.endWritersSignal = new CountDownLatch( indexWritingThreadNum );
+		
+		//behaviour options:
+		this.optimizeAtEnd = optimizeAtEnd;
+		this.optimizeAfterPurge = optimizeAfterPurge;
+		this.purgeAtStart = purgeAtStart;
+		
+		this.monitor = monitor;
+	}
+
+	public void run() {
+		if ( ! started.compareAndSet( false, true ) )
+			throw new IllegalStateException( "BatchIndexingWorkspace can be started only once." );
+		boolean interrupted = false;
+		try {
+			//TODO switch to batch mode in backend
+			if ( purgeAtStart ) {
+				purgeAll();
+				if ( optimizeAfterPurge ) {
+					optimize();
+				}
+			}
+			
+			BackendQueueProcessorFactory backendQueueProcessorFactory = searchFactory.getBackendQueueProcessorFactory();
+			//first start the consumers, then the producers (reverse order):
+			for ( int i=0; i < indexWritingThreadNum; i++ ) {
+				//from LuceneWork to IndexWriters:
+				execWriteIndex.execute( new IndexWritingJob(
+						fromAddworkToIndex, endWritersSignal, monitor, backendQueueProcessorFactory ) );
+			}
+			for ( int i=0; i < luceneworkerBuildingThreadNum; i++ ) {
+				//from entity to LuceneWork:
+				execDocBuilding.execute( new EntityConsumerLuceneworkProducer(
+						fromEntityToAddwork, fromAddworkToIndex, monitor,
+						sessionFactory, searchFactory, cacheMode) );
+			}
+			for ( int i=0; i < objectLoadingThreadNum; i++ ) {
+				//from primary key to loaded entity:
+				execFirstLoader.execute( new IdentifierConsumerEntityProducer(
+						fromIdentifierListToEntities, fromEntityToAddwork, monitor,
+						sessionFactory, cacheMode, indexedType) );
+			}
+			
+			execIdentifiersLoader.execute( new IdentifierProducer(fromIdentifierListToEntities, sessionFactory, objectLoadingBatchSize, indexedType, monitor) );
+			//shutdown all executors:
+			execIdentifiersLoader.shutdown();
+			execFirstLoader.shutdown();
+			execDocBuilding.shutdown();
+			execWriteIndex.shutdown();
+			log.debug( "helper executors are shutting down" );
+			try {
+				endWritersSignal.await(); //wait until all index writing is done.
+			} catch (InterruptedException e) {
+				interrupted = true;
+				throw new SearchException( "Interrupted on batch Indexing; index will be left in unknown state!", e);
+			}
+			log.debug( "index writing finished" );
+			if ( optimizeAtEnd ) {
+				log.debug( "starting optimization" );
+				optimize();
+			}
+		}
+		finally {
+			endAllSignal.countDown();
+			if ( interrupted ) {
+				//restore interruption signal:
+				Thread.currentThread().interrupt();
+			}
+		}
+	}
+
+	private void optimize() {
+		Set<Class<?>> targetedClasses = searchFactory.getIndexedTypesPolymorphic( new Class[] {indexedType} );
+		List<LuceneWork> queue = new ArrayList<LuceneWork>( targetedClasses.size() );
+		for ( Class<?> clazz : targetedClasses ) {
+			queue.add( new OptimizeLuceneWork( clazz ) );
+		}
+		//TODO use the batch backend
+		searchFactory.getBackendQueueProcessorFactory().getProcessor( queue ).run();
+	}
+
+	private void purgeAll() {
+		Set<Class<?>> targetedClasses = searchFactory.getIndexedTypesPolymorphic( new Class[] {indexedType} );
+		List<LuceneWork> queue = new ArrayList<LuceneWork>( targetedClasses.size() );
+		for ( Class<?> clazz : targetedClasses ) {
+			queue.add( new PurgeAllLuceneWork( clazz ) );
+		}
+		//TODO use the batch backend
+		searchFactory.getBackendQueueProcessorFactory().getProcessor( queue ).run();
+	}
+
+}
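
In short, the constructor above wires a four-stage pipeline, with three queues connecting the stages (field names as above, thread counts in parentheses):

	execIdentifiersLoader (1)                       --> fromIdentifierListToEntities
	execFirstLoader (objectLoadingThreadNum)        --> fromEntityToAddwork
	execDocBuilding (luceneworkerBuildingThreadNum) --> fromAddworkToIndex
	execWriteIndex (1 until HSEARCH-367 is solved)  --> backend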

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,111 @@
+package org.hibernate.search.batchindexing;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.hibernate.CacheMode;
+import org.hibernate.FlushMode;
+import org.hibernate.Hibernate;
+import org.hibernate.LockMode;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.Transaction;
+import org.hibernate.search.backend.AddLuceneWork;
+import org.hibernate.search.bridge.TwoWayFieldBridge;
+import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Component of batch-indexing pipeline, using chained producer-consumers.
+ * This Runnable consumes entities taken one by one from the input queue
+ * and produces an AddLuceneWork for each entity into the output queue.
+ * 
+ * @author Sanne Grinovero
+ */
+public class EntityConsumerLuceneworkProducer implements Runnable {
+	
+	private static final Logger log = LoggerFactory.make();
+	
+	private final ProducerConsumerQueue source;
+	private final ProducerConsumerQueue destination;
+	private final SessionFactory sessionFactory;
+	private final Map<Class<?>, DocumentBuilderIndexedEntity<?>> documentBuilders;
+	private final IndexerProgressMonitor monitor;
+	
+	private static final int CLEAR_PERIOD = 50;
+	private final CacheMode cacheMode;
+	
+	public EntityConsumerLuceneworkProducer(
+			ProducerConsumerQueue entitySource,
+			ProducerConsumerQueue fromAddworkToIndex,
+			IndexerProgressMonitor monitor,
+			SessionFactory sessionFactory,
+			SearchFactoryImplementor searchFactory, CacheMode cacheMode) {
+		this.source = entitySource;
+		this.destination = fromAddworkToIndex;
+		this.monitor = monitor;
+		this.sessionFactory = sessionFactory;
+		this.cacheMode = cacheMode;
+		this.documentBuilders = searchFactory.getDocumentBuildersIndexedEntities();
+	}
+
+	public void run() {
+		Session session = sessionFactory.openSession();
+		session.setFlushMode( FlushMode.MANUAL );
+		session.setCacheMode( cacheMode );
+		try {
+			Transaction transaction = session.beginTransaction();
+			indexAllQueue( session );
+			transaction.commit();
+		}
+		finally {
+			session.close();
+		}
+		log.debug( "finished" );
+	}
+
+	private void indexAllQueue(Session session) {
+		try {
+			for ( int cycle=0; true; cycle++ ) {
+				Object take = source.take();
+				if ( take == null ) {
+					break;
+				}
+				else {
+					log.trace( "received an object {}", take );
+					//trick to attach the objects to session:
+					session.lock( take, LockMode.NONE );
+					index( take, session );
+					monitor.documentsBuilt( 1 );
+					session.evict( take );
+					if ( cycle == CLEAR_PERIOD ) {
+						cycle = 0;
+						session.clear();
+					}
+				}
+			}
+		}
+		catch (InterruptedException e) {
+			// just quit
+		}
+		finally {
+			destination.producerStopping();
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private void index( Object entity, Session session ) throws InterruptedException {
+		Serializable id = session.getIdentifier( entity );
+		Class clazz = Hibernate.getClass( entity );
+		DocumentBuilderIndexedEntity docBuilder = documentBuilders.get( clazz );
+		TwoWayFieldBridge idBridge = docBuilder.getIdBridge();
+		String idInString = idBridge.objectToString( id );
+		//depending on the complexity of the object graph going to be indexed it's possible
+		//that we hit the database several times during work construction.
+		AddLuceneWork addWork = docBuilder.createAddWork( clazz, entity, id, idInString, true );
+		destination.put( addWork );
+	}
+	
+}

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,59 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Helper class to create threads;
+ * these threads are grouped and named to be readily identified in a profiler.
+ * 
+ * @author Sanne Grinovero
+ */
+public class Executors {
+	
+	private static final String THREAD_GROUP_PREFIX = "Hibernate Search indexer: ";
+	
+	/**
+	 * Creates a new fixed size ThreadPoolExecutor.
+	 * @param threads the number of threads
+	 * @param groupname a label to identify the threads of this pool
+	 * @return the new ThreadPoolExecutor
+	 */
+	public static ThreadPoolExecutor newFixedThreadPool(int threads, String groupname) {
+		return new ThreadPoolExecutor(
+				threads,
+				threads,
+	            0L, TimeUnit.MILLISECONDS,
+	            new LinkedBlockingQueue<Runnable>(),
+	            new SearchThreadFactory( groupname ) );
+	}
+	
+	/**
+     * The thread factory, used to customize thread names
+     */
+    private static class SearchThreadFactory implements ThreadFactory {
+    	
+        final ThreadGroup group;
+        final AtomicInteger threadNumber = new AtomicInteger( 1 );
+        final String namePrefix;
+
+        SearchThreadFactory(String groupname) {
+            SecurityManager s = System.getSecurityManager();
+            group = (s != null)? s.getThreadGroup() :
+                                 Thread.currentThread().getThreadGroup();
+            namePrefix = THREAD_GROUP_PREFIX + groupname + "-";
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(group, r, 
+                                  namePrefix + threadNumber.getAndIncrement(),
+                                  0);
+            return t;
+        }
+        
+    }
+	
+}
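
As a quick sketch of this helper in use (note it shadows java.util.concurrent.Executors; the Runnable is a placeholder), threads created through it carry names that make each pool easy to spot in a profiler:

	ThreadPoolExecutor pool = Executors.newFixedThreadPool( 2, "entityloader" );
	pool.execute( new Runnable() {
		public void run() {
			//runs on a thread named "Hibernate Search indexer: entityloader-1"
		}
	} );
	pool.shutdown();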

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,112 @@
+package org.hibernate.search.batchindexing;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.hibernate.CacheMode;
+import org.hibernate.Criteria;
+import org.hibernate.FlushMode;
+import org.hibernate.LockMode;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.Transaction;
+import org.hibernate.criterion.Restrictions;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This Runnable consumes entity identifiers and
+ * produces loaded, detached entities for the next queue.
+ * It finishes when the queue it consumes from
+ * signals there are no more identifiers.
+ * 
+ * @author Sanne Grinovero
+ */
+public class IdentifierConsumerEntityProducer implements Runnable {
+	
+	private static final Logger log = LoggerFactory.make();
+
+	private final ProducerConsumerQueue source;
+	private final ProducerConsumerQueue destination;
+	private final SessionFactory sessionFactory;
+	private final CacheMode cacheMode;
+	private final Class<?> type;
+	private final IndexerProgressMonitor monitor;
+
+	public IdentifierConsumerEntityProducer(
+			ProducerConsumerQueue fromIdentifierListToEntities,
+			ProducerConsumerQueue fromEntityToAddwork,
+			IndexerProgressMonitor monitor,
+			SessionFactory sessionFactory,
+			CacheMode cacheMode, Class<?> type) {
+				this.source = fromIdentifierListToEntities;
+				this.destination = fromEntityToAddwork;
+				this.monitor = monitor;
+				this.sessionFactory = sessionFactory;
+				this.cacheMode = cacheMode;
+				this.type = type;
+				log.trace( "created" );
+	}
+
+	public void run() {
+		log.trace( "started" );
+		Session session = sessionFactory.openSession();
+		session.setFlushMode( FlushMode.MANUAL );
+		session.setCacheMode( cacheMode );
+		try {
+			Transaction transaction = session.beginTransaction();
+			loadAllFromQueue( session );
+			transaction.commit();
+		}
+		finally {
+			session.close();
+		}
+		log.trace( "finished" );
+	}
+	
+	private void loadAllFromQueue(Session session) {
+		try {
+			Object take;
+			do {
+				take = source.take();
+				if ( take != null ) {
+					List<Serializable> listIds = (List<Serializable>) take;
+					log.trace( "received list of ids {}", listIds );
+					loadList( listIds, session );
+				}
+			}
+			while ( take != null );
+		}
+		catch (InterruptedException e) {
+			// just quit
+		}
+		finally {
+			destination.producerStopping();
+		}
+	}
+
+	/**
+	 * Loads a list of entities of defined type using their identifiers.
+	 * The loaded objects are then pushed to the next queue one by one.
+	 * @param listIds the list of entity identifiers (of type Serializable)
+	 * @param session the session to be used
+	 * @throws InterruptedException if interrupted while waiting for space in the destination queue
+	 */
+	private void loadList(List<Serializable> listIds, Session session) throws InterruptedException {
+		//TODO investigate if I should use ObjectLoaderHelper.initializeObjects instead
+		Criteria criteria = session
+			.createCriteria( type )
+			.setCacheMode( cacheMode )
+			.setLockMode( LockMode.NONE )
+			.setCacheable( false )
+			.setFlushMode( FlushMode.MANUAL )
+			.add( Restrictions.in( "id", listIds ) );
+		List<?> list = criteria.list();
+		monitor.entitiesLoaded( list.size() );
+		session.clear();
+		for ( Object obj : list ) {
+			destination.put( obj );
+		}
+	}
+
+}

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,125 @@
+package org.hibernate.search.batchindexing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hibernate.Criteria;
+import org.hibernate.ScrollMode;
+import org.hibernate.ScrollableResults;
+import org.hibernate.SessionFactory;
+import org.hibernate.StatelessSession;
+import org.hibernate.Transaction;
+import org.hibernate.criterion.Projections;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This Runnable feeds the indexing queue
+ * with the identifiers of all the entities going to be indexed.
+ * This step of the indexing process is not parallel (it should be
+ * done by one thread per type) so that a single transaction is used
+ * to define the group of entities to be indexed.
+ * Produced identifiers are put in the destination queue grouped in List
+ * instances: this lets the next step load them in batches
+ * and reduces contention on the queue.
+ * 
+ * @author Sanne Grinovero
+ */
+public class IdentifierProducer implements Runnable {
+	
+	private static final Logger log = LoggerFactory.make();
+
+	private final ProducerConsumerQueue destination;
+	private final SessionFactory sessionFactory;
+	private final int batchSize;
+	private final Class<?> indexedType;
+	private final IndexerProgressMonitor monitor;
+
+	/**
+	 * @param fromIdentifierListToEntities the target queue where the produced identifiers are sent to
+	 * @param sessionFactory the Hibernate SessionFactory to use to load entities
+	 * @param objectLoadingBatchSize affects mostly the next consumer: IdentifierConsumerEntityProducer
+	 * @param indexedType the entity type to be loaded
+	 * @param monitor to monitor indexing progress
+	 */
+	public IdentifierProducer(
+			ProducerConsumerQueue fromIdentifierListToEntities,
+			SessionFactory sessionFactory,
+			int objectLoadingBatchSize,
+			Class<?> indexedType, IndexerProgressMonitor monitor) {
+				this.destination = fromIdentifierListToEntities;
+				this.sessionFactory = sessionFactory;
+				this.batchSize = objectLoadingBatchSize;
+				this.indexedType = indexedType;
+				this.monitor = monitor;
+				log.trace( "created" );
+	}
+	
+	public void run() {
+		log.trace( "started" );
+		try {
+			inTransactionWrapper();
+		}
+		finally{
+			destination.producerStopping();
+		}
+		log.trace( "finished" );
+	}
+
+	private void inTransactionWrapper() {
+		StatelessSession session = sessionFactory.openStatelessSession();
+		try {
+			Transaction transaction = session.beginTransaction();
+			loadAllIdentifiers( session );
+			transaction.commit();
+		} catch (InterruptedException e) {
+			// just quit
+		}
+		finally {
+			session.close();
+		}
+	}
+
+	private void loadAllIdentifiers(final StatelessSession session) throws InterruptedException {
+		Integer count = (Integer) session
+			.createCriteria( indexedType )
+			.setProjection( Projections.count( "id" ) )
+			.setCacheable( false )
+			.uniqueResult();
+
+		log.debug( "going to fetch {} primary keys", count);
+		monitor.addToTotalCount( count );
+		
+		Criteria criteria = session
+			.createCriteria( indexedType )
+			.setProjection( Projections.id() )
+			.setCacheable( false )
+			.setFetchSize( 100 );
+		
+		ScrollableResults results = criteria.scroll( ScrollMode.FORWARD_ONLY );
+		ArrayList<Serializable> destinationList = new ArrayList<Serializable>( batchSize );
+		try {
+			while ( results.next() ) {
+				Serializable id = (Serializable) results.get( 0 );
+				destinationList.add( id );
+				if ( destinationList.size() == batchSize ) {
+					enqueueList( destinationList );
+					destinationList = new ArrayList<Serializable>( batchSize ); 
+				}
+			}
+		}
+		finally {
+			results.close();
+		}
+		enqueueList( destinationList );
+	}
+	
+	private void enqueueList(final List<Serializable> idsList) throws InterruptedException {
+		if ( ! idsList.isEmpty() ) {
+			destination.put( idsList );
+			log.trace( "produced a list of ids {}", idsList );
+		}
+	}
+
+}

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,76 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This Runnable is meant to push to the backend
+ * all LuceneWork it can take from the ProducerConsumerQueue.
+ * 
+ * FIXME:
+ * It is currently more of an adaptor bridging the queue approach
+ * to the current backend implementations. This is by no means
+ * the most efficient way to index documents; it has been built only
+ * to test the approach.
+ * 
+ */
+class IndexWritingJob implements Runnable {
+	
+	private static final Logger log = LoggerFactory.make();
+	
+	private final ProducerConsumerQueue pleq;
+	private final BackendQueueProcessorFactory backend;
+	private final CountDownLatch endSignal;
+	private final IndexerProgressMonitor monitor;
+	
+	/**
+	 * @param pleq the queue containing the LuceneWork to be sent to the backend
+	 * @param endSignal counted down when this job terminates
+	 * @param monitor the progress monitor to update
+	 * @param backendQueueProcessorFactory the backend the work is pushed to
+	 */
+	IndexWritingJob(ProducerConsumerQueue pleq, CountDownLatch endSignal, IndexerProgressMonitor monitor, BackendQueueProcessorFactory backendQueueProcessorFactory) {
+		this.pleq = pleq;
+		this.monitor = monitor;
+		this.backend = backendQueueProcessorFactory;
+		this.endSignal = endSignal;
+		log.trace( "created" );
+	}
+
+	public void run() {
+		log.debug( "Start" );
+		try {
+			while ( true ) {
+				Object take = pleq.take();
+				if ( take == null ) {
+					break;
+				}
+				else {
+					LuceneWork work = (LuceneWork) take;
+					log.trace( "received lucenework {}", work );
+					//TODO group work in bigger lists of size #batch and introduce the CommitLuceneWork to avoid waiting for more work
+					ArrayList<LuceneWork> list = new ArrayList<LuceneWork>( 1 );
+					list.add( work );
+					Runnable processor = backend.getProcessor( list );
+					processor.run();
+					monitor.documentsAdded( 1L );
+				}
+			}
+			log.debug( "Finished" );
+		}
+		catch (InterruptedException e) {
+			// normal quit: no need to propagate interruption.
+			log.debug( "Interrupted" );
+		}
+		finally {
+			//notify everybody we have finished.
+			endSignal.countDown();
+		}
+	}
+
+}
\ No newline at end of file

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexerProgressMonitor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexerProgressMonitor.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexerProgressMonitor.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,51 @@
+package org.hibernate.search.batchindexing;
+
+/**
+ * As an indexer can take some time to finish,
+ * an implementation of this interface can be set through
+ * the configuration property
+ * hibernate.search.worker.indexing.monitor
+ * to track indexing performance.
+ * Implementations must be threadsafe and have a
+ * no-arg constructor.
+ * 
+ * @author Sanne Grinovero
+ */
+public interface IndexerProgressMonitor {
+
+	/**
+	 * The number of documents sent to the backend.
+	 * This is called several times during
+	 * the indexing process.
+	 * @param increment the number of documents added since the previous call
+	 */
+	void documentsAdded(long increment);
+
+	/**
+	 * The number of Documents built.
+	 * This is called several times and concurrently during
+	 * the indexing process.
+	 * @param number the number of documents built
+	 */
+	void documentsBuilt(int number);
+
+	/**
+	 * The number of entities loaded from the database.
+	 * This is called several times and concurrently during
+	 * the indexing process.
+	 * @param size the number of entities loaded
+	 */
+	void entitiesLoaded(int size);
+
+	/**
+	 * The total count of entities to be indexed is
+	 * added here; it could be called more than once,
+	 * and the implementation should add the values up.
+	 * This is called several times and concurrently during
+	 * the indexing process.
+	 * @param count the number of entities to add to the expected total
+	 */
+	void addToTotalCount(long count);
+
+}
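
A minimal sketch of a custom implementation (hypothetical class name; it only tracks the documents sent to the backend):

	import java.util.concurrent.atomic.AtomicLong;

	public class VerboseIndexerProgressMonitor implements IndexerProgressMonitor {

		private final AtomicLong documentsDone = new AtomicLong();
		private final AtomicLong totalCount = new AtomicLong();

		public void documentsAdded(long increment) {
			long done = documentsDone.addAndGet( increment );
			System.out.println( done + " of " + totalCount.get() + " documents sent to the backend" );
		}

		public void documentsBuilt(int number) {
			//not tracked
		}

		public void entitiesLoaded(int size) {
			//not tracked
		}

		public void addToTotalCount(long count) {
			totalCount.addAndGet( count );
		}

	}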

Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,82 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements a blocking queue capable of storing
+ * a "poison" token to signal consumer threads
+ * that the task is finished.
+ * 
+ * @author Sanne Grinovero
+ */
+public class ProducerConsumerQueue {
+	
+	private static final int DEFAULT_BUFF_LENGTH = 1000;
+	private static final Object exitToken = new Object();
+	
+	private final BlockingQueue<Object> queue;
+	private final AtomicInteger producersToWaitFor;
+	
+	/**
+	 * @param producersToWaitFor The number of producer threads.
+	 */
+	public ProducerConsumerQueue( int producersToWaitFor ) {
+		this( DEFAULT_BUFF_LENGTH, producersToWaitFor );
+	}
+	
+	public ProducerConsumerQueue( int queueLength, int producersToWaitFor ) {
+		queue = new ArrayBlockingQueue<Object>( queueLength );
+		this.producersToWaitFor = new AtomicInteger( producersToWaitFor );
+	}
+	
+	/**
+	 * Blocks until an object is available; when null
+	 * is returned the client thread should quit.
+	 * @return the next object in the queue, or null to exit
+	 * @throws InterruptedException if interrupted while waiting for the next element
+	 */
+	public Object take() throws InterruptedException {
+		Object obj = queue.take();
+		if ( obj == exitToken ) {
+			//restore exit signal for other threads
+			queue.put( exitToken );
+			return null;
+		}
+		else {
+			return obj;
+		}
+	}
+	
+	/**
+	 * Adds a new object to the queue, blocking if no space is
+	 * available.
+	 * @param obj the object to enqueue
+	 * @throws InterruptedException if interrupted while waiting for free space
+	 */
+	public void put(Object obj) throws InterruptedException {
+		queue.put( obj );
+	}
+
+	/**
+	 * Each producer thread should call producerStopping() when it has
+	 * finished; after doing so it can safely terminate.
+	 * After all producer threads have called producerStopping(),
+	 * a token is inserted in the blocking queue to eventually
+	 * wake sleeping consumers and have them quit, once the
+	 * queue has been processed.
+	 */
+	public void producerStopping() {
+		int activeProducers = producersToWaitFor.decrementAndGet();
+		//last producer must close consumers
+		if ( activeProducers == 0 ) {
+			try {
+				queue.put( exitToken );//wake all waiting threads to let them quit.
+			} catch (InterruptedException e) {
+				//just quit, consumers will be interrupted anyway if it's a shutdown.
+			}
+		}
+	}
+	
+}
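
A minimal usage sketch (one producer and any number of consumers, each on its own thread):

	ProducerConsumerQueue queue = new ProducerConsumerQueue( 1 );

	//in the producer thread:
	try {
		queue.put( "some work" );
	}
	catch (InterruptedException e) {
		Thread.currentThread().interrupt();
	}
	finally {
		queue.producerStopping(); //the last producer triggers the exit token
	}

	//in each consumer thread:
	try {
		Object taken;
		while ( ( taken = queue.take() ) != null ) {
			//process taken; null means all producers have finished
		}
	}
	catch (InterruptedException e) {
		Thread.currentThread().interrupt();
	}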

Modified: search/trunk/src/main/java/org/hibernate/search/engine/DocumentBuilderIndexedEntity.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/engine/DocumentBuilderIndexedEntity.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/engine/DocumentBuilderIndexedEntity.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -105,7 +105,7 @@
 
 	/**
 	 * Flag indicating whether there is an explicit id (@DocumentId or @Id) or not. When Search is used to make,
-	 * for example JBoss Cache searchable the <code>idKeywordName</code> wil be provided.
+	 * for example, JBoss Cache searchable, the <code>idKeywordName</code> will be provided.
 	 */
 	private boolean idProvided = false;
 
@@ -337,7 +337,7 @@
 		super.addWorkToQueue( entityClass, entity, id, workType, queue, searchFactoryImplementor );
 	}
 
-	private AddLuceneWork createAddWork(Class<T> entityClass, T entity, Serializable id, String idInString, boolean isBatch) {
+	public AddLuceneWork createAddWork(Class<T> entityClass, T entity, Serializable id, String idInString, boolean isBatch) {
 		Map<String, String> fieldToAnalyzerMap = new HashMap<String, String>();
 		Document doc = getDocument( entity, id, fieldToAnalyzerMap );
 		AddLuceneWork addWork;

Modified: search/trunk/src/main/java/org/hibernate/search/impl/FullTextSessionImpl.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/FullTextSessionImpl.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/impl/FullTextSessionImpl.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -46,6 +46,7 @@
 import org.hibernate.search.FullTextQuery;
 import org.hibernate.search.FullTextSession;
 import org.hibernate.search.SearchFactory;
+import org.hibernate.search.Indexer;
 import org.hibernate.search.backend.TransactionContext;
 import org.hibernate.search.backend.Work;
 import org.hibernate.search.backend.WorkType;
@@ -162,6 +163,15 @@
 		//another solution would be to subclass SessionImpl instead of having this LuceneSession delegation model
 		//this is an open discussion
 	}
+	
+	public Indexer createIndexer(Class<?>... types) {
+		if ( types.length == 0 ) {
+			return new IndexerImpl( getSearchFactoryImplementor(), getSessionFactory(), Object.class );
+		}
+		else {
+			return new IndexerImpl( getSearchFactoryImplementor(), getSessionFactory(), types );
+		}
+	}
 
 	public SearchFactory getSearchFactory() {
 		if ( searchFactory == null ) {

Added: search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,172 @@
+package org.hibernate.search.impl;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.CacheMode;
+import org.hibernate.SessionFactory;
+import org.hibernate.search.Indexer;
+import org.hibernate.search.batchindexing.BatchIndexingWorkspace;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Prepares and configures the BatchIndexingWorkspace instances
+ * used to rebuild the indexes of the selected entity types.
+ * 
+ * @author Sanne Grinovero
+ */
+public class IndexerImpl implements Indexer {
+	
+	private static final Logger log = LoggerFactory.make();
+	
+	private final SearchFactoryImplementor searchFactoryImplementor;
+	private final SessionFactory sessionFactory;
+
+	protected Set<Class<?>> rootEntities = new HashSet<Class<?>>();
+	protected List<BatchIndexingWorkspace> indexers = new LinkedList<BatchIndexingWorkspace>();
+	boolean started = false; 
+	private CountDownLatch endAllSignal;
+	
+	// default settings defined here:
+	private int objectLoadingThreads = 2; //loading the main entity
+	private int collectionLoadingThreads = 4; //also responsible for loading of lazy @IndexedEmbedded collections
+	private int writerThreads = 1; //also running the Analyzers
+	private int objectLoadingBatchSize = 10;
+	private CacheMode cacheMode = CacheMode.IGNORE;
+	private boolean optimizeAtEnd = true;
+	private boolean purgeAtStart = true;
+	private boolean optimizeAfterPurge = true;
+	private IndexerProgressMonitor monitor = new SimpleIndexingProgressMonitor();
+
+	protected IndexerImpl(SearchFactoryImplementor searchFactory, SessionFactory sessionFactory, Class<?>...entities) {
+		this.searchFactoryImplementor = searchFactory;
+		this.sessionFactory = sessionFactory;
+		rootEntities = toRootEntities( searchFactoryImplementor, entities );
+	}
+
+	/**
+	 * From the set of selected classes a new set is built containing all
+	 * indexed subclasses, then removing all subtypes of other indexed
+	 * entities to avoid duplicate loading through polymorphic queries.
+	 * @param selection the classes selected for indexing
+	 * @return a new set of root entity types
+	 */
+	private static Set<Class<?>> toRootEntities(SearchFactoryImplementor searchFactoryImplementor, Class<?>... selection) {
+		Set<Class<?>> entities = new HashSet<Class<?>>();
+		//first build the "entities" set containing all indexed subtypes of "selection".
+		for (Class<?> entityType : selection) {
+			Set<Class<?>> targetedClasses = searchFactoryImplementor.getIndexedTypesPolymorphic( new Class[] {entityType} );
+			if ( targetedClasses.isEmpty() ) {
+				String msg = entityType.getName() + " is not an indexed entity or a subclass of an indexed entity";
+				throw new IllegalArgumentException( msg );
+			}
+			entities.addAll( targetedClasses );
+		}
+		Set<Class<?>> cleaned = new HashSet<Class<?>>();
+		Set<Class<?>> toRemove = new HashSet<Class<?>>();
+		//now remove all repeated types to avoid duplicate loading by polymorphic query loading
+		for (Class<?> type : entities) {
+			boolean typeIsOk = true;
+			for (Class<?> existing : cleaned) {
+				if ( existing.isAssignableFrom( type ) ) {
+					typeIsOk = false;
+					break;
+				}
+				if ( type.isAssignableFrom( existing ) ) {
+					toRemove.add( existing );
+				}
+			}
+			if ( typeIsOk ) {
+				cleaned.add( type );
+			}
+		}
+		cleaned.removeAll( toRemove );
+		if ( log.isDebugEnabled() ) {
+			log.debug( "Targets for indexing job: {}", cleaned );
+		}
+		return cleaned;
+	}
+
+	public Indexer cacheMode(CacheMode cacheMode) {
+		if ( cacheMode == null )
+			throw new IllegalArgumentException( "cacheMode must not be null" );
+		this.cacheMode = cacheMode;
+		return this;
+	}
+
+	public Indexer objectLoadingThreads(int numberOfThreads) {
+		if ( numberOfThreads < 1 )
+			throw new IllegalArgumentException( "numberOfThreads must be at least 1" );
+		this.objectLoadingThreads = numberOfThreads;
+		return this;
+	}
+	
+	public Indexer objectLoadingBatchSize(int batchSize) {
+		if ( batchSize < 1 )
+			throw new IllegalArgumentException( "batchSize must be at least 1" );
+		this.objectLoadingBatchSize = batchSize;
+		return this;
+	}
+	
+	public Indexer documentBuilderThreads(int numberOfThreads) {
+		if ( numberOfThreads < 1 )
+			throw new IllegalArgumentException( "numberOfThreads must be at least 1" );
+		this.collectionLoadingThreads = numberOfThreads;
+		return this;
+	}
+	
+	public Indexer indexWriterThreads(int numberOfThreads) {
+		if ( numberOfThreads < 1 )
+			throw new IllegalArgumentException( "numberOfThreads must be at least 1" );
+		this.writerThreads = numberOfThreads;
+		return this;
+	}
+	
+	public Indexer optimizeAtEnd(boolean optimize) {
+		this.optimizeAtEnd = optimize;
+		return this;
+	}
+
+	public Indexer optimizeAfterPurge(boolean optimize) {
+		this.optimizeAfterPurge = optimize;
+		return this;
+	}
+
+	public Indexer purgeAllAtStart(boolean purgeAll) {
+		this.purgeAtStart = purgeAll;
+		return this;
+	}
+	
+	public void start() {
+		if ( started ) {
+			throw new IllegalStateException( "Can be started only once" );
+		}
+		else {
+			started = true;
+			endAllSignal = new CountDownLatch( rootEntities.size() );
+			for ( Class<?> type : rootEntities ) {
+				indexers.add( new BatchIndexingWorkspace(
+						searchFactoryImplementor, sessionFactory, type,
+						objectLoadingThreads, collectionLoadingThreads, writerThreads,
+						cacheMode, objectLoadingBatchSize,
+						optimizeAtEnd, purgeAtStart, optimizeAfterPurge,
+						endAllSignal, monitor) );
+			}
+			for ( BatchIndexingWorkspace batcher : indexers ) {
+				new Thread( batcher ).start();
+			}
+		}
+	}
+
+	public boolean startAndWait(long timeout, TimeUnit unit) throws InterruptedException {
+		start();
+		return endAllSignal.await( timeout, unit );
+	}
+
+}

Added: search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java	                        (rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,56 @@
+package org.hibernate.search.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * A very simple implementation of IndexerProgressMonitor
+ * 
+ * @author Sanne Grinovero
+ */
+public class SimpleIndexingProgressMonitor implements IndexerProgressMonitor {
+	
+	private static final Logger log = LoggerFactory.make();
+	private final AtomicLong documentsDoneCounter = new AtomicLong();
+	private final AtomicLong totalCounter = new AtomicLong();
+	private volatile long startTimeMs;
+
+	public void entitiesLoaded(int size) {
+		//not used
+	}
+
+	public void documentsAdded(long increment) {
+		long current = documentsDoneCounter.addAndGet( increment );
+		if ( current == increment ) {
+			startTimeMs = System.currentTimeMillis();
+		}
+		if ( current % getStatusMessagePeriod() == 0 ) {
+			printStatusMessage( startTimeMs, totalCounter.get(), current );
+		}
+	}
+
+	public void documentsBuilt(int number) {
+		//not used
+	}
+
+	public void addToTotalCount(long count) {
+		totalCounter.addAndGet( count );
+		log.info( "Going to reindex {} entities", count );
+	}
+	
+	protected int getStatusMessagePeriod() {
+		return 1000;
+	}
+	
+	protected void printStatusMessage(long starttimems, long totalTodoCount, long doneCount) {
+		long elapsedMs = System.currentTimeMillis() - starttimems;
+		log.info( "{} documents indexed in {} ms", doneCount, elapsedMs );
+		double estimateSpeed = ( (double) doneCount * 1000 ) / elapsedMs;
+		double estimatePercentageComplete = ( (double) doneCount * 100 ) / (double) totalTodoCount;
+		log.info( "Indexing speed: {} documents/second; progress: {}%", estimateSpeed, estimatePercentageComplete );
+	}
+
+}
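
Since getStatusMessagePeriod() and printStatusMessage() are protected, reporting can be tuned by subclassing; a minimal sketch (note the monitor is currently hardwired in IndexerImpl, so plugging in a custom instance is not yet exposed through the fluent API):

	IndexerProgressMonitor monitor = new SimpleIndexingProgressMonitor() {
		protected int getStatusMessagePeriod() {
			return 5000; //log a status line every 5000 documents instead of 1000
		}
	};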

Modified: search/trunk/src/main/java/org/hibernate/search/jpa/FullTextEntityManager.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/jpa/FullTextEntityManager.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/jpa/FullTextEntityManager.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -4,6 +4,7 @@
 import java.io.Serializable;
 import javax.persistence.EntityManager;
 
+import org.hibernate.search.Indexer;
 import org.hibernate.search.SearchFactory;
 
 /**
@@ -68,5 +69,14 @@
 	 * Flush all index changes forcing Hibernate Search to apply all changes to the index not waiting for the batch limit.
 	 */
 	public void flushToIndexes();
+	
+	/**
+	 * Creates an Indexer to rebuild the indexes of some
+	 * or all indexed entity types.
+	 * Instances cannot be reused.
+	 * @param types optionally restrict the operation to selected types
+	 * @return a new Indexer covering the given types
+	 */
+	public Indexer createIndexer(Class<?>... types);
 
 }

Modified: search/trunk/src/main/java/org/hibernate/search/jpa/impl/FullTextEntityManagerImpl.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/jpa/impl/FullTextEntityManagerImpl.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/jpa/impl/FullTextEntityManagerImpl.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -10,6 +10,7 @@
 
 import org.hibernate.search.jpa.FullTextEntityManager;
 import org.hibernate.search.jpa.FullTextQuery;
+import org.hibernate.search.Indexer;
 import org.hibernate.search.SearchFactory;
 import org.hibernate.search.SearchException;
 import org.hibernate.search.FullTextSession;
@@ -176,4 +177,9 @@
 	public EntityTransaction getTransaction() {
 		return em.getTransaction();
 	}
+	
+	public Indexer createIndexer(Class<?>... types) {
+		return getFullTextSession().createIndexer( types );
+	}
+	
 }

Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,13 @@
+package org.hibernate.search.test.batchindexing;
+
+import javax.persistence.Entity;
+
+import org.hibernate.search.annotations.Indexed;
+
+@Entity
+@Indexed
+public class AncientBook extends Book {
+	
+	public String catalogueGroupName = "";
+
+}

Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,15 @@
+package org.hibernate.search.test.batchindexing;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+import org.hibernate.search.annotations.Indexed;
+
+@Indexed
+@Entity
+public class Book {
+	
+	@Id
+	public long id;
+
+}

Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,36 @@
+package org.hibernate.search.test.batchindexing;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+
+import org.hibernate.search.annotations.Field;
+import org.hibernate.search.annotations.Indexed;
+
+@Indexed
+@Entity
+public class Dvd {
+	
+	public long unusuallyNamedIdentifier;
+	public String title;
+
+	@Id
+	@GeneratedValue
+	public long getUnusuallyNamedIdentifier() {
+		return unusuallyNamedIdentifier;
+	}
+
+	public void setUnusuallyNamedIdentifier(long unusuallyNamedIdentifier) {
+		this.unusuallyNamedIdentifier = unusuallyNamedIdentifier;
+	}
+
+	@Field
+	public String getTitle() {
+		return title;
+	}
+	
+	public void setTitle(String title) {
+		this.title = title;
+	}
+	
+}

Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/ModernBook.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/ModernBook.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/ModernBook.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,13 @@
+package org.hibernate.search.test.batchindexing;
+
+import javax.persistence.Entity;
+
+import org.hibernate.search.annotations.Indexed;
+
+@Entity
+@Indexed
+public class ModernBook extends Book {
+	
+	public String isbn = null;
+
+}

Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java	                        (rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,118 @@
+package org.hibernate.search.test.batchindexing;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.TermQuery;
+import org.hibernate.Transaction;
+import org.hibernate.search.Environment;
+import org.hibernate.search.FullTextQuery;
+import org.hibernate.search.FullTextSession;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.impl.IndexerImpl;
+import org.hibernate.search.test.util.FullTextSessionBuilder;
+
+public class SearchIndexerTest extends TestCase {
+	
+	public void testEntityHierarchy() {
+		FullTextSessionBuilder ftsb = new FullTextSessionBuilder()
+			.addAnnotatedClass( ModernBook.class )
+			.addAnnotatedClass( AncientBook.class )
+			.addAnnotatedClass( Dvd.class )
+			.addAnnotatedClass( Book.class )
+			.build();
+		FullTextSession fullTextSession = ftsb.openFullTextSession();
+		SearchFactoryImplementor searchFactory = (SearchFactoryImplementor) fullTextSession.getSearchFactory();
+		{
+			TestableSearchIndexerImpl tsii = new TestableSearchIndexerImpl( searchFactory, Book.class );
+			assertTrue( tsii.getRootEntities().contains( Book.class ) );
+			assertFalse( tsii.getRootEntities().contains( ModernBook.class ) );
+			assertFalse( tsii.getRootEntities().contains( AncientBook.class ) );
+		}
+		{
+			TestableSearchIndexerImpl tsii = new TestableSearchIndexerImpl( searchFactory, ModernBook.class, AncientBook.class, Book.class );
+			assertTrue( tsii.getRootEntities().contains( Book.class ) );
+			assertFalse( tsii.getRootEntities().contains( ModernBook.class ) );
+			assertFalse( tsii.getRootEntities().contains( AncientBook.class ) );
+		}
+		{
+			TestableSearchIndexerImpl tsii = new TestableSearchIndexerImpl( searchFactory, ModernBook.class, AncientBook.class );
+			assertFalse( tsii.getRootEntities().contains( Book.class ) );
+			assertTrue( tsii.getRootEntities().contains( ModernBook.class ) );
+			assertTrue( tsii.getRootEntities().contains( AncientBook.class ) );
+		}
+		//verify that indexing Object will result in one separate indexer working per root indexed entity
+		{
+			TestableSearchIndexerImpl tsii = new TestableSearchIndexerImpl( searchFactory, Object.class );
+			assertTrue( tsii.getRootEntities().contains( Book.class ) );
+			assertTrue( tsii.getRootEntities().contains( Dvd.class ) );
+			assertFalse( tsii.getRootEntities().contains( AncientBook.class ) );
+			assertFalse( tsii.getRootEntities().contains( Object.class ) );
+			assertEquals( 2, tsii.getRootEntities().size() );
+		}
+	}
+	
+	private class TestableSearchIndexerImpl extends IndexerImpl {
+		
+		protected TestableSearchIndexerImpl(SearchFactoryImplementor searchFactory, Class<?>... types) {
+			super( searchFactory, null, types );
+		}
+
+		public Set<Class<?>> getRootEntities() {
+			return this.rootEntities;
+		}
+		
+	}
+	
+	public void testIdentifierNaming() throws InterruptedException {
+		//disable automatic indexing, to test manual index creation.
+		FullTextSessionBuilder ftsb = new FullTextSessionBuilder()
+			.setProperty( org.hibernate.search.Environment.ANALYZER_CLASS, StandardAnalyzer.class.getName() )
+			.addAnnotatedClass( Dvd.class )
+			.setProperty( Environment.INDEXING_STRATEGY, "manual" )
+			.build();
+		{
+			//creating the test data in database only:
+			FullTextSession fullTextSession = ftsb.openFullTextSession();
+			Transaction transaction = fullTextSession.beginTransaction();
+			Dvd dvda = new Dvd();
+			dvda.setTitle( "Star Trek (episode 96367)" );
+			fullTextSession.save(dvda);
+			Dvd dvdb = new Dvd();
+			dvdb.setTitle( "The Trek" );
+			fullTextSession.save(dvdb);
+			transaction.commit();
+			fullTextSession.close();
+		}
+		{	
+			//verify index is still empty:
+			assertEquals( 0, countResults( new Term( "title", "trek" ), ftsb, Dvd.class ) );
+		}
+		{
+			FullTextSession fullTextSession = ftsb.openFullTextSession();
+			fullTextSession.createIndexer( Dvd.class )
+				.startAndWait( 10, TimeUnit.SECONDS );
+			fullTextSession.close();
+		}
+		{	
+			//verify index is now containing both DVDs:
+			assertEquals( 2, countResults( new Term( "title", "trek" ), ftsb, Dvd.class ) );
+		}
+	}
+	
+	public int countResults( Term termForQuery, FullTextSessionBuilder ftSessionBuilder, Class<?> type ) {
+		TermQuery fullTextQuery = new TermQuery( termForQuery );
+		FullTextSession fullTextSession = ftSessionBuilder.openFullTextSession();
+		Transaction transaction = fullTextSession.beginTransaction();
+		FullTextQuery query = fullTextSession.createFullTextQuery( fullTextQuery, type );
+		int resultSize = query.getResultSize();
+		transaction.commit();
+		fullTextSession.close();
+		return resultSize;
+	}
+
+}



