Author: sannegrinovero
Date: 2008-11-25 13:38:50 -0500 (Tue, 25 Nov 2008)
New Revision: 15618
Added:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java
Modified:
search/trunk/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java
Log:
HSEARCH-268 Apply changes to different indexes in parallel
Modified:
search/trunk/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
===================================================================
---
search/trunk/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java 2008-11-25
18:25:17 UTC (rev 15617)
+++
search/trunk/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java 2008-11-25
18:38:50 UTC (rev 15618)
@@ -48,8 +48,7 @@
public BatchedQueueingProcessor(SearchFactoryImplementor searchFactoryImplementor,
Properties properties) {
this.searchFactoryImplementor = searchFactoryImplementor;
- //default to sync if none defined
- this.sync = !"async".equalsIgnoreCase( properties.getProperty(
Environment.WORKER_EXECUTION ) );
+ this.sync = isConfiguredAsSync( properties );
//default to a simple asynchronous operation
int min = ConfigurationParseHelper.getIntValue( properties,
Environment.WORKER_THREADPOOL_SIZE, 1 );
@@ -207,5 +206,14 @@
return this == SECOND && type == WorkType.COLLECTION;
}
}
+
+ /**
+ * @param properties the configuration to parse
+ * @return true if the configuration uses sync indexing
+ */
+ public static boolean isConfiguredAsSync(Properties properties){
+ //default to sync if none defined
+ return !"async".equalsIgnoreCase( properties.getProperty(
Environment.WORKER_EXECUTION ) );
+ }
}
Modified:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
===================================================================
---
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java 2008-11-25
18:25:17 UTC (rev 15617)
+++
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java 2008-11-25
18:38:50 UTC (rev 15618)
@@ -5,11 +5,12 @@
import java.util.Map;
import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
import org.hibernate.search.engine.DocumentBuilderIndexedEntity;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.hibernate.search.store.DirectoryProvider;
import org.hibernate.search.store.IndexShardingStrategy;
+import org.hibernate.search.util.LoggerFactory;
+import org.slf4j.Logger;
/**
* Apply the operations to Lucene directories.
@@ -23,31 +24,37 @@
private final List<LuceneWork> queue;
private final SearchFactoryImplementor searchFactoryImplementor;
- private final Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap;
- private static final DpSelectionVisitor providerSelectionVisitor = new
DpSelectionVisitor();
+ private final Map<DirectoryProvider,PerDPResources> resourcesMap;
+ private final boolean sync;
+
+ private static final DpSelectionVisitor providerSelectionVisitor = new
DpSelectionVisitor();
+ private static final Logger log = LoggerFactory.make();
LuceneBackendQueueProcessor(List<LuceneWork> queue,
SearchFactoryImplementor searchFactoryImplementor,
- Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap) {
+ Map<DirectoryProvider,PerDPResources> resourcesMap,
+ boolean syncMode) {
+ this.sync = syncMode;
this.queue = queue;
this.searchFactoryImplementor = searchFactoryImplementor;
- this.visitorsMap = visitorsMap;
+ this.resourcesMap = resourcesMap;
}
public void run() {
- QueueProcessors processors = new QueueProcessors( visitorsMap );
- // divide tasks in parts, adding to QueueProcessors by affected Directory.
+ QueueProcessors processors = new QueueProcessors( resourcesMap );
+ // divide the queue in tasks, adding to QueueProcessors by affected Directory.
for ( LuceneWork work : queue ) {
final Class<?> entityType = work.getEntityClass();
DocumentBuilderIndexedEntity<?> documentBuilder =
searchFactoryImplementor.getDocumentBuilderIndexedEntity( entityType );
IndexShardingStrategy shardingStrategy =
documentBuilder.getDirectoryProviderSelectionStrategy();
work.getWorkDelegate( providerSelectionVisitor ).addAsPayLoadsToQueue( work,
shardingStrategy, processors );
}
- // TODO next cycle could be performed in parallel
- for ( PerDPQueueProcessor processor : processors.getQueueProcessors() ) {
- // perform the work on indexes
- processor.performWorks();
+ try {
+ //this Runnable splits tasks in more runnables and then runs them:
+ processors.runAll( sync );
+ } catch (InterruptedException e) {
+ log.error( "Index update task has been interrupted", e );
}
}
-
+
}
Modified:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
===================================================================
---
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java 2008-11-25
18:25:17 UTC (rev 15617)
+++
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java 2008-11-25
18:38:50 UTC (rev 15618)
@@ -5,11 +5,11 @@
import java.util.Map;
import java.util.Properties;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.hibernate.search.backend.BackendQueueProcessorFactory;
import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.Workspace;
-import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
+import org.hibernate.search.backend.impl.BatchedQueueingProcessor;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.hibernate.search.store.DirectoryProvider;
@@ -34,23 +34,32 @@
* lifecycle (reused and shared by all transactions);
* the LuceneWorkVisitor(s) are stateless, the Workspace(s) are threadsafe.
*/
- private final Map<DirectoryProvider,LuceneWorkVisitor> visitorsMap = new
HashMap<DirectoryProvider,LuceneWorkVisitor>();
+ private final Map<DirectoryProvider,PerDPResources> resourcesMap = new
HashMap<DirectoryProvider,PerDPResources>();
+ /**
+ * copy of BatchedQueueingProcessor.sync
+ */
+ private boolean sync;
+
public void initialize(Properties props, SearchFactoryImplementor
searchFactoryImplementor) {
this.searchFactoryImp = searchFactoryImplementor;
+ this.sync = BatchedQueueingProcessor.isConfiguredAsSync( props );
for (DirectoryProvider dp : searchFactoryImplementor.getDirectoryProviders() ) {
- Workspace w = new Workspace( searchFactoryImplementor, dp );
- LuceneWorkVisitor visitor = new LuceneWorkVisitor( w );
- visitorsMap.put( dp, visitor );
+ PerDPResources resources = new PerDPResources( searchFactoryImplementor, dp );
+ resourcesMap.put( dp, resources );
}
}
public Runnable getProcessor(List<LuceneWork> queue) {
- return new LuceneBackendQueueProcessor( queue, searchFactoryImp, visitorsMap );
+ return new LuceneBackendQueueProcessor( queue, searchFactoryImp, resourcesMap, sync );
}
public void close() {
- // no need to release anything
+ // needs to stop all used ThreadPools
+ for (PerDPResources res : resourcesMap.values() ) {
+ ExecutorService executor = res.getExecutor();
+ executor.shutdown();
+ }
}
}
Modified:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java
===================================================================
---
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java 2008-11-25
18:25:17 UTC (rev 15617)
+++
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPQueueProcessor.java 2008-11-25
18:38:50 UTC (rev 15618)
@@ -2,6 +2,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
@@ -16,11 +17,12 @@
/**
* @author Sanne Grinovero
*/
-class PerDPQueueProcessor {
+class PerDPQueueProcessor implements Runnable {
private static final Logger log = LoggerFactory.make();
private final Workspace workspace;
private final LuceneWorkVisitor worker;
+ private final ExecutorService executor;
private final List<LuceneWork> workOnWriter = new ArrayList<LuceneWork>();
private final List<LuceneWork> workOnReader= new ArrayList<LuceneWork>();
@@ -29,9 +31,10 @@
private boolean needsWriter = false;
private boolean preferReader = false;
- public PerDPQueueProcessor(LuceneWorkVisitor worker) {
- this.worker = worker;
- this.workspace = worker.getWorkspace();
+ public PerDPQueueProcessor(PerDPResources resources) {
+ this.worker = resources.getVisitor();
+ this.workspace = resources.getWorkspace();
+ this.executor = resources.getExecutor();
}
public void addWork(LuceneWork work) {
@@ -56,7 +59,7 @@
}
}
- public void performWorks() {
+ public void run() {
// skip "resource optimization mode" when in batch to have all tasks use
preferred (optimal) mode.
if ( ! batchmode ) {
// see if we can skip using some resource
@@ -153,5 +156,9 @@
workOnWriter.addAll( 0, workOnReader );
workOnReader.clear();
}
-
+
+ public ExecutorService getOwningExecutor() {
+ return executor;
+ }
+
}
Added: search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java
(rev 0)
+++
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java 2008-11-25
18:38:50 UTC (rev 15618)
@@ -0,0 +1,35 @@
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
+import org.hibernate.search.engine.SearchFactoryImplementor;
+import org.hibernate.search.store.DirectoryProvider;
+
+class PerDPResources {
+
+ private final ExecutorService executor;
+ private final LuceneWorkVisitor visitor;
+ private final Workspace workspace;
+
+ PerDPResources(SearchFactoryImplementor searchFactoryImp, DirectoryProvider dp) {
+ workspace = new Workspace( searchFactoryImp, dp );
+ visitor = new LuceneWorkVisitor( workspace );
+ executor = Executors.newFixedThreadPool( 1 );
+ }
+
+ public ExecutorService getExecutor() {
+ return executor;
+ }
+
+ public LuceneWorkVisitor getVisitor() {
+ return visitor;
+ }
+
+ public Workspace getWorkspace() {
+ return workspace;
+ }
+
+}
Property changes on:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/PerDPResources.java
___________________________________________________________________
Name: svn:executable
+ *
Modified:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java
===================================================================
---
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java 2008-11-25
18:25:17 UTC (rev 15617)
+++
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/QueueProcessors.java 2008-11-25
18:38:50 UTC (rev 15618)
@@ -1,36 +1,93 @@
package org.hibernate.search.backend.impl.lucene;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import org.hibernate.search.backend.LuceneWork;
-import org.hibernate.search.backend.impl.lucene.works.LuceneWorkVisitor;
import org.hibernate.search.store.DirectoryProvider;
/**
- * Container used to split work by DirectoryProviders.
+ * Container used to split work by DirectoryProviders and execute
+ * them concurrently.
* @author Sanne Grinovero
*/
class QueueProcessors {
- private final Map<DirectoryProvider, LuceneWorkVisitor> visitorsMap;
+ private final Map<DirectoryProvider, PerDPResources> resourcesMap;
private final Map<DirectoryProvider, PerDPQueueProcessor> dpProcessors = new
HashMap<DirectoryProvider, PerDPQueueProcessor>();
- QueueProcessors(Map<DirectoryProvider, LuceneWorkVisitor> visitorsMap) {
- this.visitorsMap = visitorsMap;
+ QueueProcessors(Map<DirectoryProvider, PerDPResources> resourcesMap) {
+ this.resourcesMap = resourcesMap;
}
void addWorkToDpProcessor(DirectoryProvider dp, LuceneWork work) {
if ( ! dpProcessors.containsKey( dp ) ) {
- dpProcessors.put( dp, new PerDPQueueProcessor( visitorsMap.get( dp ) ) );
+ dpProcessors.put( dp, new PerDPQueueProcessor( resourcesMap.get( dp ) ) );
}
PerDPQueueProcessor processor = dpProcessors.get( dp );
processor.addWork ( work );
}
- Collection<PerDPQueueProcessor> getQueueProcessors(){
- return dpProcessors.values();
+ /**
+ * Run all index modifications queued so far
+ * @param sync when true this method blocks until all work is done.
+ * @throws InterruptedException only relevant when sync is true.
+ */
+ void runAll(boolean sync) throws InterruptedException {
+ if ( sync ) {
+ runAllWaiting();
+ }
+ else {
+ runAllAsync();
+ }
}
+
+ /**
+ * Runs all PerDPQueueProcessors and doesn't wait for them to finish.
+ */
+ private void runAllAsync() {
+ // execute all work in parallel on each DirectoryProvider;
+ // each DP has its own ExecutorService.
+ for ( PerDPQueueProcessor process : dpProcessors.values() ) {
+ ExecutorService executor = process.getOwningExecutor();
+ executor.execute( process );
+ }
+ }
+ /**
+ * Runs all PerDPQueueProcessors and waits until all have been processed.
+ * @throws InterruptedException
+ */
+ private void runAllWaiting() throws InterruptedException {
+ List<Future<Object>> futures = new ArrayList<Future<Object>>(
dpProcessors.size() );
+ // execute all work in parallel on each DirectoryProvider;
+ // each DP has its own ExecutorService.
+ for ( PerDPQueueProcessor process : dpProcessors.values() ) {
+ ExecutorService executor = process.getOwningExecutor();
+ //wrap each Runnable in a Future
+ FutureTask<Object> f = new FutureTask<Object>( process, null );
+ futures.add( f );
+ executor.execute( f );
+ }
+ // and then wait for all tasks to be finished:
+ for ( Future<Object> f : futures ) {
+ if ( !f.isDone() ) {
+ try {
+ f.get();
+ } catch(CancellationException ignore) {
+ //ignored, as in
java.util.concurrent.AbstractExecutorService.invokeAll(Collection<Callable<T>>
tasks)
+ } catch(ExecutionException ignore) {
+ //ignored, as in
java.util.concurrent.AbstractExecutorService.invokeAll(Collection<Callable<T>>
tasks)
+ }
+ }
+ }
+ }
+
}
Modified:
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java
===================================================================
---
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java 2008-11-25
18:25:17 UTC (rev 15617)
+++
search/trunk/src/java/org/hibernate/search/backend/impl/lucene/works/LuceneWorkVisitor.java 2008-11-25
18:38:50 UTC (rev 15618)
@@ -17,13 +17,6 @@
private final OptimizeWorkDelegate optimizeDelegate;
private final PurgeAllWorkDelegate purgeAllDelegate;
- /**
- * The Workspace this visitor has been created for;
- * different workspaces could use different Delegates for specific
- * needs basing on workspace or DirectoryProvider configuration.
- */
- private final Workspace linkedWorkspace;
-
public LuceneWorkVisitor(Workspace workspace) {
if ( workspace.getEntitiesInDirectory().size() == 1 ) {
this.deleteDelegate = new DeleteExtWorkDelegate( workspace );
@@ -34,7 +27,6 @@
this.purgeAllDelegate = new PurgeAllWorkDelegate();
this.addDelegate = new AddWorkDelegate( workspace );
this.optimizeDelegate = new OptimizeWorkDelegate( workspace );
- this.linkedWorkspace = workspace;
}
public LuceneWorkDelegate getDelegate(AddLuceneWork addLuceneWork) {
@@ -53,8 +45,4 @@
return purgeAllDelegate;
}
- public Workspace getWorkspace(){
- return linkedWorkspace;
- }
-
}