[hibernate-commits] Hibernate SVN: r11233 - in branches/Branch_3_2/HibernateExt/search/src: java/org/hibernate/search/backend and 6 other directories.

hibernate-commits at lists.jboss.org hibernate-commits at lists.jboss.org
Thu Feb 22 22:45:39 EST 2007


Author: epbernard
Date: 2007-02-22 22:45:39 -0500 (Thu, 22 Feb 2007)
New Revision: 11233

Added:
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/BackendQueueProcessorFactory.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueingProcessor.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessor.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessorFactory.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java
Removed:
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueWorker.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java
Modified:
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/Environment.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/SearchFactory.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkerFactory.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSHibernateSearchController.java
   branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
   branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/util/FileHelperTest.java
   branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java
   branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java
   branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/WorkerTestCase.java
Log:
HSEARCH-16
HSEARCH-9

Modified: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/Environment.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/Environment.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/Environment.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -16,5 +16,7 @@
 	public static final String ANALYZER_CLASS = "hibernate.search.analyzer";
 
 	public static final String WORKER_PREFIX = "hibernate.search.worker.";
-	public static final String WORKER_IMPL = WORKER_PREFIX + "impl";
+	public static final String WORKER_SCOPE = WORKER_PREFIX + "scope";
+	public static final String WORKER_BACKEND = WORKER_PREFIX + "backend";
+	public static final String WORKER_PROCESS = WORKER_PREFIX + "process";
 }

Modified: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/SearchFactory.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/SearchFactory.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/SearchFactory.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -18,6 +18,7 @@
 import org.hibernate.search.annotations.Indexed;
 import org.hibernate.search.backend.Worker;
 import org.hibernate.search.backend.WorkerFactory;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
 import org.hibernate.search.engine.DocumentBuilder;
 import org.hibernate.search.store.DirectoryProvider;
 import org.hibernate.search.store.DirectoryProviderFactory;
@@ -38,7 +39,17 @@
 	private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders =
 			new HashMap<DirectoryProvider, ReentrantLock>();
 	private Worker worker;
+	private BackendQueueProcessorFactory backendQueueProcessorFactory;
 
+
+	public BackendQueueProcessorFactory getBackendQueueProcessorFactory() {
+		return backendQueueProcessorFactory;
+	}
+
+	public void setBackendQueueProcessorFactory(BackendQueueProcessorFactory backendQueueProcessorFactory) {
+		this.backendQueueProcessorFactory = backendQueueProcessorFactory;
+	}
+
 	public SearchFactory(Configuration cfg) {
 		//yuk
 		ReflectionManager reflectionManager = getReflectionManager( cfg );
@@ -100,6 +111,7 @@
 		WorkerFactory workerFactory = new WorkerFactory();
 		workerFactory.configure( cfg, this );
 		worker = workerFactory.createWorker();
+
 	}
 
 	//code doesn't have to be multithreaded because SF creation is not.

Added: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/BackendQueueProcessorFactory.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/BackendQueueProcessorFactory.java	                        (rev 0)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/BackendQueueProcessorFactory.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -0,0 +1,24 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.Properties;
+import java.util.List;
+
+import org.hibernate.search.SearchFactory;
+
+/**
+ * Build stateful backend processor
+ * Must have a no-arg constructor
+ * The factory typically prepares or pools the resources needed by the queue processor
+ *
+ * @author Emmanuel Bernard
+ */
+public interface BackendQueueProcessorFactory {
+	void initialize(Properties props, SearchFactory searchFactory);
+
+	/**
+	 * Return a runnable implementation responsible for processing the queue to a given backend
+	 */
+
+	Runnable getProcessor(List<Work> queue);
+}

Deleted: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueWorker.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueWorker.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueWorker.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -1,22 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend;
-
-import java.util.Properties;
-import java.util.List;
-
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.SearchFactory;
-
-/**
- * Execute the work for a given queue
- * 
- * @author Emmanuel Bernard
- */
-public interface QueueWorker extends Runnable {
-	void run();
-
-	void initialize(Properties props, SearchFactory searchFactory);
-
-	void setQueue(List<Work> queue);
-
-}

Copied: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueingProcessor.java (from rev 11171, branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java)
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueingProcessor.java	                        (rev 0)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueingProcessor.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -0,0 +1,32 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.List;
+
+import org.hibernate.search.backend.Work;
+
+/**
+ * Pile work operations
+ * No thread safety needs to be implemented, since the queue is already scoped
+ * The implementation must be "stateless" with regard to the queue, though (i.e. it must not store the queue state)
+ *
+ * @author Emmanuel Bernard
+ */
+public interface QueueingProcessor {
+	/**
+	 * Add a work
+	 */
+	void add(Work work, List<Work> queue);
+
+	/**
+	 * Execute works
+	 * @param queue
+	 */
+	void performWork(List<Work> queue);
+
+	/**
+	 * Rollback works
+	 * @param queue
+	 */
+	void cancelWork(List<Work> queue);
+}

Deleted: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -1,26 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend;
-
-import org.hibernate.search.backend.Work;
-
-/**
- * Set of work operations
- *
- * @author Emmanuel Bernard
- */
-public interface WorkQueue {
-	/**
-	 * Add a work
-	 */
-	void add(Work work);
-
-	/**
-	 * Execute works
-	 */
-	void performWork();
-
-	/**
-	 * Rollback works
-	 */
-	void cancelWork();
-}

Modified: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkerFactory.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkerFactory.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkerFactory.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -36,7 +36,8 @@
 		for ( Map.Entry entry : props.entrySet() ) {
 			String key = (String) entry.getKey();
 			if ( key.startsWith( Environment.WORKER_PREFIX ) ) {
-				workerProperties.setProperty( key.substring( Environment.WORKER_PREFIX.length() ), (String) entry.getValue() );
+				//key.substring( Environment.WORKER_PREFIX.length() )
+				workerProperties.setProperty( key, (String) entry.getValue() );
 			}
 		}
 		return workerProperties;
