[hibernate-commits] Hibernate SVN: r11017 - in branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search: . backend backend/impl event impl

hibernate-commits at lists.jboss.org hibernate-commits at lists.jboss.org
Sat Jan 6 12:13:07 EST 2007


Author: epbernard
Date: 2007-01-06 12:12:57 -0500 (Sat, 06 Jan 2007)
New Revision: 11017

Added:
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkerFactory.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
Modified:
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/Environment.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Workspace.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchLuceneWorkQueue.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/event/FullTextIndexEventListener.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
Log:
ANN-519 ANN-520

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/Environment.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/Environment.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/Environment.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -14,4 +14,7 @@
 	 * Lucene analyser
 	 */
 	public static final String ANALYZER_CLASS = "hibernate.search.analyzer";
+
+	public static final String WORKER_PREFIX = "hibernate.search.worker.";
+	public static final String WORKER_IMPL = WORKER_PREFIX + "impl";
 }

Added: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -0,0 +1,24 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.Properties;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hibernate.event.AbstractEvent;
+import org.hibernate.event.EventSource;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.store.DirectoryProvider;
+
+/**
+ * Perform work for a given session. This implementation has to be multi threaded
+ * @author Emmanuel Bernard
+ */
+//TODO merge worker and workQueue to do a list of workers through work delegation
+public interface Worker {
+	void performWork(Work work, EventSource session);
+
+	void initialize(Properties props, Map<Class, DocumentBuilder<Object>> documentBuilders,
+					Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders);
+}

