Author: sannegrinovero
Date: 2009-05-17 21:59:10 -0400 (Sun, 17 May 2009)
New Revision: 16580
Added:
search/trunk/src/main/java/org/hibernate/search/Indexer.java
search/trunk/src/main/java/org/hibernate/search/batchindexing/
search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java
search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java
search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java
search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java
search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java
search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java
search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexerProgressMonitor.java
search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java
search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java
search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java
search/trunk/src/test/java/org/hibernate/search/test/batchindexing/
search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java
search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java
search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java
search/trunk/src/test/java/org/hibernate/search/test/batchindexing/ModernBook.java
search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java
Modified:
search/trunk/src/main/java/org/hibernate/search/FullTextSession.java
search/trunk/src/main/java/org/hibernate/search/engine/DocumentBuilderIndexedEntity.java
search/trunk/src/main/java/org/hibernate/search/impl/FullTextSessionImpl.java
search/trunk/src/main/java/org/hibernate/search/jpa/FullTextEntityManager.java
search/trunk/src/main/java/org/hibernate/search/jpa/impl/FullTextEntityManagerImpl.java
Log:
HSEARCH-218 loading of entities in parallel sessions (multithreaded) to improve index
rebuilding performance. Useless until integrated with a new ad-hoc backend.
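
For reviewers, a minimal usage sketch of the API introduced here (hedged: the Session variable and the Book entity are illustrative assumptions, not part of this commit; startAndWait propagates InterruptedException):

    FullTextSession fullTextSession = Search.getFullTextSession( session );
    fullTextSession.createIndexer( Book.class )
        .objectLoadingThreads( 4 )
        .objectLoadingBatchSize( 30 )
        .startAndWait( 20, TimeUnit.MINUTES );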
Modified: search/trunk/src/main/java/org/hibernate/search/FullTextSession.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/FullTextSession.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/FullTextSession.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -67,4 +67,14 @@
* Flush all index changes forcing Hibernate Search to apply all changes to the index not waiting for the batch limit.
*/
public void flushToIndexes();
+
+ /**
+ * Creates an Indexer to rebuild the indexes of some
+ * or all indexed entity types.
+ * Instances cannot be reused.
+ * @param types optionally restrict the operation to selected types
+ * @return a new Indexer
+ */
+ public Indexer createIndexer(Class<?>... types);
+
}
Added: search/trunk/src/main/java/org/hibernate/search/Indexer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/Indexer.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/Indexer.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,88 @@
+package org.hibernate.search;
+
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.CacheMode;
+
+public interface Indexer {
+
+ /**
+ * Set the number of threads to be used to load
+ * the root entities.
+ * @param numberOfThreads
+ * @return <tt>this</tt> for method chaining
+ */
+ Indexer objectLoadingThreads(int numberOfThreads);
+
+ /**
+ * Sets the batch size used to load the root entities.
+ * @param batchSize
+ * @return <tt>this</tt> for method chaining
+ */
+ Indexer objectLoadingBatchSize(int batchSize);
+
+ /**
+ * Sets the number of threads used to load the lazy collections
+ * related to the indexed entities.
+ * @param numberOfThreads
+ * @return <tt>this</tt> for method chaining
+ */
+ Indexer documentBuilderThreads(int numberOfThreads);
+
+ //not supported yet
+ //Indexer indexWriterThreads(int numberOfThreads);
+
+ /**
+ * Sets the cache interaction mode for the data loading tasks.
+ * Defaults to <tt>CacheMode.IGNORE</tt>.
+ * @return <tt>this</tt> for method chaining
+ */
+ Indexer cacheMode(CacheMode cacheMode);
+
+ /**
+ * Specifies whether the index should be optimized at the end
+ * of the indexing process.
+ * Defaults to <tt>true</tt>.
+ * @param optimize
+ * @return <tt>this</tt> for method chaining
+ */
+ Indexer optimizeAtEnd(boolean optimize);
+
+ /**
+ * Specifies whether the index should be optimized before indexing starts,
+ * right after the purgeAll. Has no effect if <tt>purgeAllAtStart</tt> is set to false.
+ * Defaults to <tt>true</tt>.
+ * @param optimize
+ * @return <tt>this</tt> for method chaining
+ */
+ Indexer optimizeAfterPurge(boolean optimize);
+
+ /**
+ * Specifies whether all entities should be removed from the index before
+ * indexing starts, using purgeAll. Set this to false only if you know there are no
+ * entities in the index: otherwise search results may be duplicated.
+ * Defaults to <tt>true</tt>.
+ * @param purgeAll
+ * @return <tt>this</tt> for method chaining
+ */
+ Indexer purgeAllAtStart(boolean purgeAll);
+
+ /**
+ * Starts the indexing process in the background (asynchronously).
+ * Can be called only once.
+ */
+ void start();
+
+ /**
+ * Starts the indexing process, then blocks until it is finished.
+ * Can be called only once.
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the <tt>timeout</tt> argument.
+ * @return <tt>true</tt> if the process finished and <tt>false</tt>
+ * if the waiting time elapsed before the process was finished.
+ * @throws InterruptedException if the current thread is interrupted
+ * while waiting.
+ */
+ boolean startAndWait( long timeout, TimeUnit unit ) throws InterruptedException;
+
+}
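
A sketch of how the fluent interface above is meant to be driven (assuming an open FullTextSession; the Book entity is hypothetical):

    Indexer indexer = fullTextSession.createIndexer( Book.class )
        .cacheMode( CacheMode.IGNORE )
        .purgeAllAtStart( true )
        .optimizeAtEnd( true );
    boolean finished = indexer.startAndWait( 30, TimeUnit.MINUTES );
    if ( !finished ) {
        // the timeout elapsed; indexing keeps running in background threads
    }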
Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/BatchIndexingWorkspace.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,190 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.hibernate.CacheMode;
+import org.hibernate.SessionFactory;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.backend.OptimizeLuceneWork;
+import org.hibernate.search.backend.PurgeAllLuceneWork;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This runnable will prepare a pipeline for batch indexing
+ * of entities, managing the lifecycle of several ThreadPools.
+ *
+ * @author Sanne Grinovero
+ */
+public class BatchIndexingWorkspace implements Runnable {
+
+ private static final Logger log = LoggerFactory.make();
+
+ private final SearchFactoryImplementor searchFactory;
+ private final SessionFactory sessionFactory;
+
+ //following order shows the 4 stages of an entity flowing to the index:
+ private final ThreadPoolExecutor execIdentifiersLoader;
+ private final ProducerConsumerQueue fromIdentifierListToEntities;
+ private final ThreadPoolExecutor execFirstLoader;
+ private final ProducerConsumerQueue fromEntityToAddwork;
+ private final ThreadPoolExecutor execDocBuilding;
+ private final ProducerConsumerQueue fromAddworkToIndex;
+ private final ThreadPoolExecutor execWriteIndex;
+
+ private final int objectLoadingThreadNum;
+ private final int luceneworkerBuildingThreadNum;
+ private final int indexWritingThreadNum;
+ private final Class<?> indexedType;
+
+ // status control
+ private final AtomicBoolean started = new AtomicBoolean( false );
+ private final CountDownLatch endWritersSignal; //released when we stop adding Documents to Index
+ private final CountDownLatch endAllSignal; //released when we release all locks and IndexWriter
+
+ // progress monitor
+ private final IndexerProgressMonitor monitor;
+
+ // loading options
+ private final CacheMode cacheMode;
+ private final int objectLoadingBatchSize;
+
+ private final boolean purgeAtStart;
+ private final boolean optimizeAfterPurge;
+ private final boolean optimizeAtEnd;
+
+ public BatchIndexingWorkspace(SearchFactoryImplementor searchFactoryImplementor, SessionFactory sessionFactory,
+ Class<?> entityType,
+ int objectLoadingThreads, int collectionLoadingThreads, int writerThreads,
+ CacheMode cacheMode, int objectLoadingBatchSize,
+ boolean optimizeAtEnd, boolean purgeAtStart, boolean optimizeAfterPurge, CountDownLatch endAllSignal,
+ IndexerProgressMonitor monitor) {
+
+ this.indexedType = entityType;
+ this.searchFactory = searchFactoryImplementor;
+ this.sessionFactory = sessionFactory;
+
+ //thread pool sizing:
+ this.objectLoadingThreadNum = objectLoadingThreads;
+ this.luceneworkerBuildingThreadNum = collectionLoadingThreads;//collections are loaded as needed by building the document
+// this.indexWritingThreadNum = writerThreads; //FIXME enable this line and remove the next line after solving HSEARCH-367
+ this.indexWritingThreadNum = 1;
+
+ //loading options:
+ this.cacheMode = cacheMode;
+ this.objectLoadingBatchSize = objectLoadingBatchSize;
+
+ //executors: (quite expensive constructor)
+ //execIdentifiersLoader has size 1 and is not configurable: ensures the list is consistent as produced by one transaction
+ this.execIdentifiersLoader = Executors.newFixedThreadPool( 1, "identifierloader" );
+ this.execFirstLoader = Executors.newFixedThreadPool( objectLoadingThreadNum, "entityloader" );
+ this.execDocBuilding = Executors.newFixedThreadPool( luceneworkerBuildingThreadNum, "collectionsloader" );
+ this.execWriteIndex = Executors.newFixedThreadPool( indexWritingThreadNum, "analyzers" );
+
+ //pipelining queues:
+ this.fromIdentifierListToEntities = new ProducerConsumerQueue( 1 );
+ this.fromEntityToAddwork = new ProducerConsumerQueue( objectLoadingThreadNum );
+ this.fromAddworkToIndex = new ProducerConsumerQueue( luceneworkerBuildingThreadNum );
+
+ //end signal shared with other instances:
+ this.endAllSignal = endAllSignal;
+ this.endWritersSignal = new CountDownLatch( indexWritingThreadNum );
+
+ //behaviour options:
+ this.optimizeAtEnd = optimizeAtEnd;
+ this.optimizeAfterPurge = optimizeAfterPurge;
+ this.purgeAtStart = purgeAtStart;
+
+ this.monitor = monitor;
+ }
+
+ public void run() {
+ if ( ! started.compareAndSet( false, true ) )
+ throw new IllegalStateException( "BatchIndexingWorkspace can be started only once." );
+ boolean interrupted = false;
+ try {
+ //TODO switch to batch mode in backend
+ if ( purgeAtStart ) {
+ purgeAll();
+ if ( optimizeAfterPurge ) {
+ optimize();
+ }
+ }
+
+ BackendQueueProcessorFactory backendQueueProcessorFactory = searchFactory.getBackendQueueProcessorFactory();
+ //first start the consumers, then the producers (reverse order):
+ for ( int i=0; i < indexWritingThreadNum; i++ ) {
+ //from LuceneWork to IndexWriters:
+ execWriteIndex.execute( new IndexWritingJob(
+ fromAddworkToIndex, endWritersSignal, monitor, backendQueueProcessorFactory ) );
+ }
+ for ( int i=0; i < luceneworkerBuildingThreadNum; i++ ) {
+ //from entity to LuceneWork:
+ execDocBuilding.execute( new EntityConsumerLuceneworkProducer(
+ fromEntityToAddwork, fromAddworkToIndex, monitor,
+ sessionFactory, searchFactory, cacheMode) );
+ }
+ for ( int i=0; i < objectLoadingThreadNum; i++ ) {
+ //from primary key to loaded entity:
+ execFirstLoader.execute( new IdentifierConsumerEntityProducer(
+ fromIdentifierListToEntities, fromEntityToAddwork, monitor,
+ sessionFactory, cacheMode, indexedType) );
+ }
+
+ execIdentifiersLoader.execute( new IdentifierProducer(fromIdentifierListToEntities, sessionFactory, objectLoadingBatchSize, indexedType, monitor) );
+ //shutdown all executors:
+ execIdentifiersLoader.shutdown();
+ execFirstLoader.shutdown();
+ execDocBuilding.shutdown();
+ execWriteIndex.shutdown();
+ log.debug( "helper executors are shutting down" );
+ try {
+ endWritersSignal.await(); //await all indexing is done.
+ } catch (InterruptedException e) {
+ interrupted = true;
+ throw new SearchException( "Interrupted on batch Indexing; index will be left in unknown state!", e);
+ }
+ log.debug( "index writing finished" );
+ if ( optimizeAtEnd ) {
+ log.debug( "starting optimization" );
+ optimize();
+ }
+ }
+ finally {
+ endAllSignal.countDown();
+ if ( interrupted ) {
+ //restore interruption signal:
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void optimize() {
+ Set<Class<?>> targetedClasses = searchFactory.getIndexedTypesPolymorphic( new Class[] {indexedType} );
+ List<LuceneWork> queue = new ArrayList<LuceneWork>( targetedClasses.size() );
+ for ( Class<?> clazz : targetedClasses ) {
+ queue.add( new OptimizeLuceneWork( clazz ) );
+ }
+ //TODO use the batch backend
+ searchFactory.getBackendQueueProcessorFactory().getProcessor( queue ).run();
+ }
+
+ private void purgeAll() {
+ Set<Class<?>> targetedClasses = searchFactory.getIndexedTypesPolymorphic( new Class[] {indexedType} );
+ List<LuceneWork> queue = new ArrayList<LuceneWork>( targetedClasses.size() );
+ for ( Class<?> clazz : targetedClasses ) {
+ queue.add( new PurgeAllLuceneWork( clazz ) );
+ }
+ //TODO use the batch backend
+ searchFactory.getBackendQueueProcessorFactory().getProcessor( queue ).run();
+ }
+
+}
Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/EntityConsumerLuceneworkProducer.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,111 @@
+package org.hibernate.search.batchindexing;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.hibernate.CacheMode;
+import org.hibernate.FlushMode;
+import org.hibernate.Hibernate;
+import org.hibernate.LockMode;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.Transaction;
+import org.hibernate.search.backend.AddLuceneWork;
+import org.hibernate.search.bridge.TwoWayFieldBridge;
+import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Component of the batch-indexing pipeline, using chained producer-consumers.
+ * This Runnable consumes entities taken one-by-one from the input queue
+ * and produces an AddLuceneWork for each entity on the output queue.
+ *
+ * @author Sanne Grinovero
+ */
+public class EntityConsumerLuceneworkProducer implements Runnable {
+
+ private static final Logger log = LoggerFactory.make();
+
+ private final ProducerConsumerQueue source;
+ private final ProducerConsumerQueue destination;
+ private final SessionFactory sessionFactory;
+ private final Map<Class<?>, DocumentBuilderIndexedEntity<?>> documentBuilders;
+ private final IndexerProgressMonitor monitor;
+
+ private static final int CLEAR_PERIOD = 50;
+ private final CacheMode cacheMode;
+
+ public EntityConsumerLuceneworkProducer(
+ ProducerConsumerQueue entitySource,
+ ProducerConsumerQueue fromAddworkToIndex,
+ IndexerProgressMonitor monitor,
+ SessionFactory sessionFactory,
+ SearchFactoryImplementor searchFactory, CacheMode cacheMode) {
+ this.source = entitySource;
+ this.destination = fromAddworkToIndex;
+ this.monitor = monitor;
+ this.sessionFactory = sessionFactory;
+ this.cacheMode = cacheMode;
+ this.documentBuilders = searchFactory.getDocumentBuildersIndexedEntities();
+ }
+
+ public void run() {
+ Session session = sessionFactory.openSession();
+ session.setFlushMode( FlushMode.MANUAL );
+ session.setCacheMode( cacheMode );
+ try {
+ Transaction transaction = session.beginTransaction();
+ indexAllQueue( session );
+ transaction.commit();
+ }
+ finally {
+ session.close();
+ }
+ log.debug( "finished" );
+ }
+
+ private void indexAllQueue(Session session) {
+ try {
+ for ( int cycle=0; true; cycle++ ) {
+ Object take = source.take();
+ if ( take == null ) {
+ break;
+ }
+ else {
+ log.trace( "received an object {}", take );
+ //trick to attach the objects to session:
+ session.lock( take, LockMode.NONE );
+ index( take, session );
+ monitor.documentsBuilt( 1 );
+ session.evict( take );
+ if ( cycle == CLEAR_PERIOD ) {
+ cycle = 0;
+ session.clear();
+ }
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ // just quit
+ }
+ finally {
+ destination.producerStopping();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void index( Object entity, Session session ) throws InterruptedException {
+ Serializable id = session.getIdentifier( entity );
+ Class clazz = Hibernate.getClass( entity );
+ DocumentBuilderIndexedEntity docBuilder = documentBuilders.get( clazz );
+ TwoWayFieldBridge idBridge = docBuilder.getIdBridge();
+ String idInString = idBridge.objectToString( id );
+ //depending on the complexity of the object graph going to be indexed it's possible
+ //that we hit the database several times during work construction.
+ AddLuceneWork addWork = docBuilder.createAddWork( clazz, entity, id, idInString, true );
+ destination.put( addWork );
+ }
+
+}
Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/Executors.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,59 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Helper class to create threads;
+ * these threads are grouped and named to be readily identified in a profiler.
+ *
+ * @author Sanne Grinovero
+ */
+public class Executors {
+
+ private static final String THREAD_GROUP_PREFIX = "Hibernate Search indexer: ";
+
+ /**
+ * Creates a new fixed size ThreadPoolExecutor
+ * @param threads the number of threads in the pool
+ * @param groupname a name to identify the ThreadGroup in a profiler
+ * @return the new ThreadPoolExecutor
+ */
+ public static ThreadPoolExecutor newFixedThreadPool(int threads, String groupname) {
+ return new ThreadPoolExecutor(
+ threads,
+ threads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new SearchThreadFactory( groupname ) );
+ }
+
+ /**
+ * The thread factory, used to customize thread names
+ */
+ private static class SearchThreadFactory implements ThreadFactory {
+
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger( 1 );
+ final String namePrefix;
+
+ SearchThreadFactory(String groupname) {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null)? s.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+ namePrefix = THREAD_GROUP_PREFIX + groupname + "-";
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r,
+ namePrefix + threadNumber.getAndIncrement(),
+ 0);
+ return t;
+ }
+
+ }
+
+}
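
A small usage sketch for this helper (the Runnable body is a placeholder):

    ThreadPoolExecutor executor = Executors.newFixedThreadPool( 2, "entityloader" );
    executor.execute( new Runnable() {
        public void run() {
            // load a batch of entities here
        }
    } );
    executor.shutdown(); // stop accepting new tasks; already queued tasks still complete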
Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierConsumerEntityProducer.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,112 @@
+package org.hibernate.search.batchindexing;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.hibernate.CacheMode;
+import org.hibernate.Criteria;
+import org.hibernate.FlushMode;
+import org.hibernate.LockMode;
+import org.hibernate.Session;
+import org.hibernate.SessionFactory;
+import org.hibernate.Transaction;
+import org.hibernate.criterion.Restrictions;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This Runnable consumes entity identifiers and
+ * produces loaded, detached entities for the next queue.
+ * It finishes when the queue it consumes from
+ * signals there are no more identifiers.
+ *
+ * @author Sanne Grinovero
+ */
+public class IdentifierConsumerEntityProducer implements Runnable {
+
+ private static final Logger log = LoggerFactory.make();
+
+ private final ProducerConsumerQueue source;
+ private final ProducerConsumerQueue destination;
+ private final SessionFactory sessionFactory;
+ private final CacheMode cacheMode;
+ private final Class<?> type;
+ private final IndexerProgressMonitor monitor;
+
+ public IdentifierConsumerEntityProducer(
+ ProducerConsumerQueue fromIdentifierListToEntities,
+ ProducerConsumerQueue fromEntityToAddwork,
+ IndexerProgressMonitor monitor,
+ SessionFactory sessionFactory,
+ CacheMode cacheMode, Class<?> type) {
+ this.source = fromIdentifierListToEntities;
+ this.destination = fromEntityToAddwork;
+ this.monitor = monitor;
+ this.sessionFactory = sessionFactory;
+ this.cacheMode = cacheMode;
+ this.type = type;
+ log.trace( "created" );
+ }
+
+ public void run() {
+ log.trace( "started" );
+ Session session = sessionFactory.openSession();
+ session.setFlushMode( FlushMode.MANUAL );
+ session.setCacheMode( cacheMode );
+ try {
+ Transaction transaction = session.beginTransaction();
+ loadAllFromQueue( session );
+ transaction.commit();
+ }
+ finally {
+ session.close();
+ }
+ log.trace( "finished" );
+ }
+
+ private void loadAllFromQueue(Session session) {
+ try {
+ Object take;
+ do {
+ take = source.take();
+ if ( take != null ) {
+ List<Serializable> listIds = (List<Serializable>) take;
+ log.trace( "received list of ids {}", listIds );
+ loadList( listIds, session );
+ }
+ }
+ while ( take != null );
+ }
+ catch (InterruptedException e) {
+ // just quit
+ }
+ finally {
+ destination.producerStopping();
+ }
+ }
+
+ /**
+ * Loads a list of entities of defined type using their identifiers.
+ * The loaded objects are then pushed to the next queue one by one.
+ * @param listIds the list of entity identifiers (of type Serializable)
+ * @param session the session to be used
+ * @throws InterruptedException
+ */
+ private void loadList(List<Serializable> listIds, Session session) throws InterruptedException {
+ //TODO investigate if I should use ObjectLoaderHelper.initializeObjects instead
+ Criteria criteria = session
+ .createCriteria( type )
+ .setCacheMode( cacheMode )
+ .setLockMode( LockMode.NONE )
+ .setCacheable( false )
+ .setFlushMode( FlushMode.MANUAL )
+ .add( Restrictions.in( "id", listIds ) );
+ List<?> list = criteria.list();
+ monitor.entitiesLoaded( list.size() );
+ session.clear();
+ for ( Object obj : list ) {
+ destination.put( obj );
+ }
+ }
+
+}
Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IdentifierProducer.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,125 @@
+package org.hibernate.search.batchindexing;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hibernate.Criteria;
+import org.hibernate.ScrollMode;
+import org.hibernate.ScrollableResults;
+import org.hibernate.SessionFactory;
+import org.hibernate.StatelessSession;
+import org.hibernate.Transaction;
+import org.hibernate.criterion.Projections;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This Runnable feeds the indexing queue
+ * with the identifiers of all entities to be indexed.
+ * This step in the indexing process is not parallel (it should be
+ * done by one thread per type) so that a single transaction is used
+ * to define the group of entities to be indexed.
+ * Produced identifiers are put in the destination queue grouped in List
+ * instances: the reason for this is to load them in batches
+ * in the next step and to reduce contention on the queue.
+ *
+ * @author Sanne Grinovero
+ */
+public class IdentifierProducer implements Runnable {
+
+ private static final Logger log = LoggerFactory.make();
+
+ private final ProducerConsumerQueue destination;
+ private final SessionFactory sessionFactory;
+ private final int batchSize;
+ private final Class<?> indexedType;
+ private final IndexerProgressMonitor monitor;
+
+ /**
+ * @param fromIdentifierListToEntities the target queue where the produced identifiers are sent to
+ * @param sessionFactory the Hibernate SessionFactory to use to load entities
+ * @param objectLoadingBatchSize affects mostly the next consumer: IdentifierConsumerEntityProducer
+ * @param indexedType the entity type to be loaded
+ * @param monitor to monitor indexing progress
+ */
+ public IdentifierProducer(
+ ProducerConsumerQueue fromIdentifierListToEntities,
+ SessionFactory sessionFactory,
+ int objectLoadingBatchSize,
+ Class<?> indexedType, IndexerProgressMonitor monitor) {
+ this.destination = fromIdentifierListToEntities;
+ this.sessionFactory = sessionFactory;
+ this.batchSize = objectLoadingBatchSize;
+ this.indexedType = indexedType;
+ this.monitor = monitor;
+ log.trace( "created" );
+ }
+
+ public void run() {
+ log.trace( "started" );
+ try {
+ inTransactionWrapper();
+ }
+ finally {
+ destination.producerStopping();
+ }
+ log.trace( "finished" );
+ }
+
+ private void inTransactionWrapper() {
+ StatelessSession session = sessionFactory.openStatelessSession();
+ try {
+ Transaction transaction = session.beginTransaction();
+ loadAllIdentifiers( session );
+ transaction.commit();
+ } catch (InterruptedException e) {
+ // just quit
+ }
+ finally {
+ session.close();
+ }
+ }
+
+ private void loadAllIdentifiers(final StatelessSession session) throws InterruptedException {
+ Integer count = (Integer) session
+ .createCriteria( indexedType )
+ .setProjection( Projections.count( "id" ) )
+ .setCacheable( false )
+ .uniqueResult();
+
+ log.debug( "going to fetch {} primary keys", count);
+ monitor.addToTotalCount( count );
+
+ Criteria criteria = session
+ .createCriteria( indexedType )
+ .setProjection( Projections.id() )
+ .setCacheable( false )
+ .setFetchSize( 100 );
+
+ ScrollableResults results = criteria.scroll( ScrollMode.FORWARD_ONLY );
+ ArrayList<Serializable> destinationList = new ArrayList<Serializable>( batchSize );
+ try {
+ while ( results.next() ) {
+ Serializable id = (Serializable) results.get( 0 );
+ destinationList.add( id );
+ if ( destinationList.size() == batchSize ) {
+ enqueueList( destinationList );
+ destinationList = new ArrayList<Serializable>( batchSize );
+ }
+ }
+ }
+ finally {
+ results.close();
+ }
+ enqueueList( destinationList );
+ }
+
+ private void enqueueList(final List<Serializable> idsList) throws InterruptedException {
+ if ( ! idsList.isEmpty() ) {
+ destination.put( idsList );
+ log.trace( "produced a list of ids {}", idsList );
+ }
+ }
+
+}
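
To illustrate the batching behaviour with hypothetical numbers: given objectLoadingBatchSize = 3 and seven matching primary keys, the scroll above produces the lists [1, 2, 3] and [4, 5, 6] inside the loop, and the final enqueueList call flushes the remaining [7].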
Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexWritingJob.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,76 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.LuceneWork;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This Runnable is meant to push to the backend
+ * all LuceneWork it can take from the ProducerConsumerQueue.
+ *
+ * FIXME:
+ * This is currently more of an adaptor to bridge the queue approach
+ * to the current backend implementations. It is by no means
+ * the most efficient way to index documents; it has been built only
+ * to test the approach.
+ *
+ */
+class IndexWritingJob implements Runnable {
+
+ private static final Logger log = LoggerFactory.make();
+
+ private final ProducerConsumerQueue pleq;
+ private final BackendQueueProcessorFactory backend;
+ private final CountDownLatch endSignal;
+ private final IndexerProgressMonitor monitor;
+
+ /**
+ * @param pleq This queue contains the LuceneWork to be sent to the backend
+ * @param endSignal
+ * @param monitor
+ * @param backendQueueProcessorFactory
+ */
+ IndexWritingJob(ProducerConsumerQueue pleq, CountDownLatch endSignal, IndexerProgressMonitor monitor, BackendQueueProcessorFactory backendQueueProcessorFactory) {
+ this.pleq = pleq;
+ this.monitor = monitor;
+ this.backend = backendQueueProcessorFactory;
+ this.endSignal = endSignal;
+ log.trace( "created" );
+ }
+
+ public void run() {
+ log.debug( "Start" );
+ try {
+ while ( true ) {
+ Object take = pleq.take();
+ if ( take == null ) {
+ break;
+ }
+ else {
+ LuceneWork work = (LuceneWork) take;
+ log.trace( "received lucenework {}", work );
+ //TODO group work in bigger lists of size #batch and introduce the CommitLuceneWork to avoid waiting more work
+ ArrayList<LuceneWork> list = new ArrayList<LuceneWork>( 1 );
+ list.add( work );
+ Runnable processor = backend.getProcessor( list );
+ processor.run();
+ monitor.documentsAdded( 1L );
+ }
+ }
+ log.debug( "Finished" );
+ }
+ catch (InterruptedException e) {
+ // normal quit: no need to propagate interruption.
+ log.debug( "Interrupted" );
+ }
+ finally {
+ //notify everybody we have finished.
+ endSignal.countDown();
+ }
+ }
+
+}
\ No newline at end of file
Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexerProgressMonitor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexerProgressMonitor.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/IndexerProgressMonitor.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,51 @@
+package org.hibernate.search.batchindexing;
+
+/**
+ * As an indexer can take some time to finish,
+ * an IndexerProgressMonitor implementing this interface
+ * can be registered through the configuration property
+ * hibernate.search.worker.indexing.monitor
+ * to track indexing progress and
+ * performance.
+ * Implementors must be threadsafe and have a
+ * no-arg constructor.
+ *
+ * @author Sanne Grinovero
+ */
+public interface IndexerProgressMonitor {
+
+ /**
+ * The number of documents sent to the backend.
+ * This is called several times during
+ * the indexing process.
+ * @param increment
+ */
+ void documentsAdded(long increment);
+
+ /**
+ * The number of Documents built.
+ * This is called several times and concurrently during
+ * the indexing process.
+ * @param number
+ */
+ void documentsBuilt(int number);
+
+ /**
+ * The number of entities loaded from the database.
+ * This is called several times and concurrently during
+ * the indexing process.
+ * @param size
+ */
+ void entitiesLoaded(int size);
+
+ /**
+ * The total count of entities to be indexed is
+ * added here. It could be called more than once;
+ * the implementation should add the values up.
+ * This is called several times and concurrently during
+ * the indexing process.
+ * @param count
+ */
+ void addToTotalCount(long count);
+
+}
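
A hedged sketch of a custom implementation (the class name is an example; the counters must be threadsafe since the callbacks arrive concurrently):

    import java.util.concurrent.atomic.AtomicLong;

    public class CountingProgressMonitor implements IndexerProgressMonitor {
        private final AtomicLong added = new AtomicLong();
        private final AtomicLong total = new AtomicLong();
        public void documentsAdded(long increment) {
            System.out.println( added.addAndGet( increment ) + "/" + total.get() + " documents sent to the backend" );
        }
        public void documentsBuilt(int number) { /* not tracked in this sketch */ }
        public void entitiesLoaded(int size) { /* not tracked in this sketch */ }
        public void addToTotalCount(long count) { total.addAndGet( count ); }
    }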
Added: search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/batchindexing/ProducerConsumerQueue.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,82 @@
+package org.hibernate.search.batchindexing;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Implements a blocking queue capable of storing
+ * a "poison" token to signal consumer threads
+ * that the task is finished.
+ *
+ * @author Sanne Grinovero
+ */
+public class ProducerConsumerQueue {
+
+ private static final int DEFAULT_BUFF_LENGTH = 1000;
+ private static final Object exitToken = new Object();
+
+ private final BlockingQueue queue;
+ private final AtomicInteger producersToWaitFor;
+
+ /**
+ * @param producersToWaitFor The number of producer threads.
+ */
+ public ProducerConsumerQueue( int producersToWaitFor ) {
+ this( DEFAULT_BUFF_LENGTH, producersToWaitFor );
+ }
+
+ public ProducerConsumerQueue( int queueLength, int producersToWaitFor ) {
+ queue = new ArrayBlockingQueue( queueLength );
+ this.producersToWaitFor = new AtomicInteger( producersToWaitFor );
+ }
+
+ /**
+ * Blocks until an object is available; when null
+ * is returned the client thread should quit.
+ * @return the next object in the queue, or null to exit
+ * @throws InterruptedException
+ */
+ public Object take() throws InterruptedException {
+ Object obj = queue.take();
+ if ( obj == exitToken ) {
+ //restore exit signal for other threads
+ queue.put( exitToken );
+ return null;
+ }
+ else {
+ return obj;
+ }
+ }
+
+ /**
+ * Adds a new object to the queue, blocking if no space is
+ * available.
+ * @param obj
+ * @throws InterruptedException
+ */
+ public void put(Object obj) throws InterruptedException {
+ queue.put( obj );
+ }
+
+ /**
+ * Each producer thread should call producerStopping() when it has
+ * finished. After doing so it can safely terminate.
+ * After all producer threads have called producerStopping()
+ * a token will be inserted in the blocking queue to eventually
+ * awaken sleeping consumers and have them quit, after the
+ * queue has been processed.
+ */
+ public void producerStopping() {
+ int activeProducers = producersToWaitFor.decrementAndGet();
+ //last producer must close consumers
+ if ( activeProducers == 0 ) {
+ try {
+ queue.put( exitToken );//awake all waiting threads to let them quit.
+ } catch (InterruptedException e) {
+ //just quit, consumers will be interrupted anyway if it's a shutdown.
+ }
+ }
+ }
+
+}
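
A minimal sketch of the intended hand-off protocol (the work item and the processing logic are placeholders; InterruptedException handling is omitted for brevity):

    ProducerConsumerQueue queue = new ProducerConsumerQueue( 1 ); // one producer
    // in the producer thread:
    try {
        queue.put( workItem );
    }
    finally {
        queue.producerStopping(); // the last producer inserts the exit token
    }
    // in each consumer thread:
    Object item;
    while ( ( item = queue.take() ) != null ) {
        // process item, then loop; a null return means all producers are done
    }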
Modified: search/trunk/src/main/java/org/hibernate/search/engine/DocumentBuilderIndexedEntity.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/engine/DocumentBuilderIndexedEntity.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/engine/DocumentBuilderIndexedEntity.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -105,7 +105,7 @@
/**
* Flag indicating whether there is an explicit id (@DocumentId or @Id) or not. When Search is used as make
- * for example JBoss Cache searchable the <code>idKeywordName</code> wil be provided.
+ * for example using JBoss Cache Searchable the <code>idKeywordName</code> will be provided.
*/
private boolean idProvided = false;
@@ -337,7 +337,7 @@
super.addWorkToQueue( entityClass, entity, id, workType, queue, searchFactoryImplementor );
}
- private AddLuceneWork createAddWork(Class<T> entityClass, T entity, Serializable id, String idInString, boolean isBatch) {
+ public AddLuceneWork createAddWork(Class<T> entityClass, T entity, Serializable id, String idInString, boolean isBatch) {
Map<String, String> fieldToAnalyzerMap = new HashMap<String, String>();
Document doc = getDocument( entity, id, fieldToAnalyzerMap );
AddLuceneWork addWork;
Modified: search/trunk/src/main/java/org/hibernate/search/impl/FullTextSessionImpl.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/FullTextSessionImpl.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/impl/FullTextSessionImpl.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -46,6 +46,7 @@
import org.hibernate.search.FullTextQuery;
import org.hibernate.search.FullTextSession;
import org.hibernate.search.SearchFactory;
+import org.hibernate.search.Indexer;
import org.hibernate.search.backend.TransactionContext;
import org.hibernate.search.backend.Work;
import org.hibernate.search.backend.WorkType;
@@ -162,6 +163,15 @@
//another solution would be to subclass SessionImpl instead of having this LuceneSession delegation model
//this is an open discussion
}
+
+ public Indexer createIndexer(Class<?>... types) {
+ if ( types.length == 0 ) {
+ return new IndexerImpl( getSearchFactoryImplementor(), getSessionFactory(), Object.class );
+ }
+ else {
+ return new IndexerImpl( getSearchFactoryImplementor(), getSessionFactory(), types );
+ }
+ }
public SearchFactory getSearchFactory() {
if ( searchFactory == null ) {
Added: search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/impl/IndexerImpl.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,172 @@
+package org.hibernate.search.impl;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.CacheMode;
+import org.hibernate.SessionFactory;
+import org.hibernate.search.Indexer;
+import org.hibernate.search.batchindexing.BatchIndexingWorkspace;
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * Prepares and configures a BatchIndexingWorkspace to start rebuilding
+ * the indexes for all entities in the database.
+ *
+ * @author Sanne Grinovero
+ */
+public class IndexerImpl implements Indexer {
+
+ private static final Logger log = LoggerFactory.make();
+
+ private final SearchFactoryImplementor searchFactoryImplementor;
+ private final SessionFactory sessionFactory;
+
+ protected Set<Class<?>> rootEntities = new HashSet<Class<?>>();
+ protected List<BatchIndexingWorkspace> indexers = new LinkedList<BatchIndexingWorkspace>();
+ boolean started = false;
+ private CountDownLatch endAllSignal;
+
+ // default settings defined here:
+ private int objectLoadingThreads = 2; //loading the main entity
+ private int collectionLoadingThreads = 4; //also responsible for loading of lazy @IndexedEmbedded collections
+ private int writerThreads = 1; //also running the Analyzers
+ private int objectLoadingBatchSize = 10;
+ private CacheMode cacheMode = CacheMode.IGNORE;
+ private boolean optimizeAtEnd = true;
+ private boolean purgeAtStart = true;
+ private boolean optimizeAfterPurge = true;
+ private IndexerProgressMonitor monitor = new SimpleIndexingProgressMonitor();
+
+ protected IndexerImpl(SearchFactoryImplementor searchFactory, SessionFactory sessionFactory, Class<?>...entities) {
+ this.searchFactoryImplementor = searchFactory;
+ this.sessionFactory = sessionFactory;
+ rootEntities = toRootEntities( searchFactoryImplementor, entities );
+ }
+
+ /**
+ * From the set of classes a new set is built containing all indexed
+ * subclasses; any type which is a subtype of another contained type is
+ * then removed, to avoid duplicate loading by polymorphic queries.
+ * @param selection
+ * @return a new set of entities
+ */
+ private static Set<Class<?>> toRootEntities(SearchFactoryImplementor searchFactoryImplementor, Class<?>... selection) {
+ Set<Class<?>> entities = new HashSet<Class<?>>();
+ //first build the "entities" set containing all indexed subtypes of "selection".
+ for (Class<?> entityType : selection) {
+ Set<Class<?>> targetedClasses = searchFactoryImplementor.getIndexedTypesPolymorphic( new Class[] {entityType} );
+ if ( targetedClasses.isEmpty() ) {
+ String msg = entityType.getName() + " is not an indexed entity or a subclass of an indexed entity";
+ throw new IllegalArgumentException( msg );
+ }
+ entities.addAll( targetedClasses );
+ }
+ Set<Class<?>> cleaned = new HashSet<Class<?>>();
+ Set<Class<?>> toRemove = new HashSet<Class<?>>();
+ //now remove all repeated types to avoid duplicate loading by polymorphic query loading
+ for (Class<?> type : entities) {
+ boolean typeIsOk = true;
+ for (Class<?> existing : cleaned) {
+ if ( existing.isAssignableFrom( type ) ) {
+ typeIsOk = false;
+ break;
+ }
+ if ( type.isAssignableFrom( existing ) ) {
+ toRemove.add( existing );
+ }
+ }
+ if ( typeIsOk ) {
+ cleaned.add( type );
+ }
+ }
+ cleaned.removeAll( toRemove );
+ if ( log.isDebugEnabled() ) {
+ log.debug( "Targets for indexing job: {}", cleaned );
+ }
+ return cleaned;
+ }
+
+ public Indexer cacheMode(CacheMode cacheMode) {
+ if ( cacheMode == null )
+ throw new IllegalArgumentException( "cacheMode must not be null" );
+ this.cacheMode = cacheMode;
+ return this;
+ }
+
+ public Indexer objectLoadingThreads(int numberOfThreads) {
+ if ( numberOfThreads < 1 )
+ throw new IllegalArgumentException( "numberOfThreads must be at least 1" );
+ this.objectLoadingThreads = numberOfThreads;
+ return this;
+ }
+
+ public Indexer objectLoadingBatchSize(int batchSize) {
+ if ( batchSize < 1 )
+ throw new IllegalArgumentException( "batchSize must be at least 1" );
+ this.objectLoadingBatchSize = batchSize;
+ return this;
+ }
+
+ public Indexer documentBuilderThreads(int numberOfThreads) {
+ if ( numberOfThreads < 1 )
+ throw new IllegalArgumentException( "numberOfThreads must be at least 1" );
+ this.collectionLoadingThreads = numberOfThreads;
+ return this;
+ }
+
+ public Indexer indexWriterThreads(int numberOfThreads) {
+ if ( numberOfThreads < 1 )
+ throw new IllegalArgumentException( "numberOfThreads must be at least 1" );
+ this.writerThreads = numberOfThreads;
+ return this;
+ }
+
+ public Indexer optimizeAtEnd(boolean optimize) {
+ this.optimizeAtEnd = optimize;
+ return this;
+ }
+
+ public Indexer optimizeAfterPurge(boolean optimize) {
+ this.optimizeAfterPurge = optimize;
+ return this;
+ }
+
+ public Indexer purgeAllAtStart(boolean purgeAll) {
+ this.purgeAtStart = purgeAll;
+ return this;
+ }
+
+ public void start() {
+ if ( started ) {
+ throw new IllegalStateException( "Can be started only once" );
+ }
+ else {
+ started = true;
+ endAllSignal = new CountDownLatch( rootEntities.size() );
+ for ( Class<?> type : rootEntities ) {
+ indexers.add( new BatchIndexingWorkspace(
+ searchFactoryImplementor, sessionFactory, type,
+ objectLoadingThreads, collectionLoadingThreads, writerThreads,
+ cacheMode, objectLoadingBatchSize,
+ optimizeAtEnd, purgeAtStart, optimizeAfterPurge,
+ endAllSignal, monitor) );
+ }
+ for ( BatchIndexingWorkspace batcher : indexers ) {
+ new Thread( batcher ).start();
+ }
+ }
+ }
+
+ public boolean startAndWait(long timeout, TimeUnit unit) throws InterruptedException {
+ start();
+ return endAllSignal.await( timeout, unit );
+ }
+
+}
Added: search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java	(rev 0)
+++ search/trunk/src/main/java/org/hibernate/search/impl/SimpleIndexingProgressMonitor.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,56 @@
+package org.hibernate.search.impl;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.hibernate.search.batchindexing.IndexerProgressMonitor;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * A very simple implementation of IndexerProgressMonitor
+ *
+ * @author Sanne Grinovero
+ */
+public class SimpleIndexingProgressMonitor implements IndexerProgressMonitor {
+
+ private static final Logger log = LoggerFactory.make();
+ private final AtomicLong documentsDoneCounter = new AtomicLong();
+ private final AtomicLong totalCounter = new AtomicLong();
+ private volatile long startTimeMs;
+
+ public void entitiesLoaded(int size) {
+ //not used
+ }
+
+ public void documentsAdded(long increment) {
+ long current = documentsDoneCounter.addAndGet( increment );
+ if ( current == increment ) {
+ startTimeMs = System.currentTimeMillis();
+ }
+ if ( current % getStatusMessagePeriod() == 0 ) {
+ printStatusMessage( startTimeMs, totalCounter.get(), current );
+ }
+ }
+
+ public void documentsBuilt(int number) {
+ //not used
+ }
+
+ public void addToTotalCount(long count) {
+ totalCounter.addAndGet( count );
+ log.info( "Going to reindex {} entities", count );
+ }
+
+ protected int getStatusMessagePeriod() {
+ return 1000;
+ }
+
+ protected void printStatusMessage(long starttimems, long totalTodoCount, long doneCount) {
+ long elapsedMs = System.currentTimeMillis() - starttimems;
+ log.info( "{} documents indexed in {} ms", doneCount, elapsedMs );
+ double estimateSpeed = ( (double) doneCount * 1000f ) / elapsedMs;
+ double estimatePercentComplete = ( (double) doneCount * 100 ) / (double) totalTodoCount;
+ log.info( "Indexing speed: {} documents/second; progress: {}%", estimateSpeed, estimatePercentComplete );
+ }
+
+}
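
Since getStatusMessagePeriod() is protected, the logging frequency can be tuned by subclassing; a hypothetical example:

    public class ChattyProgressMonitor extends SimpleIndexingProgressMonitor {
        @Override
        protected int getStatusMessagePeriod() {
            return 100; // print a status line every 100 documents instead of every 1000
        }
    }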
Modified: search/trunk/src/main/java/org/hibernate/search/jpa/FullTextEntityManager.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/jpa/FullTextEntityManager.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/jpa/FullTextEntityManager.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -4,6 +4,7 @@
import java.io.Serializable;
import javax.persistence.EntityManager;
+import org.hibernate.search.Indexer;
import org.hibernate.search.SearchFactory;
/**
@@ -68,5 +69,14 @@
* Flush all index changes forcing Hibernate Search to apply all changes to the index not waiting for the batch limit.
*/
public void flushToIndexes();
+
+ /**
+ * Creates an Indexer to rebuild the indexes of some
+ * or all indexed entity types.
+ * Instances cannot be reused.
+ * @param types optionally restrict the operation to selected types
+ * @return a new Indexer
+ */
+ public Indexer createIndexer(Class<?>... types);
}
Modified:
search/trunk/src/main/java/org/hibernate/search/jpa/impl/FullTextEntityManagerImpl.java
===================================================================
--- search/trunk/src/main/java/org/hibernate/search/jpa/impl/FullTextEntityManagerImpl.java	2009-05-18 01:44:29 UTC (rev 16579)
+++ search/trunk/src/main/java/org/hibernate/search/jpa/impl/FullTextEntityManagerImpl.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -10,6 +10,7 @@
import org.hibernate.search.jpa.FullTextEntityManager;
import org.hibernate.search.jpa.FullTextQuery;
+import org.hibernate.search.Indexer;
import org.hibernate.search.SearchFactory;
import org.hibernate.search.SearchException;
import org.hibernate.search.FullTextSession;
@@ -176,4 +177,9 @@
public EntityTransaction getTransaction() {
return em.getTransaction();
}
+
+ public Indexer createIndexer(Class<?>... types) {
+ return getFullTextSession().createIndexer( types );
+ }
+
}
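
For completeness, a hedged JPA-side usage sketch (the entityManager variable is an assumption; org.hibernate.search.jpa.Search is the existing helper class):

    FullTextEntityManager ftem = Search.getFullTextEntityManager( entityManager );
    ftem.createIndexer().startAndWait( 1, TimeUnit.HOURS );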
Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java	(rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/AncientBook.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,13 @@
+package org.hibernate.search.test.batchindexing;
+
+import javax.persistence.Entity;
+
+import org.hibernate.search.annotations.Indexed;
+
+@Entity
+@Indexed
+public class AncientBook extends Book {
+
+ public String catalogueGroupName = "";
+
+}
Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java	(rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Book.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,15 @@
+package org.hibernate.search.test.batchindexing;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+import org.hibernate.search.annotations.Indexed;
+
+@Indexed
+@Entity
+public class Book {
+
+ @Id
+ public long id;
+
+}
Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java	(rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/Dvd.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,36 @@
+package org.hibernate.search.test.batchindexing;
+
+import javax.persistence.Entity;
+import javax.persistence.GeneratedValue;
+import javax.persistence.Id;
+
+import org.hibernate.search.annotations.Field;
+import org.hibernate.search.annotations.Indexed;
+
+@Indexed
+@Entity
+public class Dvd {
+
+ public long unusuallyNamedIdentifier;
+ public String title;
+
+ @Id
+ @GeneratedValue
+ public long getUnusuallyNamedIdentifier() {
+ return unusuallyNamedIdentifier;
+ }
+
+ public void setUnusuallyNamedIdentifier(long unusuallyNamedIdentifier) {
+ this.unusuallyNamedIdentifier = unusuallyNamedIdentifier;
+ }
+
+ @Field
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+}
Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/ModernBook.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/ModernBook.java	(rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/ModernBook.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,13 @@
+package org.hibernate.search.test.batchindexing;
+
+import javax.persistence.Entity;
+
+import org.hibernate.search.annotations.Indexed;
+
+@Entity
+@Indexed
+public class ModernBook extends Book {
+
+ public String isbn = null;
+
+}
Added: search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java
===================================================================
--- search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java	(rev 0)
+++ search/trunk/src/test/java/org/hibernate/search/test/batchindexing/SearchIndexerTest.java	2009-05-18 01:59:10 UTC (rev 16580)
@@ -0,0 +1,118 @@
+package org.hibernate.search.test.batchindexing;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.TermQuery;
+import org.hibernate.Transaction;
+import org.hibernate.search.Environment;
+import org.hibernate.search.FullTextQuery;
+import org.hibernate.search.FullTextSession;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.impl.IndexerImpl;
+import org.hibernate.search.test.util.FullTextSessionBuilder;
+
+public class SearchIndexerTest extends TestCase {
+
+ public void testEntityHierarchy() {
+ FullTextSessionBuilder ftsb = new FullTextSessionBuilder()
+ .addAnnotatedClass( ModernBook.class )
+ .addAnnotatedClass( AncientBook.class )
+ .addAnnotatedClass( Dvd.class )
+ .addAnnotatedClass( Book.class )
+ .build();
+ FullTextSession fullTextSession = ftsb.openFullTextSession();
+ SearchFactoryImplementor searchFactory = (SearchFactoryImplementor) fullTextSession.getSearchFactory();
+ {
+ TestableSearchIndexerImpl tsii = new TestableSearchIndexerImpl( searchFactory, Book.class );
+ assertTrue( tsii.getRootEntities().contains( Book.class ) );
+ assertFalse( tsii.getRootEntities().contains( ModernBook.class ) );
+ assertFalse( tsii.getRootEntities().contains( AncientBook.class ) );
+ }
+ {
+ TestableSearchIndexerImpl tsii = new TestableSearchIndexerImpl( searchFactory, ModernBook.class, AncientBook.class, Book.class );
+ assertTrue( tsii.getRootEntities().contains( Book.class ) );
+ assertFalse( tsii.getRootEntities().contains( ModernBook.class ) );
+ assertFalse( tsii.getRootEntities().contains( AncientBook.class ) );
+ }
+ {
+ TestableSearchIndexerImpl tsii = new TestableSearchIndexerImpl( searchFactory, ModernBook.class, AncientBook.class );
+ assertFalse( tsii.getRootEntities().contains( Book.class ) );
+ assertTrue( tsii.getRootEntities().contains( ModernBook.class ) );
+ assertTrue( tsii.getRootEntities().contains( AncientBook.class ) );
+ }
+ //verify that indexing Object will result in one separate indexer working per root indexed entity
+ {
+ TestableSearchIndexerImpl tsii = new TestableSearchIndexerImpl( searchFactory, Object.class );
+ assertTrue( tsii.getRootEntities().contains( Book.class ) );
+ assertTrue( tsii.getRootEntities().contains( Dvd.class ) );
+ assertFalse( tsii.getRootEntities().contains( AncientBook.class ) );
+ assertFalse( tsii.getRootEntities().contains( Object.class ) );
+ assertEquals( 2, tsii.getRootEntities().size() );
+ }
+ }
+
+ private class TestableSearchIndexerImpl extends IndexerImpl {
+
+ protected TestableSearchIndexerImpl(SearchFactoryImplementor searchFactory, Class<?>... types) {
+ super( searchFactory, null, types );
+ }
+
+ public Set<Class<?>> getRootEntities() {
+ return this.rootEntities;
+ }
+
+ }
+
+ public void testIdentifierNaming() throws InterruptedException {
+ //disable automatic indexing, to test manual index creation.
+ FullTextSessionBuilder ftsb = new FullTextSessionBuilder()
+ .setProperty( org.hibernate.search.Environment.ANALYZER_CLASS, StandardAnalyzer.class.getName() )
+ .addAnnotatedClass( Dvd.class )
+ .setProperty( Environment.INDEXING_STRATEGY, "manual" )
+ .build();
+ {
+ //creating the test data in database only:
+ FullTextSession fullTextSession = ftsb.openFullTextSession();
+ Transaction transaction = fullTextSession.beginTransaction();
+ Dvd dvda = new Dvd();
+ dvda.setTitle( "Star Trek (episode 96367)" );
+ fullTextSession.save(dvda);
+ Dvd dvdb = new Dvd();
+ dvdb.setTitle( "The Trek" );
+ fullTextSession.save(dvdb);
+ transaction.commit();
+ fullTextSession.close();
+ }
+ {
+ //verify index is still empty:
+ assertEquals( 0, countResults( new Term( "title", "trek" ), ftsb, Dvd.class ) );
+ }
+ {
+ FullTextSession fullTextSession = ftsb.openFullTextSession();
+ fullTextSession.createIndexer( Dvd.class )
+ .startAndWait( 10, TimeUnit.SECONDS );
+ fullTextSession.close();
+ }
+ {
+ //verify index is now containing both DVDs:
+ assertEquals( 2, countResults( new Term( "title", "trek" ), ftsb, Dvd.class ) );
+ }
+ }
+
+ public int countResults( Term termForQuery, FullTextSessionBuilder ftSessionBuilder, Class<?> type ) {
+ TermQuery fullTextQuery = new TermQuery( termForQuery );
+ FullTextSession fullTextSession = ftSessionBuilder.openFullTextSession();
+ Transaction transaction = fullTextSession.beginTransaction();
+ FullTextQuery query = fullTextSession.createFullTextQuery( fullTextQuery, type );
+ int resultSize = query.getResultSize();
+ transaction.commit();
+ fullTextSession.close();
+ return resultSize;
+ }
+
+}