@@ -44,7 +45,7 @@
 
 	public Worker createWorker() {
 		Properties props = getProperties( cfg );
-		String impl = props.getProperty( Environment.WORKER_IMPL );
+		String impl = props.getProperty( Environment.WORKER_SCOPE );
 		Worker worker;
 		if ( StringHelper.isEmpty( impl ) ) {
 			worker = new TransactionalWorker();

Deleted: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -1,71 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend.impl;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.Workspace;
-import org.hibernate.search.backend.AddWork;
-import org.hibernate.search.backend.QueueWorker;
-import org.hibernate.search.SearchFactory;
-
-/**
- * @author Emmanuel Bernard
-*/
-public class BatchedQueueWorker implements QueueWorker {
-	private List<Work> queue;
-	private SearchFactory searchFactory;
-
-	public void run() {
-		Workspace workspace;
-		LuceneWorker worker;
-		workspace = new Workspace( searchFactory );
-		worker = new LuceneWorker( workspace );
-		try {
-			deadlockFreeQueue(queue, workspace);
-			for ( Work work : queue ) {
-				worker.performWork( work );
-			}
-		}
-		finally {
-			workspace.clean();
-			queue.clear();
-		}
-	}
-
-	public void initialize(Properties props, SearchFactory searchFactory) {
-		this.searchFactory = searchFactory;
-	}
-
-	public void setQueue(List<Work> queue) {
-		this.queue = queue;
-	}
-
-	/**
-	 * one must lock the directory providers in the exact same order to avoid
-	 * dead lock between concurrent threads or processes
-	 * To achieve that, the work will be done per directory provider
-	 */
-	private void deadlockFreeQueue(List<Work> queue, final Workspace workspace) {
-		Collections.sort( queue, new Comparator<Work>() {
-			public int compare(Work o1, Work o2) {
-				long h1 = getWorkHashCode( o1, workspace );
-				long h2 = getWorkHashCode( o2, workspace );
-				return h1 < h2 ?
-						-1 :
-						h1 == h2 ?
-							0 :
-							1;
-			}
-		} );
-	}
-
-	private long getWorkHashCode(Work work, Workspace workspace) {
-		long h = workspace.getDocumentBuilder( work.getEntity() ).hashCode() * 2;
-		if (work instanceof AddWork ) h+=1; //addwork after deleteWork
-		return h;
-	}
-}

Copied: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java (from rev 11171, branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java)
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java	                        (rev 0)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -0,0 +1,113 @@
+//$Id: $
+package org.hibernate.search.backend.impl;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.annotations.common.util.StringHelper;
+import org.hibernate.search.Environment;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.DeleteWork;
+import org.hibernate.search.backend.QueueingProcessor;
+import org.hibernate.search.backend.UpdateWork;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.backend.impl.jms.JMSBackendQueueProcessorFactory;
+import org.hibernate.search.backend.impl.lucene.LuceneBackendQueueProcessorFactory;
+import org.hibernate.util.ReflectHelper;
+
+/**
+ * Batch work until #performWork is called.
+ * The work is then executed synchronously or asynchronously
+ *
+ * @author Emmanuel Bernard
+ */
+public class BatchedQueueingProcessor implements QueueingProcessor {
+	private boolean sync;
+	private ExecutorService executorService;
+	private BackendQueueProcessorFactory backendQueueProcessorFactory;
+
+	public BatchedQueueingProcessor(SearchFactory searchFactory,
+									Properties properties) {
+		//default to sync if none defined
+		this.sync = !"async".equalsIgnoreCase( properties.getProperty( Environment.WORKER_PROCESS ) );
+
+		int min = Integer.parseInt(
+				properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.min", "0" )
+		);
+		int max = Integer.parseInt(
+				properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.max", "0" ).trim()
+		);
+		if ( max == 0 ) max = Integer.MAX_VALUE;
+		if ( !sync ) {
+			executorService =
+					new ThreadPoolExecutor( min, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
+		}
+		String backend = properties.getProperty( Environment.WORKER_BACKEND );
+		if ( StringHelper.isEmpty( backend ) || "lucene".equalsIgnoreCase( backend ) ) {
+			backendQueueProcessorFactory = new LuceneBackendQueueProcessorFactory();
+		}
+		else if ( "jms".equalsIgnoreCase( backend ) ) {
+			backendQueueProcessorFactory = new JMSBackendQueueProcessorFactory();
+		}
+		else {
+			try {
+				Class processorFactoryClass = ReflectHelper.classForName( backend, BatchedQueueingProcessor.class );
+				backendQueueProcessorFactory = (BackendQueueProcessorFactory) processorFactoryClass.newInstance();
+			}
+			catch (ClassNotFoundException e) {
+				throw new SearchException( "Unable to find processor class: " + backend, e );
+			}
+			catch (IllegalAccessException e) {
+				throw new SearchException( "Unable to instanciate processor class: " + backend, e );
+			}
+			catch (InstantiationException e) {
+				throw new SearchException( "Unable to instanciate processor class: " + backend, e );
+			}
+		}
+		backendQueueProcessorFactory.initialize( properties, searchFactory );
+		searchFactory.setBackendQueueProcessorFactory( backendQueueProcessorFactory );
+	}
+
+	public void add(Work work, List<Work> queue) {
+		//TODO optimize by getting rid of dupe works
+		if ( work instanceof UpdateWork ) {
+			//split in 2 to optimize the process (reader first, writer next)
+			queue.add( new DeleteWork( work.getId(), work.getEntity() ) );
+			queue.add( new AddWork( work.getId(), work.getEntity(), work.getDocument() ) );
+		}
+		else {
+			queue.add( work );
+		}
+	}
+
+	//TODO implements parallel batchWorkers (one per Directory)
+	public void performWork(List<Work> queue) {
+		Runnable processor = backendQueueProcessorFactory.getProcessor( queue );
+		if ( sync ) {
+			processor.run();
+		}
+		else {
+			executorService.execute( processor );
+		}
+	}
+
+	public void cancelWork(List<Work> queue) {
+		queue.clear();
+	}
+
+	@Override
+	public void finalize() throws Throwable {
+		super.finalize();
+		//gracefully stop
+		//TODO move to the SF close lifecycle
+		if ( executorService != null && !executorService.isShutdown() ) executorService.shutdown();
+	}
+
+}

Deleted: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -1,80 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ExecutorService;
-
-import org.hibernate.search.SearchFactory;
-import org.hibernate.search.backend.WorkQueue;
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.UpdateWork;
-import org.hibernate.search.backend.DeleteWork;
-import org.hibernate.search.backend.AddWork;
-import org.hibernate.search.Environment;
-
-/**
- * Batch work until #performWork is called.
- * The work is then executed synchronously or asynchronously
- * 
- * @author Emmanuel Bernard
- */
-public class BatchedWorkQueue implements WorkQueue {
-	private List<Work> queue = new ArrayList<Work>();
-	private boolean sync;
-	private ExecutorService executorService;
-	private Properties properties;
-	private SearchFactory searchFactory;
-
-	public BatchedWorkQueue(SearchFactory searchFactory,
-								Properties properties) {
-		this.searchFactory = searchFactory;
-		this.properties = properties;
-		//default to sync if none defined
-		this.sync = ! "async".equalsIgnoreCase( properties.getProperty( Environment.WORKER_PREFIX + "type") );
-		int min = Integer.parseInt(
-				properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.min", "0" )
-		);
-		int max = Integer.parseInt(
-				properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.max", "0" ).trim()
-		);
-		if (max == 0) max = Integer.MAX_VALUE;
-		if ( ! sync) {
-			executorService = new ThreadPoolExecutor( min, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
-		}
-	}
-
-	public void add(Work work) {
-		//TODO optimize by getting rid of dupe works
-		if ( work instanceof UpdateWork ) {
-			//split in 2 to optimize the process (reader first, writer next
-			queue.add( new DeleteWork( work.getId(), work.getEntity() ) );
-			queue.add( new AddWork( work.getId(), work.getEntity(), work.getDocument() ) );
-		}
-		else {
-			queue.add( work );
-		}
-	}
-
-	//TODO implements parallel batchWorkers (one per Directory)
-	public void performWork() {
-		BatchedQueueWorker batchWorker = new BatchedQueueWorker();
-		batchWorker.initialize( properties, searchFactory );
-		batchWorker.setQueue( queue );
-		if (sync) {
-			batchWorker.run();
-		}
-		else {
-			executorService.execute( batchWorker );
-		}
-	}
-
-	public void cancelWork() {
-		queue.clear();
-	}
-
-}

Deleted: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -1,116 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend.impl;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermDocs;
-import org.hibernate.annotations.common.AssertionFailure;
-import org.hibernate.search.engine.DocumentBuilder;
-import org.hibernate.search.backend.Workspace;
-import org.hibernate.search.backend.DeleteWork;
-import org.hibernate.search.backend.AddWork;
-import org.hibernate.search.backend.UpdateWork;
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.SearchException;
-
-/**
- * Stateless implementation that perform a work
- *
- * @author Emmanuel Bernard
- */
-public class LuceneWorker {
-	private Workspace workspace;
-	private static Log log = LogFactory.getLog( LuceneWorker.class );
-
-	public LuceneWorker(Workspace workspace) {
-		this.workspace = workspace;
-	}
-
-	public void performWork(Work work) {
-		if ( AddWork.class.isAssignableFrom( work.getClass() ) ) {
-			performWork( (AddWork) work );
-		}
-		else if ( UpdateWork.class.isAssignableFrom( work.getClass() ) ) {
-			performWork( (UpdateWork) work );
-		}
-		else if ( DeleteWork.class.isAssignableFrom( work.getClass() ) ) {
-			performWork( (DeleteWork) work );
-		}
-		else {
-			throw new AssertionFailure( "Unknown work type: " + work.getClass() );
-		}
-	}
-
-	public void performWork(AddWork work) {
-		Class entity = work.getEntity();
-		Serializable id = work.getId();
-		Document document = work.getDocument();
-		add( entity, id, document );
-	}
-
-	private void add(Class entity, Serializable id, Document document) {
-		if ( log.isTraceEnabled() )
-			log.trace( "add to Lucene index: " + entity + "#" + id + ": " + document );
-		IndexWriter writer = workspace.getIndexWriter( entity );
-		try {
-			writer.addDocument( document );
-		}
-		catch (IOException e) {
-			throw new SearchException( "Unable to add to Lucene index: " + entity + "#" + id, e );
-		}
-	}
-
-	public void performWork(UpdateWork work) {
-		Class entity = work.getEntity();
-		Serializable id = work.getId();
-		Document document = work.getDocument();
-		remove( entity, id );
-		add( entity, id, document );
-	}
-
-	public void performWork(DeleteWork work) {
-		Class entity = work.getEntity();
-		Serializable id = work.getId();
-		remove( entity, id );
-	}
-
-	private void remove(Class entity, Serializable id) {
-		log.trace( "remove from Lucene index: " + entity + "#" + id );
-		DocumentBuilder builder = workspace.getDocumentBuilder( entity );
-		Term term = builder.getTerm( id );
-		IndexReader reader = workspace.getIndexReader( entity );
-		TermDocs termDocs = null;
-		try {
-			//TODO is there a faster way?
-			//TODO include TermDocs into the workspace?
-			termDocs = reader.termDocs( term );
-			String entityName = entity.getName();
-			while ( termDocs.next() ) {
-				int docIndex = termDocs.doc();
-				if ( entityName.equals( reader.document( docIndex ).get( DocumentBuilder.CLASS_FIELDNAME ) ) ) {
-					//remove only the one of the right class
-					//loop all to remove all the matches (defensive code)
-					reader.deleteDocument( docIndex );
-				}
-			}
-		}
-		catch (Exception e) {
-			throw new SearchException( "Unable to remove from Lucene index: " + entity + "#" + id, e );
-		}
-		finally {
-			if (termDocs != null) try {
-				termDocs.close();
-			}
-			catch (IOException e) {
-				log.warn( "Unable to close termDocs properly", e);
-			}
-		}
-	}
-}

Modified: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -1,10 +1,12 @@
 //$Id: $
 package org.hibernate.search.backend.impl;
 
+import java.util.List;
+import java.util.ArrayList;
 import javax.transaction.Status;
 import javax.transaction.Synchronization;
 
-import org.hibernate.search.backend.WorkQueue;
+import org.hibernate.search.backend.QueueingProcessor;
 import org.hibernate.search.backend.Work;
 import org.hibernate.search.util.WeakIdentityHashMap;
 
@@ -14,27 +16,21 @@
  * @author Emmanuel Bernard
  */
 public class PostTransactionWorkQueueSynchronization implements Synchronization {
-	private WorkQueue workQueue;
+	private QueueingProcessor queueingProcessor;
 	private boolean consumed;
 	private WeakIdentityHashMap queuePerTransaction;
+	private List<Work> queue = new ArrayList<Work>();
 
 	/**
-	 * out of transaction work
-	 */
-	public PostTransactionWorkQueueSynchronization(WorkQueue workQueue) {
-		this.workQueue = workQueue;
-	}
-
-	/**
 	 * in transaction work
 	 */
-	public PostTransactionWorkQueueSynchronization(WorkQueue workQueue, WeakIdentityHashMap queuePerTransaction) {
-		this(workQueue);
+	public PostTransactionWorkQueueSynchronization(QueueingProcessor queueingProcessor, WeakIdentityHashMap queuePerTransaction) {
+		this.queueingProcessor = queueingProcessor;
 		this.queuePerTransaction = queuePerTransaction;
 	}
 
 	public void add(Work work) {
-		workQueue.add( work );
+		queueingProcessor.add( work, queue );
 	}
 
 	public boolean isConsumed() {
@@ -47,10 +43,10 @@
 	public void afterCompletion(int i) {
 		try {
 			if ( Status.STATUS_COMMITTED == i ) {
-				workQueue.performWork();
+				queueingProcessor.performWork(queue);
 			}
 			else {
-				workQueue.cancelWork();
+				queueingProcessor.cancelWork(queue);
 			}
 		}
 		finally {

Modified: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -2,17 +2,15 @@
 package org.hibernate.search.backend.impl;
 
 import java.util.Properties;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.transaction.Status;
+import java.util.List;
+import java.util.ArrayList;
 
 import org.hibernate.search.backend.Worker;
 import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.WorkQueue;
+import org.hibernate.search.backend.QueueingProcessor;
+import org.hibernate.search.backend.impl.BatchedQueueingProcessor;
 import org.hibernate.search.util.WeakIdentityHashMap;
-import org.hibernate.search.engine.DocumentBuilder;
 import org.hibernate.search.SearchFactory;
-import org.hibernate.search.store.DirectoryProvider;
 import org.hibernate.event.EventSource;
 import org.hibernate.Transaction;
 
@@ -27,35 +25,29 @@
  */
 public class TransactionalWorker implements Worker {
 	//not a synchronized map since for a given transaction, we have not concurrent access
-	protected WeakIdentityHashMap queuePerTransaction = new WeakIdentityHashMap();
-	private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
-	private Map<Class, DocumentBuilder<Object>> documentBuilders;
-	private Properties properties;
-	private SearchFactory searchFactory;
+	protected WeakIdentityHashMap synchronizationPerTransaction = new WeakIdentityHashMap();
+	private QueueingProcessor queueingProcessor;
 
 	public void performWork(Work work, EventSource session) {
 		if ( session.isTransactionInProgress() ) {
 			Transaction transaction = session.getTransaction();
 			PostTransactionWorkQueueSynchronization txSync = (PostTransactionWorkQueueSynchronization)
-					queuePerTransaction.get( transaction );
+					synchronizationPerTransaction.get( transaction );
 			if ( txSync == null || txSync.isConsumed() ) {
-				WorkQueue workQueue = new BatchedWorkQueue( searchFactory, properties );
-				txSync = new PostTransactionWorkQueueSynchronization( workQueue, queuePerTransaction );
+				txSync = new PostTransactionWorkQueueSynchronization( queueingProcessor, synchronizationPerTransaction );
 				transaction.registerSynchronization( txSync );
-				queuePerTransaction.put(transaction, txSync);
+				synchronizationPerTransaction.put(transaction, txSync);
 			}
 			txSync.add( work );
 		}
 		else {
-			WorkQueue workQueue = new BatchedWorkQueue( searchFactory, properties );
-			PostTransactionWorkQueueSynchronization sync = new PostTransactionWorkQueueSynchronization( workQueue );
-			sync.add( work );
-			sync.afterCompletion( Status.STATUS_COMMITTED );
+			List<Work> queue = new ArrayList<Work>(2); //one work can be split
+			queueingProcessor.add( work, queue );
+			queueingProcessor.performWork( queue );
 		}
 	}
 
 	public void initialize(Properties props, SearchFactory searchFactory) {
-		this.searchFactory = searchFactory;
-		this.properties = props;
+		this.queueingProcessor = new BatchedQueueingProcessor( searchFactory, props );
 	}
 }

Copied: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessor.java (from rev 11171, branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java)
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessor.java	                        (rev 0)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessor.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -0,0 +1,51 @@
+//$Id: $
+package org.hibernate.search.backend.impl.jms;
+
+import java.io.Serializable;
+import java.util.List;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+
+import org.hibernate.HibernateException;
+import org.hibernate.search.backend.Work;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class JMSBackendQueueProcessor implements Runnable {
+	private List<Work> queue;
+	private JMSBackendQueueProcessorFactory factory;
+
+	public JMSBackendQueueProcessor(List<Work> queue,
+									JMSBackendQueueProcessorFactory jmsBackendQueueProcessorFactory) {
+		this.queue = queue;
+		this.factory = jmsBackendQueueProcessorFactory;
+	}
+
+	public void run() {
+		factory.prepareJMSTools();
+		QueueConnection cnn;
+		QueueSender sender;
+		QueueSession session;
+		try {
+			cnn = factory.getJMSFactory().createQueueConnection();
+			//TODO make transacted parameterized
+			session = cnn.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
+
+			ObjectMessage message = session.createObjectMessage();
+			message.setObject( (Serializable) this.queue );
+
+			sender = session.createSender( factory.getJmsQueue() );
+			sender.send( message );
+
+			session.close();
+			cnn.close();
+		}
+		catch (JMSException e) {
+			throw new HibernateException( "Unable to send Search work to JMS queue: " + factory.getJmsQueueName(), e );
+		}
+	}
+}

Added: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessorFactory.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessorFactory.java	                        (rev 0)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessorFactory.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -0,0 +1,107 @@
+//$Id: $
+package org.hibernate.search.backend.impl.jms;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import javax.jms.Queue;
+import javax.jms.QueueConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.hibernate.HibernateException;
+import org.hibernate.search.Environment;
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.Work;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class JMSBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
+	private String jmsQueueName;
+	private String jmsConnectionFactoryName;
+	private static final String JNDI_PREFIX = Environment.WORKER_PREFIX + "jndi.";
+	private Properties properties;
+	private Queue jmsQueue;
+	private QueueConnectionFactory factory;
+
+	public void initialize(Properties props, SearchFactory searchFactory) {
+		this.properties = props;
+		this.jmsConnectionFactoryName = props.getProperty( Environment.WORKER_PREFIX + "jms.connection_factory" );
+		this.jmsQueueName = props.getProperty( Environment.WORKER_PREFIX + "jms.queue" );
+		prepareJMSTools();
+	}
+
+	public Runnable getProcessor(List<Work> queue) {
+		return new JMSBackendQueueProcessor(queue, this);
+	}
+
+
+	public QueueConnectionFactory getJMSFactory() {
+		return factory;
+	}
+
+	public Queue getJmsQueue() {
+		return jmsQueue;
+	}
+
+
+	public String getJmsQueueName() {
+		return jmsQueueName;
+	}
+
+	public void prepareJMSTools() {
+		if (jmsQueue != null && factory != null) return;
+		try {
+			InitialContext initialContext = getInitialContext(properties);
+			jmsQueue = (Queue) initialContext.lookup( jmsQueueName );
+			factory = (QueueConnectionFactory) initialContext.lookup( jmsConnectionFactoryName );
+		}
+		catch (NamingException e) {
+			throw new HibernateException("Unable to lookup Search queue and connection factory", e);
+		}
+	}
+
+	private InitialContext getInitialContext(Properties properties) throws NamingException {
+		Properties jndiProps = getJndiProperties( properties );
+		if (jndiProps.size() == 0) {
+			return new InitialContext(  );
+		}
+		else {
+			return new InitialContext( jndiProps );
+		}
+	}
+
+	public static Properties getJndiProperties(Properties properties) {
+
+		HashSet specialProps = new HashSet();
+		specialProps.add( JNDI_PREFIX + "class" );
+		specialProps.add( JNDI_PREFIX + "url" );
+
+		Iterator iter = properties.keySet().iterator();
+		Properties result = new Properties();
+		while ( iter.hasNext() ) {
+			String prop = (String) iter.next();
+			if ( prop.indexOf(JNDI_PREFIX) > -1 && !specialProps.contains(prop) ) {
+				result.setProperty(
+						prop.substring( JNDI_PREFIX.length()+1 ),
+						properties.getProperty(prop)
+					);
+			}
+		}
+
+		String jndiClass = properties.getProperty(JNDI_PREFIX + "class");
+		String jndiURL = properties.getProperty(JNDI_PREFIX + "url");
+		// we want to be able to just use the defaults
+		// if JNDI environment properties are not supplied,
+		// so don't put nulls in
+		if (jndiClass != null) result.put( Context.INITIAL_CONTEXT_FACTORY, jndiClass);
+		if (jndiURL != null) result.put(Context.PROVIDER_URL, jndiURL);
+
+		return result;
+	}
+
+}

Modified: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSHibernateSearchController.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSHibernateSearchController.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSHibernateSearchController.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -2,23 +2,20 @@
 package org.hibernate.search.backend.impl.jms;
 
 import java.util.List;
-import java.util.Properties;
-import javax.jms.MessageListener;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.JMSException;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
 
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.QueueWorker;
-import org.hibernate.search.backend.impl.BatchedQueueWorker;
-import org.hibernate.search.util.ContextHelper;
-import org.hibernate.search.SearchFactory;
+import org.apache.commons.logging.LogFactory;
 import org.hibernate.Session;
 import org.hibernate.engine.SessionImplementor;
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.util.ContextHelper;
 
 /**
  * @author Emmanuel Bernard
@@ -29,7 +26,7 @@
 	protected abstract Session getSession();
 
 	public void onMessage(Message message) {
-		if (! (message instanceof ObjectMessage) ) {
+		if ( !( message instanceof ObjectMessage ) ) {
 			log.error( "Incorrect message type: " + message.getClass() );
 			return;
 		}
@@ -42,30 +39,29 @@
 			log.error( "Unable to retrieve object from message: " + message.getClass(), e );
 			return;
 		}
-		catch( ClassCastException e ) {
+		catch (ClassCastException e) {
 			log.error( "Illegal object retrieved from message", e );
 			return;
 		}
-		QueueWorker worker = getWorker( queue );
+		Runnable worker = getWorker( queue );
 		worker.run();
 	}
 
-	private QueueWorker getWorker(List<Work> queue) {
+	private Runnable getWorker(List<Work> queue) {
 	//FIXME this cast is unsafe because we do not control where the session comes from
 		SearchFactory factory = ContextHelper.getSearchFactory( (SessionImplementor) getSession() );
-		QueueWorker worker = new BatchedQueueWorker();
-		worker.initialize( new Properties(), factory );
-		worker.setQueue( queue );
-		return worker;
+		return factory.getBackendQueueProcessorFactory().getProcessor( queue );
 	}
 
 	@PostConstruct
 	public void initialize() {
 		//init the source copy process
-		
+		//TODO actually this is probably wrong since this is now part of the DP
 	}
+
 	@PreDestroy
 	public void shutdown() {
 		//stop the source copy process
+		//TODO actually this is probably wrong since this is now part of the DP
 	}
 }

Deleted: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -1,125 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend.impl.jms;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.io.Serializable;
-
-import javax.jms.QueueSender;
-import javax.jms.QueueConnection;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.ObjectMessage;
-import javax.jms.JMSException;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.naming.Context;
-
-import org.hibernate.search.backend.QueueWorker;
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.SearchFactory;
-import org.hibernate.search.Environment;
-import org.hibernate.HibernateException;
-
-/**
- * @author Emmanuel Bernard
- */
-public class JMSQueueWorker implements QueueWorker {
-	private List<Work> queue;
-	private String jmsQueueName;
-	private String jmsConnectionFactoryName;
-	private static final String JNDI_PREFIX = Environment.WORKER_PREFIX + "jndi.";
-	private Properties properties;
-	private Queue jmsQueue;
-	private QueueConnectionFactory factory;
-
-	public void run() {
-		resetJMSTools();
-		QueueConnection cnn;
-		QueueSender sender;
-		QueueSession session;
-		try {
-			cnn = factory.createQueueConnection();
-			//TODO make transacted parameterized
-			session = cnn.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
-
-			ObjectMessage message = session.createObjectMessage( );
-			message.setObject( (Serializable) this.queue );
-
-			sender = session.createSender( jmsQueue );
-			sender.send( message );
-
-			session.close();
-			cnn.close();
-		}
-		catch( JMSException e ) {
-			throw new HibernateException("Unable to send Search work to JMS queue: " + jmsQueueName, e);
-		}
-	}
-
-	private InitialContext getInitialContext(Properties properties) throws NamingException {
-		Properties jndiProps = getJndiProperties( properties );
-		if (jndiProps.size() == 0) {
-			return new InitialContext(  );
-		}
-		else {
-			return new InitialContext( jndiProps );
-		}
-	}
-
-	public void initialize(Properties props, SearchFactory searchFactory) {
-		this.properties = props;
-		this.jmsConnectionFactoryName = props.getProperty( Environment.WORKER_PREFIX + "jms.connection_factory" );
-		this.jmsQueueName = props.getProperty( Environment.WORKER_PREFIX + "jms.queue" );
-		resetJMSTools();
-
-	}
-
-	private void resetJMSTools() {
-		if (jmsQueue != null && factory != null) return;
-		try {
-			InitialContext initialContext = getInitialContext(properties);
-			jmsQueue = (Queue) initialContext.lookup( jmsQueueName );
-			factory = (QueueConnectionFactory) initialContext.lookup( jmsConnectionFactoryName );
-		}
-		catch (NamingException e) {
-			throw new HibernateException("Unable to lookup Search queue and connection factory", e);
-		}
-	}
-
-	public void setQueue(List<Work> queue) {
-		this.queue = queue;
-	}
-
-	public static Properties getJndiProperties(Properties properties) {
-
-		HashSet specialProps = new HashSet();
-		specialProps.add( JNDI_PREFIX + "class" );
-		specialProps.add( JNDI_PREFIX + "url" );
-
-		Iterator iter = properties.keySet().iterator();
-		Properties result = new Properties();
-		while ( iter.hasNext() ) {
-			String prop = (String) iter.next();
-			if ( prop.indexOf(JNDI_PREFIX) > -1 && !specialProps.contains(prop) ) {
-				result.setProperty(
-						prop.substring( JNDI_PREFIX.length()+1 ),
-						properties.getProperty(prop)
-					);
-			}
-		}
-
-		String jndiClass = properties.getProperty(JNDI_PREFIX + "class");
-		String jndiURL = properties.getProperty(JNDI_PREFIX + "url");
-		// we want to be able to just use the defaults,
-		// if JNDI environment properties are not supplied
-		// so don't put null in anywhere
-		if (jndiClass != null) result.put( Context.INITIAL_CONTEXT_FACTORY, jndiClass);
-		if (jndiURL != null) result.put(Context.PROVIDER_URL, jndiURL);
-
-		return result;
-	}
-}

Copied: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java (from rev 11171, branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java)
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java	                        (rev 0)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -0,0 +1,69 @@
+//$Id: $
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.backend.Workspace;
+
+/**
+ * Applies the queued operations to the Lucene directories
+ * while avoiding deadlocks
+ *
+ * @author Emmanuel Bernard
+ */
+public class LuceneBackendQueueProcessor implements Runnable {
+	private List<Work> queue;
+	private SearchFactory searchFactory;
+
+	public LuceneBackendQueueProcessor(List<Work> queue, SearchFactory searchFactory) {
+		this.queue = queue;
+		this.searchFactory = searchFactory;
+	}
+
+	public void run() {
+		Workspace workspace;
+		LuceneWorker worker;
+		workspace = new Workspace( searchFactory );
+		worker = new LuceneWorker( workspace );
+		try {
+			deadlockFreeQueue(queue, workspace);
+			for ( Work work : queue ) {
+				worker.performWork( work );
+			}
+		}
+		finally {
+			workspace.clean();
+			queue.clear();
+		}
+	}
+
+	/**
+	 * One must lock the directory providers in the exact same order to avoid
+	 * deadlock between concurrent threads or processes.
+	 * To achieve that, the work is sorted and processed per directory provider.
+	 */
+	private void deadlockFreeQueue(List<Work> queue, final Workspace workspace) {
+		Collections.sort( queue, new Comparator<Work>() {
+			public int compare(Work o1, Work o2) {
+				long h1 = getWorkHashCode( o1, workspace );
+				long h2 = getWorkHashCode( o2, workspace );
+				return h1 < h2 ?
+						-1 :
+						h1 == h2 ?
+							0 :
+							1;
+			}
+		} );
+	}
+
+	private long getWorkHashCode(Work work, Workspace workspace) {
+		long h = workspace.getDocumentBuilder( work.getEntity() ).hashCode() * 2;
+		if (work instanceof AddWork ) h+=1; //addwork after deleteWork
+		return h;
+	}
+}

Added: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java	                        (rev 0)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -0,0 +1,24 @@
+//$Id: $
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.Properties;
+import java.util.List;
+
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.Work;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class LuceneBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
+	private SearchFactory searchFactory;
+
+	public void initialize(Properties props, SearchFactory searchFactory) {
+		this.searchFactory = searchFactory;
+	}
+
+	public Runnable getProcessor(List<Work> queue) {
+		return new LuceneBackendQueueProcessor( queue, searchFactory );
+	}
+}

Copied: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java (from rev 11171, branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java)
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java	                        (rev 0)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -0,0 +1,116 @@
+//$Id: $
+package org.hibernate.search.backend.impl.lucene;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.hibernate.annotations.common.AssertionFailure;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.DeleteWork;
+import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.UpdateWork;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.SearchException;
+
+/**
+ * Stateless implementation that performs a unit of work
+ *
+ * @author Emmanuel Bernard
+ */
+public class LuceneWorker {
+	private Workspace workspace;
+	private static Log log = LogFactory.getLog( LuceneWorker.class );
+
+	public LuceneWorker(Workspace workspace) {
+		this.workspace = workspace;
+	}
+
+	public void performWork(Work work) {
+		if ( AddWork.class.isAssignableFrom( work.getClass() ) ) {
+			performWork( (AddWork) work );
+		}
+		else if ( UpdateWork.class.isAssignableFrom( work.getClass() ) ) {
+			performWork( (UpdateWork) work );
+		}
+		else if ( DeleteWork.class.isAssignableFrom( work.getClass() ) ) {
+			performWork( (DeleteWork) work );
+		}
+		else {
+			throw new AssertionFailure( "Unknown work type: " + work.getClass() );
+		}
+	}
+
+	public void performWork(AddWork work) {
+		Class entity = work.getEntity();
+		Serializable id = work.getId();
+		Document document = work.getDocument();
+		add( entity, id, document );
+	}
+
+	private void add(Class entity, Serializable id, Document document) {
+		if ( log.isTraceEnabled() )
+			log.trace( "add to Lucene index: " + entity + "#" + id + ": " + document );
+		IndexWriter writer = workspace.getIndexWriter( entity );
+		try {
+			writer.addDocument( document );
+		}
+		catch (IOException e) {
+			throw new SearchException( "Unable to add to Lucene index: " + entity + "#" + id, e );
+		}
+	}
+
+	public void performWork(UpdateWork work) {
+		Class entity = work.getEntity();
+		Serializable id = work.getId();
+		Document document = work.getDocument();
+		remove( entity, id );
+		add( entity, id, document );
+	}
+
+	public void performWork(DeleteWork work) {
+		Class entity = work.getEntity();
+		Serializable id = work.getId();
+		remove( entity, id );
+	}
+
+	private void remove(Class entity, Serializable id) {
+		log.trace( "remove from Lucene index: " + entity + "#" + id );
+		DocumentBuilder builder = workspace.getDocumentBuilder( entity );
+		Term term = builder.getTerm( id );
+		IndexReader reader = workspace.getIndexReader( entity );
+		TermDocs termDocs = null;
+		try {
+			//TODO is there a faster way?
+			//TODO include TermDocs into the workspace?
+			termDocs = reader.termDocs( term );
+			String entityName = entity.getName();
+			while ( termDocs.next() ) {
+				int docIndex = termDocs.doc();
+				if ( entityName.equals( reader.document( docIndex ).get( DocumentBuilder.CLASS_FIELDNAME ) ) ) {
+					//remove only the documents of the matching class
+					//loop over all matches to delete every one (defensive code)
+					reader.deleteDocument( docIndex );
+				}
+			}
+		}
+		catch (Exception e) {
+			throw new SearchException( "Unable to remove from Lucene index: " + entity + "#" + id, e );
+		}
+		finally {
+			if (termDocs != null) try {
+				termDocs.close();
+			}
+			catch (IOException e) {
+				log.warn( "Unable to close termDocs properly", e);
+			}
+		}
+	}
+}

Modified: branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/impl/FullTextSessionImpl.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/impl/FullTextSessionImpl.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -48,8 +48,8 @@
 import org.hibernate.search.SearchFactory;
 import org.hibernate.search.backend.UpdateWork;
 import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.WorkQueue;
-import org.hibernate.search.backend.impl.BatchedWorkQueue;
+import org.hibernate.search.backend.QueueingProcessor;
+import org.hibernate.search.backend.impl.BatchedQueueingProcessor;
 import org.hibernate.search.backend.impl.PostTransactionWorkQueueSynchronization;
 import org.hibernate.search.FullTextSession;
 import org.hibernate.stat.SessionStatistics;
@@ -95,7 +95,7 @@
 			Serializable id = session.getIdentifier( entity );
 			Document doc = builder.getDocument( entity, id );
 			UpdateWork work = new UpdateWork( id, entity.getClass(), doc );
-			processWork( work, searchFactory );
+			searchFactory.getWorker().performWork( work, session );
 		}
 		//TODO
 		//need to add elements in a queue kept at the Session level
@@ -106,30 +106,6 @@
 		// this is an open discussion
 	}
 
-	private void processWork(Work work, SearchFactory searchFactory) {
-		if ( session.isTransactionInProgress() ) {
-			if ( postTransactionWorkQueueSynch == null || postTransactionWorkQueueSynch.isConsumed() ) {
-				postTransactionWorkQueueSynch = createWorkQueueSync(searchFactory);
-				session.getTransaction().registerSynchronization( postTransactionWorkQueueSynch );
-			}
-			postTransactionWorkQueueSynch.add( work );
-		}
-		else {
-			//no transaction work right away
-			PostTransactionWorkQueueSynchronization sync =
-					createWorkQueueSync( searchFactory );
-			sync.add( work );
-			sync.afterCompletion( Status.STATUS_COMMITTED );
-		}
-	}
-
-	private PostTransactionWorkQueueSynchronization createWorkQueueSync(
-			SearchFactory searchFactory) {
-		//FIXME should be harmonized with the WorkerFactory?
-		WorkQueue workQueue = new BatchedWorkQueue( searchFactory, new Properties() );
-		return new PostTransactionWorkQueueSynchronization( workQueue );
-	}
-
 	public Query createSQLQuery(String sql, String returnAlias, Class returnClass) {
 		return session.createSQLQuery( sql, returnAlias, returnClass );
 	}

Modified: branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/util/FileHelperTest.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/util/FileHelperTest.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/util/FileHelperTest.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -52,7 +52,7 @@
 
 	public void testSynchronize() throws Exception {
 		File src =  new File("./filehelpersrc");
-		File dest =  new File("./filehelpertest");
+		File dest =  new File("./filehelperdest");
 		FileHelper.synchronize( src, dest, true );
 		File test = new File(dest, "b");
 		assertTrue( test.exists() );

Modified: branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -18,8 +18,8 @@
 	protected void configure(Configuration cfg) {
 		cfg.setProperty( "hibernate.search.default.directory_provider", RAMDirectoryProvider.class.getName() );
 		cfg.setProperty( Environment.ANALYZER_CLASS, StopAnalyzer.class.getName() );
-		cfg.setProperty( Environment.WORKER_IMPL, "transactional" );
-		cfg.setProperty( Environment.WORKER_PREFIX + "type", "async" );
+		cfg.setProperty( Environment.WORKER_SCOPE, "transaction" );
+		cfg.setProperty( Environment.WORKER_PROCESS, "async" );
 		cfg.setProperty( Environment.WORKER_PREFIX + "thread_pool.min", "1" );
 		cfg.setProperty( Environment.WORKER_PREFIX + "thread_pool.max", "10" );
 		FullTextIndexEventListener del = new FullTextIndexEventListener();

Modified: branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -18,8 +18,8 @@
 	protected void configure(Configuration cfg) {
 		cfg.setProperty( "hibernate.search.default.directory_provider", RAMDirectoryProvider.class.getName() );
 		cfg.setProperty( Environment.ANALYZER_CLASS, StopAnalyzer.class.getName() );
-		cfg.setProperty( Environment.WORKER_IMPL, "transactional" );
-		cfg.setProperty( Environment.WORKER_PREFIX + "type", "sync" );
+		cfg.setProperty( Environment.WORKER_SCOPE, "transaction" );
+		cfg.setProperty( Environment.WORKER_PREFIX, "sync" );
 		FullTextIndexEventListener del = new FullTextIndexEventListener();
 		cfg.getEventListeners().setPostDeleteEventListeners( new PostDeleteEventListener[]{del} );
 		cfg.getEventListeners().setPostUpdateEventListeners( new PostUpdateEventListener[]{del} );

Modified: branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/WorkerTestCase.java
===================================================================
--- branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/WorkerTestCase.java	2007-02-22 20:07:09 UTC (rev 11232)
+++ branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/WorkerTestCase.java	2007-02-23 03:45:39 UTC (rev 11233)
@@ -27,11 +27,11 @@
  */
 public class WorkerTestCase extends SearchTestCase {
 
-   protected void setUp() throws Exception {
+	protected void setUp() throws Exception {
 		File sub = getBaseIndexDir();
 		sub.mkdir();
 		File[] files = sub.listFiles();
-		for (File file : files) {
+		for ( File file : files ) {
 			if ( file.isDirectory() ) {
 				delete( file );
 			}
@@ -64,22 +64,23 @@
 		}
 	}
 
-   public void testConcurrency() throws Exception {
-      int nThreads = 15;
-      ExecutorService es = Executors.newFixedThreadPool( nThreads );
+	public void testConcurrency() throws Exception {
+		int nThreads = 15;
+		ExecutorService es = Executors.newFixedThreadPool( nThreads );
 		Work work = new Work( getSessions() );
 		ReverseWork reverseWork = new ReverseWork( getSessions() );
-      long start = System.currentTimeMillis();
-      int iteration = 200;
-      for (int i = 0 ; i < iteration; i++) {
+		long start = System.currentTimeMillis();
+		int iteration = 100;
+		for ( int i = 0; i < iteration; i++ ) {
 			es.execute( work );
 			es.execute( reverseWork );
 		}
-		while(work.count < 199) {
+		while ( work.count < iteration - 1 ) {
 			Thread.sleep( 20 );
 		}
-      System.out.println( 3*iteration + " iterations (4 tx per iteration) in " + nThreads + " threads: " + (System.currentTimeMillis() - start) );
-   }
+		System.out.println( iteration + " iterations (8 tx per iteration) in " + nThreads + " threads: " + ( System
+				.currentTimeMillis() - start ) );
+	}
 
 	protected class Work implements Runnable {
 		private SessionFactory sf;
@@ -90,7 +91,7 @@
 		}
 
 		public void run() {
-			Session s = sf.openSession( );
+			Session s = sf.openSession();
 			Transaction tx = s.beginTransaction();
 			Employee ee = new Employee();
 			ee.setName( "Emmanuel" );
@@ -101,7 +102,7 @@
 			tx.commit();
 			s.close();
 
-			s = sf.openSession( );
+			s = sf.openSession();
 			tx = s.beginTransaction();
 			ee = (Employee) s.get( Employee.class, ee.getId() );
 			ee.setName( "Emmanuel2" );
@@ -110,25 +111,30 @@
 			tx.commit();
 			s.close();
 
-         s = sf.openSession( );
+//			try {
+//				Thread.sleep( 50 );
+//			}
+//			catch (InterruptedException e) {
+//				e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+//			}
+
+			s = sf.openSession();
 			tx = s.beginTransaction();
-			FullTextSession fts = new FullTextSessionImpl(s);
-         QueryParser parser = new QueryParser( "id", new StopAnalyzer() );
-         Query query;
-         try
-         {
-            query = parser.parse( "name:emmanuel2" );
-         }
-         catch (ParseException e)
-         {
-            throw new RuntimeException(e);
-         }
-         boolean results = fts.createFullTextQuery( query ).list().size() > 0;
-         if (!results) throw new RuntimeException("No results!");
-         tx.commit();
+			FullTextSession fts = new FullTextSessionImpl( s );
+			QueryParser parser = new QueryParser( "id", new StopAnalyzer() );
+			Query query;
+			try {
+				query = parser.parse( "name:emmanuel2" );
+			}
+			catch (ParseException e) {
+				throw new RuntimeException( e );
+			}
+			boolean results = fts.createFullTextQuery( query ).list().size() > 0;
+			//if ( !results ) throw new RuntimeException( "No results!" );
+			tx.commit();
 			s.close();
 
-         s = sf.openSession( );
+			s = sf.openSession();
 			tx = s.beginTransaction();
 			ee = (Employee) s.get( Employee.class, ee.getId() );
 			s.delete( ee );
@@ -148,7 +154,7 @@
 		}
 
 		public void run() {
-			Session s = sf.openSession( );
+			Session s = sf.openSession();
 			Transaction tx = s.beginTransaction();
 			Employer er = new Employer();
 			er.setName( "RH" );
@@ -159,7 +165,7 @@
 			tx.commit();
 			s.close();
 
-			s = sf.openSession( );
+			s = sf.openSession();
 			tx = s.beginTransaction();
 			er = (Employer) s.get( Employer.class, er.getId() );
 			er.setName( "RH2" );
@@ -168,7 +174,7 @@
 			tx.commit();
 			s.close();
 
-			s = sf.openSession( );
+			s = sf.openSession();
 			tx = s.beginTransaction();
 			er = (Employer) s.get( Employer.class, er.getId() );
 			s.delete( er );
@@ -179,7 +185,7 @@
 		}
 	}
 
-   protected void configure(org.hibernate.cfg.Configuration cfg) {
+	protected void configure(org.hibernate.cfg.Configuration cfg) {
 		File sub = getBaseIndexDir();
 		cfg.setProperty( "hibernate.search.default.indexBase", sub.getAbsolutePath() );
 		cfg.setProperty( "hibernate.search.Clock.directory_provider", FSDirectoryProvider.class.getName() );
@@ -190,8 +196,8 @@
 		cfg.getEventListeners().setPostInsertEventListeners( new PostInsertEventListener[]{del} );
 	}
 
-   protected Class[] getMappings() {
-		return new Class[] {
+	protected Class[] getMappings() {
+		return new Class[]{
 				Employee.class,
 				Employer.class
 		};




More information about the hibernate-commits mailing list