Added: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkerFactory.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkerFactory.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkerFactory.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -0,0 +1,73 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hibernate.cfg.Configuration;
+import org.hibernate.search.backend.impl.TransactionalWorker;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.util.StringHelper;
+import org.hibernate.util.ReflectHelper;
+import org.hibernate.AnnotationException;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class WorkerFactory {
+	private Map<Class, DocumentBuilder<Object>> documentBuilders;
+	private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+	private Configuration cfg;
+
+	public void configure(Configuration cfg,
+			Map<Class, DocumentBuilder<Object>> documentBuilders,
+			Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+		this.documentBuilders = documentBuilders;
+		this.lockableDirectoryProviders = lockableDirectoryProviders;
+		this.cfg = cfg;
+	}
+
+	private static Properties getProperties(Configuration cfg) {
+		Properties props = cfg.getProperties();
+		Properties workerProperties = new Properties();
+		for ( Map.Entry entry : props.entrySet() ) {
+			String key = (String) entry.getKey();
+			if ( key.startsWith( Environment.WORKER_PREFIX ) ) {
+				workerProperties.setProperty( key.substring( Environment.WORKER_PREFIX.length() ), (String) entry.getValue() );
+			}
+		}
+		return workerProperties;
+	}
+
+	public Worker createWorker() {
+		Properties props = getProperties( cfg );
+		String impl = props.getProperty( Environment.WORKER_IMPL );
+		Worker worker;
+		if ( StringHelper.isEmpty( impl ) ) {
+			worker = new TransactionalWorker();
+		}
+		else if ( "transaction".equalsIgnoreCase( impl ) ) {
+			worker = new TransactionalWorker();
+		}
+		else {
+			try {
+				Class workerClass = ReflectHelper.classForName( impl, WorkerFactory.class );
+				worker = (Worker) workerClass.newInstance();
+			}
+			catch (ClassNotFoundException e) {
+				throw new AnnotationException("Unable to find worker class: " + impl, e );
+			}
+			catch (IllegalAccessException e) {
+				throw new AnnotationException("Unable to instanciate worker class: " + impl, e );
+			}
+			catch (InstantiationException e) {
+				throw new AnnotationException("Unable to instanciate worker class: " + impl, e );
+			}
+		}
+		worker.initialize( props, documentBuilders, lockableDirectoryProviders );
+		return worker;
+	}
+}

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Workspace.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Workspace.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Workspace.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -91,7 +91,7 @@
 	}
 
 	private void cleanUp(HibernateException originalException) {
-		//release all readers and writers, then reelase locks
+		//release all readers and writers, then release locks
 		HibernateException raisedException = originalException;
 		for ( IndexReader reader : readers.values() ) {
 			try {

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchLuceneWorkQueue.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchLuceneWorkQueue.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchLuceneWorkQueue.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -17,17 +17,22 @@
 import org.hibernate.search.backend.AddWork;
 
 /**
+ * Batch work until #performWork is called.
+ * The work is then executed synchronously or asynchronously
+ * 
  * @author Emmanuel Bernard
  */
 public class BatchLuceneWorkQueue implements WorkQueue {
 	private Workspace workspace;
 	private LuceneWorker worker;
 	private List<Work> queue = new ArrayList<Work>();
+	private boolean sync;
 
 	public BatchLuceneWorkQueue(Map<Class, DocumentBuilder<Object>> documentBuilders,
-					 Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+								Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders, boolean sync) {
 		workspace = new Workspace( documentBuilders, lockableDirectoryProviders );
 		worker = new LuceneWorker( workspace );
+		this.sync = sync;
 	}
 
 	public void add(Work work) {
@@ -42,24 +47,49 @@
 		}
 	}
 
+	//TODO implements parallel batchWorkers (one per Directory)
 	public void performWork() {
-		try {
-			//use of index reader
-			for ( Work work : queue ) {
-				if ( work instanceof DeleteWork ) worker.performWork( work );
-			}
-			workspace.clean(); //close readers
-			for ( Work work : queue ) {
-				if ( work instanceof AddWork ) worker.performWork( work );
-			}
+		BatchWorker batchWorker = new BatchWorker( queue, workspace, worker );
+		if (sync) {
+			batchWorker.run();
 		}
-		finally {
-			workspace.clean();
-			queue.clear();
+		else {
+			//TODO pool threads?
+			Thread thread = new Thread(batchWorker);
+			thread.start();
 		}
 	}
 
 	public void cancelWork() {
 		queue.clear();
 	}
+
+	private class BatchWorker implements Runnable {
+		private List<Work> queue;
+		private Workspace workspace;
+		private LuceneWorker worker;
+
+		public BatchWorker(List<Work> queue, Workspace workspace, LuceneWorker worker) {
+			this.queue = queue;
+			this.workspace = workspace;
+			this.worker = worker;
+		}
+
+		public void run() {
+			try {
+				//use of index reader
+				for ( Work work : queue ) {
+					if ( work instanceof DeleteWork ) worker.performWork( work );
+				}
+				workspace.clean(); //close readers
+				for ( Work work : queue ) {
+					if ( work instanceof AddWork ) worker.performWork( work );
+				}
+			}
+			finally {
+				workspace.clean();
+				queue.clear();
+			}
+		}
+	}
 }

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -18,10 +18,16 @@
 	private boolean consumed;
 	private WeakIdentityHashMap queuePerTransaction;
 
+	/**
+	 * out of transaction work
+	 */
 	public PostTransactionWorkQueueSynchronization(WorkQueue workQueue) {
 		this.workQueue = workQueue;
 	}
 
+	/**
+	 * in transaction work
+	 */
 	public PostTransactionWorkQueueSynchronization(WorkQueue workQueue, WeakIdentityHashMap queuePerTransaction) {
 		this(workQueue);
 		this.queuePerTransaction = queuePerTransaction;

Added: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -0,0 +1,63 @@
+//$Id: $
+package org.hibernate.search.backend.impl;
+
+import java.util.Properties;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.Status;
+
+import org.hibernate.search.backend.Worker;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.backend.WorkQueue;
+import org.hibernate.search.util.WeakIdentityHashMap;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.event.EventSource;
+import org.hibernate.Transaction;
+
+/**
+ * Queue works per transaction.
+ * If out of transaction, the work is executed right away
+ *
+ * When <code>hibernate.search.worker.type</code> is set to <code>async</code>
+ * the work is done in a  
+ *
+ * @author Emmanuel Bernard
+ */
+public class TransactionalWorker implements Worker {
+	//not a synchronized map since for a given transaction, we have not concurrent access
+	protected WeakIdentityHashMap queuePerTransaction = new WeakIdentityHashMap();
+	private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+	private Map<Class, DocumentBuilder<Object>> documentBuilders;
+	private boolean sync;
+
+	public void performWork(Work work, EventSource session) {
+		if ( session.isTransactionInProgress() ) {
+			Transaction transaction = session.getTransaction();
+			PostTransactionWorkQueueSynchronization txSync = (PostTransactionWorkQueueSynchronization)
+					queuePerTransaction.get( transaction );
+			if ( txSync == null || txSync.isConsumed() ) {
+				WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders, lockableDirectoryProviders, sync );
+				txSync = new PostTransactionWorkQueueSynchronization( workQueue, queuePerTransaction );
+				transaction.registerSynchronization( txSync );
+				queuePerTransaction.put(transaction, txSync);
+			}
+			txSync.add( work );
+		}
+		else {
+			WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders, lockableDirectoryProviders, sync );
+			PostTransactionWorkQueueSynchronization sync = new PostTransactionWorkQueueSynchronization( workQueue );
+			sync.add( work );
+			sync.afterCompletion( Status.STATUS_COMMITTED );
+		}
+	}
+
+	public void initialize(Properties props, Map<Class, DocumentBuilder<Object>> documentBuilders,
+						   Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+		this.documentBuilders = documentBuilders;
+		this.lockableDirectoryProviders = lockableDirectoryProviders;
+		//default to sync if none defined
+		this.sync = ! "async".equalsIgnoreCase( props.getProperty( Environment.WORKER_PREFIX + "type") );
+	}
+}

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/event/FullTextIndexEventListener.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/event/FullTextIndexEventListener.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/event/FullTextIndexEventListener.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -6,7 +6,6 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.WeakHashMap;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.transaction.Status;
 
@@ -35,6 +34,8 @@
 import org.hibernate.search.backend.UpdateWork;
 import org.hibernate.search.backend.Work;
 import org.hibernate.search.backend.WorkQueue;
+import org.hibernate.search.backend.Worker;
+import org.hibernate.search.backend.WorkerFactory;
 import org.hibernate.search.backend.impl.BatchLuceneWorkQueue;
 import org.hibernate.search.backend.impl.PostTransactionWorkQueueSynchronization;
 import org.hibernate.search.engine.DocumentBuilder;
@@ -58,9 +59,9 @@
 public class FullTextIndexEventListener implements PostDeleteEventListener, PostInsertEventListener,
 		PostUpdateEventListener, Initializable {
 	protected ReflectionManager reflectionManager;
-	//not a synchronized map since for a given transaction, we have not concurrent access
-	protected WeakIdentityHashMap queuePerTransaction;
 
+	protected Worker worker;
+
 	//FIXME keeping this here is a bad decision since you might want to search indexes wo maintain it
 	@Deprecated
 	public Map<Class, DocumentBuilder<Object>> getDocumentBuilders() {
@@ -80,7 +81,6 @@
 		if ( initialized ) return;
 		//yuk
 		reflectionManager = ( (AnnotationConfiguration) cfg ).createExtendedMappings().getReflectionManager();
-		queuePerTransaction = new WeakIdentityHashMap();
 
 		Class analyzerClass;
 		String analyzerClassName = cfg.getProperty( Environment.ANALYZER_CLASS );
@@ -136,6 +136,9 @@
 		for ( DocumentBuilder builder : documentBuilders.values() ) {
 			builder.postInitialize( indexedClasses );
 		}
+		WorkerFactory workerFactory = new WorkerFactory();
+		workerFactory.configure( cfg, documentBuilders, lockableDirectoryProviders );
+		worker = workerFactory.createWorker();
 		initialized = true;
 	}
 
@@ -169,24 +172,7 @@
 	}
 
 	private void processWork(Work work, AbstractEvent event) {
-		if ( event.getSession().isTransactionInProgress() ) {
-			Transaction transaction = event.getSession().getTransaction();
-			PostTransactionWorkQueueSynchronization sync = (PostTransactionWorkQueueSynchronization)
-					queuePerTransaction.get( transaction );
-			if ( sync == null || sync.isConsumed() ) {
-				WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders, lockableDirectoryProviders );
-				sync = new PostTransactionWorkQueueSynchronization( workQueue, queuePerTransaction );
-				transaction.registerSynchronization( sync );
-				queuePerTransaction.put(transaction, sync);
-			}
-			sync.add( work );
-		}
-		else {
-			WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders, lockableDirectoryProviders );
-			PostTransactionWorkQueueSynchronization sync = new PostTransactionWorkQueueSynchronization( workQueue );
-			sync.add( work );
-			sync.afterCompletion( Status.STATUS_COMMITTED );
-		}
+		worker.performWork( work, event.getSession() );
 	}
 
 	public Map<DirectoryProvider, ReentrantLock> getLockableDirectoryProviders() {

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java	2007-01-04 10:42:06 UTC (rev 11016)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java	2007-01-06 17:12:57 UTC (rev 11017)
@@ -112,7 +112,8 @@
 	private PostTransactionWorkQueueSynchronization createWorkQueueSync(
 			Map<Class, DocumentBuilder<Object>> documentBuilders,
 			Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
-		WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders, lockableDirectoryProviders );
+		//FIXME should be harmonized with the WorkerFactory?
+		WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders, lockableDirectoryProviders, true );
 		return new PostTransactionWorkQueueSynchronization( workQueue );
 	}
 




More information about the hibernate-commits mailing list