[hibernate-commits] Hibernate SVN: r15618 - in search/trunk/src/java/org/hibernate/search/backend/impl: lucene and 1 other directory.

hibernate-commits at lists.jboss.org
Tue Nov 25 13:38:50 EST 2008


Author: sannegrinovero
Date: 2008-11-25 13:38:50 -0500 (Tue, 25 Nov 2008)
New Revision: 15618

Added:
   search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java
Modified:
   search/trunk/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
   search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
   search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
   search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java
   search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java
   search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java
Log:
HSEARCH-268 Apply changes to different indexes in parallel
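
Note: the backend now decides between synchronous and asynchronous execution through the new BatchedQueueingProcessor.isConfiguredAsSync( Properties ) helper shown in the first hunk below. As a minimal sketch of that behaviour, assuming the Environment.WORKER_EXECUTION constant resolves to the "hibernate.search.worker.execution" property key (the key name is an assumption, it is not visible in this diff):

import java.util.Properties;
import org.hibernate.search.backend.impl.BatchedQueueingProcessor;

public class SyncConfigSketch {
	public static void main(String[] args) {
		Properties props = new Properties();
		// assumed property key behind Environment.WORKER_EXECUTION
		props.setProperty( "hibernate.search.worker.execution", "async" );
		// prints false: work is handed to the backend asynchronously
		System.out.println( BatchedQueueingProcessor.isConfiguredAsSync( props ) );
		// any other value, or no value at all, keeps the synchronous default
		props.remove( "hibernate.search.worker.execution" );
		// prints true
		System.out.println( BatchedQueueingProcessor.isConfiguredAsSync( props ) );
	}
}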

Modified: search/trunk/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java	2008-11-25 18:25:17 UTC (rev 15617)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java	2008-11-25 18:38:50 UTC (rev 15618)
@@ -48,8 +48,7 @@
 
 	public BatchedQueueingProcessor(SearchFactoryImplementor searchFactoryImplementor, Properties properties) {
 		this.searchFactoryImplementor = searchFactoryImplementor;
-		//default to sync if none defined
-		this.sync = !"async".equalsIgnoreCase( properties.getProperty( Environment.WORKER_EXECUTION ) );
+		this.sync = isConfiguredAsSync( properties );
 
 		//default to a simple asynchronous operation
 		int min = ConfigurationParseHelper.getIntValue( properties, Environment.WORKER_THREADPOOL_SIZE, 1 );
@@ -207,5 +206,14 @@
 			return this == SECOND && type == WorkType.COLLECTION;
 		}
 	}
+	
+	/**
+	 * @param properties the configuration to parse
+	 * @return true if the configuration uses sync indexing
+	 */
+	public static boolean isConfiguredAsSync(Properties properties){
+		//default to sync if none defined
+		return !"async".equalsIgnoreCase( properties.getProperty( Environment.WORKER_EXECUTION ) );
+	}
 
 }

Modified: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java	2008-11-25 18:25:17 UTC (rev 15617)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java	2008-11-25 18:38:50 UTC (rev 15618)
@@ -5,11 +5,12 @@
 import java.util.Map;
 
 import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
 import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
 import org.hibernate.search.engine.SearchFactoryImplementor;
 import org.hibernate.search.store.DirectoryProvider;
 import org.hibernate.search.store.IndexShardingStrategy;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
 
 /**
  * Apply the operations to Lucene directories.
@@ -23,31 +24,37 @@
 	
 	private final List<LuceneWork> queue;
 	private final SearchFactoryImplementor searchFactoryImplementor;
-	private final Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap;
-	private static final DpSelectionVisitor providerSelectionVisitor = new DpSelectionVisitor(); 
+	private final Map<DirectoryProvider,PerDPResources> resourcesMap;
+	private final boolean sync;
+	
+	private static final DpSelectionVisitor providerSelectionVisitor = new DpSelectionVisitor();
+	private static final Logger log = LoggerFactory.make();
 
 	LuceneBackendQueueProcessor(List<LuceneWork> queue,
 			SearchFactoryImplementor searchFactoryImplementor,
-			Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap) {
+			Map<DirectoryProvider,PerDPResources> resourcesMap,
+			boolean syncMode) {
+		this.sync = syncMode;
 		this.queue = queue;
 		this.searchFactoryImplementor = searchFactoryImplementor;
-		this.visitorsMap = visitorsMap;
+		this.resourcesMap = resourcesMap;
 	}
 
 	public void run() {
-		QueueProcessors processors = new QueueProcessors( visitorsMap );
-		// divide tasks in parts, adding to QueueProcessors by affected Directory.
+		QueueProcessors processors = new QueueProcessors( resourcesMap );
+		// divide the queue into tasks, adding to QueueProcessors by affected Directory.
 		for ( LuceneWork work : queue ) {
 			final Class<?> entityType = work.getEntityClass();
 			DocumentBuilderIndexedEntity<?> documentBuilder = searchFactoryImplementor.getDocumentBuilderIndexedEntity( entityType );
 			IndexShardingStrategy shardingStrategy = documentBuilder.getDirectoryProviderSelectionStrategy();
 			work.getWorkDelegate( providerSelectionVisitor ).addAsPayLoadsToQueue( work, shardingStrategy, processors );
 		}
-		// TODO next cycle could be performed in parallel
-		for ( PerDPQueueProcessor processor : processors.getQueueProcessors() ) {
-			// perform the work on indexes
-			processor.performWorks();
+		try {
+			//this Runnable splits the tasks into more runnables and then runs them:
+			processors.runAll( sync );
+		} catch (InterruptedException e) {
+			log.error( "Index update task has been interrupted", e );
 		}
 	}
-
+	
 }

Modified: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java	2008-11-25 18:25:17 UTC (rev 15617)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java	2008-11-25 18:38:50 UTC (rev 15618)
@@ -5,11 +5,11 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.hibernate.search.backend.BackendQueueProcessorFactory;
 import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.Workspace;
-import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
+import org.hibernate.search.backend.impl.BatchedQueueingProcessor;
 import org.hibernate.search.engine.SearchFactoryImplementor;
 import org.hibernate.search.store.DirectoryProvider;
 
@@ -34,23 +34,32 @@
 	 * lifecycle (reused and shared by all transactions);
 	 * the LuceneWorkVisitor(s) are stateless, the Workspace(s) are threadsafe.
 	 */
-	private final Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap = new HashMap<DirectoryProvider,LuceneWorkVisitor>();
+	private final Map<DirectoryProvider,PerDPResources> resourcesMap = new HashMap<DirectoryProvider,PerDPResources>();
 
+	/**
+	 * copy of BatchedQueueingProcessor.sync
+	 */
+	private boolean sync;
+
 	public void initialize(Properties props, SearchFactoryImplementor searchFactoryImplementor) {
 		this.searchFactoryImp = searchFactoryImplementor;
+		this.sync = BatchedQueueingProcessor.isConfiguredAsSync( props );
 		for (DirectoryProvider dp : searchFactoryImplementor.getDirectoryProviders() ) {
-			Workspace w = new Workspace( searchFactoryImplementor, dp );
-			LuceneWorkVisitor visitor = new LuceneWorkVisitor( w );
-			visitorsMap.put( dp, visitor );
+			PerDPResources resources = new PerDPResources( searchFactoryImplementor, dp );
+			resourcesMap.put( dp, resources );
 		}
 	}
 
 	public Runnable getProcessor(List<LuceneWork> queue) {
-		return new LuceneBackendQueueProcessor( queue, searchFactoryImp, visitorsMap );
+		return new LuceneBackendQueueProcessor( queue, searchFactoryImp, resourcesMap, sync );
 	}
 
 	public void close() {
-		// no need to release anything
+		// need to stop all ThreadPools in use
+		for (PerDPResources res : resourcesMap.values() ) {
+			ExecutorService executor = res.getExecutor();
+			executor.shutdown();
+		}
 	}
 	
 }

Modified: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java	2008-11-25 18:25:17 UTC (rev 15617)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java	2008-11-25 18:38:50 UTC (rev 15618)
@@ -2,6 +2,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
@@ -16,11 +17,12 @@
 /**
  * @author Sanne Grinovero
  */
-class PerDPQueueProcessor {
+class PerDPQueueProcessor implements Runnable {
 	
 	private static final Logger log = LoggerFactory.make();
 	private final Workspace workspace;
 	private final LuceneWorkVisitor worker;
+	private final ExecutorService executor;
 	private final List<LuceneWork> workOnWriter = new ArrayList<LuceneWork>();
 	private final List<LuceneWork> workOnReader= new ArrayList<LuceneWork>();
 	
@@ -29,9 +31,10 @@
 	private boolean needsWriter = false;
 	private boolean preferReader = false;
 	
-	public PerDPQueueProcessor(LuceneWorkVisitor worker) {
-		this.worker = worker;
-		this.workspace = worker.getWorkspace();
+	public PerDPQueueProcessor(PerDPResources resources) {
+		this.worker = resources.getVisitor();
+		this.workspace = resources.getWorkspace();
+		this.executor = resources.getExecutor();
 	}
 
 	public void addWork(LuceneWork work) {
@@ -56,7 +59,7 @@
 		}
 	}
 
-	public void performWorks() {
+	public void run() {
 		// skip "resource optimization mode" when in batch to have all tasks use preferred (optimal) mode.
 		if ( ! batchmode ) {
 			// 	see if we can skip using some resource
@@ -153,5 +156,9 @@
 		workOnWriter.addAll( 0, workOnReader );
 		workOnReader.clear();
 	}
-	
+
+	public ExecutorService getOwningExecutor() {
+		return executor;
+	}
+
 }

Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java	                        (rev 0)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java	2008-11-25 18:38:50 UTC (rev 15618)
@@ -0,0 +1,35 @@
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.store.DirectoryProvider;
+
+class PerDPResources {
+	
+	private final ExecutorService executor;
+	private final LuceneWorkVisitor visitor;
+	private final Workspace workspace;
+	
+	PerDPResources(SearchFactoryImplementor searchFactoryImp, DirectoryProvider dp) {
+		workspace = new Workspace( searchFactoryImp, dp );
+		visitor = new LuceneWorkVisitor( workspace );
+		executor = Executors.newFixedThreadPool( 1 );
+	}
+
+	public ExecutorService getExecutor() {
+		return executor;
+	}
+
+	public LuceneWorkVisitor getVisitor() {
+		return visitor;
+	}
+
+	public Workspace getWorkspace() {
+		return workspace;
+	}
+	
+}


Property changes on: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java
___________________________________________________________________
Name: svn:executable
   + *
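
Each DirectoryProvider now owns a PerDPResources instance holding its Workspace, its LuceneWorkVisitor and a dedicated single-threaded ExecutorService. A one-thread pool serialises all changes applied to a given index, while different indexes can be updated in parallel. A reduced sketch of that scheduling model, using hypothetical index names and plain Runnables instead of the real PerDPQueueProcessor:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerIndexSchedulingSketch {
	public static void main(String[] args) {
		// one single-threaded executor per index, mirroring PerDPResources
		ExecutorService booksIndex = Executors.newFixedThreadPool( 1 );
		ExecutorService authorsIndex = Executors.newFixedThreadPool( 1 );
		// tasks targeting the same index run strictly one after the other,
		// so a single index is never written to concurrently...
		booksIndex.execute( new Runnable() {
			public void run() { System.out.println( "books: change set 1" ); }
		} );
		booksIndex.execute( new Runnable() {
			public void run() { System.out.println( "books: change set 2" ); }
		} );
		// ...while work for a different index proceeds in parallel
		authorsIndex.execute( new Runnable() {
			public void run() { System.out.println( "authors: change set 1" ); }
		} );
		booksIndex.shutdown();
		authorsIndex.shutdown();
	}
}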

Modified: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java	2008-11-25 18:25:17 UTC (rev 15617)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java	2008-11-25 18:38:50 UTC (rev 15618)
@@ -1,36 +1,93 @@
 package org.hibernate.search.backend.impl.lucene;
 
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
 
 import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
 import org.hibernate.search.store.DirectoryProvider;
 
 /**
- * Container used to split work by DirectoryProviders.
+ * Container used to split work by DirectoryProvider and to execute
+ * the per-DP queues concurrently.
  * @author Sanne Grinovero
  */
 class QueueProcessors {
 	
-	private final Map<DirectoryProvider, LuceneWorkVisitor> visitorsMap;
+	private final Map<DirectoryProvider, PerDPResources> resourcesMap;
 	private final Map<DirectoryProvider, PerDPQueueProcessor> dpProcessors = new HashMap<DirectoryProvider, PerDPQueueProcessor>();
 	
-	QueueProcessors(Map<DirectoryProvider, LuceneWorkVisitor> visitorsMap) {
-		this.visitorsMap = visitorsMap;
+	QueueProcessors(Map<DirectoryProvider, PerDPResources> resourcesMap) {
+		this.resourcesMap = resourcesMap;
 	}
 
 	void addWorkToDpProcessor(DirectoryProvider dp, LuceneWork work) {
 		if ( ! dpProcessors.containsKey( dp ) ) {
-			dpProcessors.put( dp, new PerDPQueueProcessor( visitorsMap.get( dp ) ) );
+			dpProcessors.put( dp, new PerDPQueueProcessor( resourcesMap.get( dp ) ) );
 		}
 		PerDPQueueProcessor processor = dpProcessors.get( dp );
 		processor.addWork ( work );
 	}
 	
-	Collection<PerDPQueueProcessor> getQueueProcessors(){
-		return dpProcessors.values();
+	/**
+	 * Runs all index modifications queued so far.
+	 * @param sync when true this method blocks until all work is done.
+	 * @throws InterruptedException only relevant when sync is true.
+	 */
+	void runAll(boolean sync) throws InterruptedException {
+		if ( sync ) {
+			runAllWaiting();
+		}
+		else {
+			runAllAsync();
+		}
 	}
+	
+	/**
+	 * Runs all PerDPQueueProcessors and doesn't wait for them to finish.
+	 */
+	private void runAllAsync() {
+		// execute all work in parallel on each DirectoryProvider;
+		// each DP has its own ExecutorService.
+		for ( PerDPQueueProcessor process : dpProcessors.values() ) {
+			ExecutorService executor = process.getOwningExecutor();
+			executor.execute( process );
+		}
+	}
 
+	/**
+	 * Runs all PerDPQueueProcessors and waits until all have been processed.
+	 * @throws InterruptedException
+	 */
+	private void runAllWaiting() throws InterruptedException {
+		List<Future<Object>> futures = new ArrayList<Future<Object>>( dpProcessors.size() );
+		// execute all work in parallel on each DirectoryProvider;
+		// each DP has its own ExecutorService.
+		for ( PerDPQueueProcessor process : dpProcessors.values() ) {
+			ExecutorService executor = process.getOwningExecutor();
+			//wrap each Runnable in a Future
+			FutureTask<Object> f = new FutureTask<Object>( process, null );
+			futures.add( f );
+			executor.execute( f );
+		}
+		// and then wait for all tasks to be finished:
+		for ( Future<Object> f : futures ) {
+			if ( !f.isDone() ) {
+				try {
+					f.get();
+				} catch (CancellationException ignore) {
+					//ignored, as in java.util.concurrent.AbstractExecutorService.invokeAll(Collection<Callable<T>> tasks)
+				} catch (ExecutionException ignore) {
+					//ignored, as in java.util.concurrent.AbstractExecutorService.invokeAll(Collection<Callable<T>> tasks)
+				}
+			}
+		}
+	}
+
 }
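
In synchronous mode, runAllWaiting() wraps each per-DP Runnable in a FutureTask, submits it to the executor owned by that DirectoryProvider, and then blocks on Future.get() until every task has completed, swallowing CancellationException and ExecutionException the same way AbstractExecutorService.invokeAll does. A reduced, stand-alone sketch of the same wait pattern (task bodies and names are hypothetical):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class WaitForRunnablesSketch {
	public static void main(String[] args) throws InterruptedException {
		ExecutorService executor = Executors.newFixedThreadPool( 2 );
		List<Future<Object>> futures = new ArrayList<Future<Object>>();
		for ( int i = 0; i < 3; i++ ) {
			// wrap the plain Runnable in a FutureTask so completion can be awaited
			FutureTask<Object> task = new FutureTask<Object>( new Runnable() {
				public void run() {
					// per-index work would happen here
				}
			}, null );
			futures.add( task );
			executor.execute( task );
		}
		// block until every task has finished: this is the sync behaviour
		for ( Future<Object> f : futures ) {
			try {
				f.get();
			} catch (CancellationException ignore) {
				// ignored, as in the commit above
			} catch (ExecutionException ignore) {
				// ignored, as in the commit above
			}
		}
		executor.shutdown();
	}
}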

Modified: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java	2008-11-25 18:25:17 UTC (rev 15617)
+++ search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java	2008-11-25 18:38:50 UTC (rev 15618)
@@ -17,13 +17,6 @@
 	private final OptimizeWorkDelegate optimizeDelegate;
 	private final PurgeAllWorkDelegate purgeAllDelegate;
 	
-	/**
-	 * The Workspace this visitor has been created for;
-	 * different workspaces could use different Delegates for specific
-	 * needs basing on workspace or DirectoryProvider configuration.
-	 */
-	private final Workspace linkedWorkspace;
-
 	public LuceneWorkVisitor(Workspace workspace) {
 		if ( workspace.getEntitiesInDirectory().size() == 1 ) {
 			this.deleteDelegate = new DeleteExtWorkDelegate( workspace );
@@ -34,7 +27,6 @@
 		this.purgeAllDelegate = new PurgeAllWorkDelegate();
 		this.addDelegate = new AddWorkDelegate( workspace );
 		this.optimizeDelegate = new OptimizeWorkDelegate( workspace );
-		this.linkedWorkspace = workspace;
 	}
 
 	public LuceneWorkDelegate getDelegate(AddLuceneWork addLuceneWork) {
@@ -53,8 +45,4 @@
 		return purgeAllDelegate;
 	}
 	
-	public Workspace getWorkspace(){
-		return linkedWorkspace;
-	}
-
 }



