Author: epbernard
Date: 2007-01-06 12:12:57 -0500 (Sat, 06 Jan 2007)
New Revision: 11017
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkerFactory.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/Environment.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Workspace.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchLuceneWorkQueue.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/event/FullTextIndexEventListener.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
Log:
ANN-519 ANN-520
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/Environment.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/Environment.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/Environment.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -14,4 +14,7 @@
* Lucene analyser
*/
public static final String ANALYZER_CLASS = "hibernate.search.analyzer";
+
+ public static final String WORKER_PREFIX = "hibernate.search.worker.";
+ public static final String WORKER_IMPL = WORKER_PREFIX + "impl";
}
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -0,0 +1,24 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.Properties;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hibernate.event.AbstractEvent;
+import org.hibernate.event.EventSource;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.store.DirectoryProvider;
+
+/**
+ * Performs indexing work on behalf of a session.
+ * <p>
+ * Implementations have to be usable from multiple threads.
+ * <ul>
+ * <li><code>initialize</code> hands over the worker properties, the document builders
+ * (keyed by class) and the directory providers with their associated locks</li>
+ * <li><code>performWork</code> hands over one unit of work together with the session
+ * it was produced in</li>
+ * </ul>
+ *
+ * @author Emmanuel Bernard
+ */
+//TODO merge worker and workQueue to do a list of workers through work delegation
+public interface Worker {
+ void performWork(Work work, EventSource session);
+
+ void initialize(Properties props, Map<Class, DocumentBuilder<Object>>
documentBuilders,
+ Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders);
+}
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkerFactory.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkerFactory.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkerFactory.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -0,0 +1,73 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hibernate.cfg.Configuration;
+import org.hibernate.search.backend.impl.TransactionalWorker;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.util.StringHelper;
+import org.hibernate.util.ReflectHelper;
+import org.hibernate.AnnotationException;
+
+/**
+ * Creates the {@link Worker} selected by the <code>hibernate.search.worker.impl</code>
+ * property ({@link Environment#WORKER_IMPL}).
+ * <ul>
+ * <li>not set, empty, or <code>transaction</code> (case insensitive): a {@link TransactionalWorker}</li>
+ * <li>anything else: the value is used as the name of a class implementing {@link Worker},
+ * instantiated through its no-arg constructor</li>
+ * </ul>
+ * {@link #configure} has to be called before {@link #createWorker}.
+ *
+ * @author Emmanuel Bernard
+ */
+public class WorkerFactory {
+    private Map<Class, DocumentBuilder<Object>> documentBuilders;
+    private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+    private Configuration cfg;
+
+    /**
+     * Stores the collaborators handed over to every worker created by this factory.
+     *
+     * @param cfg configuration the <code>hibernate.search.worker.*</code> properties are read from
+     * @param documentBuilders document builders, keyed by class
+     * @param lockableDirectoryProviders directory providers with their associated locks
+     */
+    public void configure(Configuration cfg,
+            Map<Class, DocumentBuilder<Object>> documentBuilders,
+            Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+        this.documentBuilders = documentBuilders;
+        this.lockableDirectoryProviders = lockableDirectoryProviders;
+        this.cfg = cfg;
+    }
+
+    /**
+     * Extracts the properties whose key starts with {@link Environment#WORKER_PREFIX}.
+     *
+     * @param cfg configuration to read from
+     * @return a new Properties object holding only the worker related entries,
+     *         under their full (prefixed) key
+     */
+    private static Properties getProperties(Configuration cfg) {
+        Properties props = cfg.getProperties();
+        Properties workerProperties = new Properties();
+        for ( Map.Entry<Object, Object> entry : props.entrySet() ) {
+            Object key = entry.getKey();
+            Object value = entry.getValue();
+            //Properties#getProperty ignores non String entries, do the same rather than fail on a cast
+            if ( !( key instanceof String ) || !( value instanceof String ) ) {
+                continue;
+            }
+            String name = (String) key;
+            if ( name.startsWith( Environment.WORKER_PREFIX ) ) {
+                //keep the key fully qualified: the settings are looked up through
+                //Environment.WORKER_IMPL / Environment.WORKER_PREFIX + name, a stripped key would never match
+                workerProperties.setProperty( name, (String) value );
+            }
+        }
+        return workerProperties;
+    }
+
+    /**
+     * Instantiates and initializes the configured worker.
+     *
+     * @return the initialized worker
+     * @throws AnnotationException if the configured worker class cannot be found or instantiated
+     */
+    public Worker createWorker() {
+        Properties props = getProperties( cfg );
+        String impl = props.getProperty( Environment.WORKER_IMPL );
+        Worker worker;
+        if ( StringHelper.isEmpty( impl ) || "transaction".equalsIgnoreCase( impl ) ) {
+            //default implementation
+            worker = new TransactionalWorker();
+        }
+        else {
+            try {
+                Class workerClass = ReflectHelper.classForName( impl, WorkerFactory.class );
+                worker = (Worker) workerClass.newInstance();
+            }
+            catch (ClassNotFoundException e) {
+                throw new AnnotationException("Unable to find worker class: " + impl, e );
+            }
+            catch (IllegalAccessException e) {
+                throw new AnnotationException("Unable to instanciate worker class: " + impl, e );
+            }
+            catch (InstantiationException e) {
+                throw new AnnotationException("Unable to instanciate worker class: " + impl, e );
+            }
+        }
+        worker.initialize( props, documentBuilders, lockableDirectoryProviders );
+        return worker;
+    }
+}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Workspace.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Workspace.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Workspace.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -91,7 +91,7 @@
}
private void cleanUp(HibernateException originalException) {
- //release all readers and writers, then reelase locks
+ //release all readers and writers, then release locks
HibernateException raisedException = originalException;
for ( IndexReader reader : readers.values() ) {
try {
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchLuceneWorkQueue.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchLuceneWorkQueue.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchLuceneWorkQueue.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -17,17 +17,22 @@
import org.hibernate.search.backend.AddWork;
/**
+ * Batch work until #performWork is called.
+ * The work is then executed synchronously or asynchronously
+ *
* @author Emmanuel Bernard
*/
public class BatchLuceneWorkQueue implements WorkQueue {
private Workspace workspace;
private LuceneWorker worker;
private List<Work> queue = new ArrayList<Work>();
+ private boolean sync;
public BatchLuceneWorkQueue(Map<Class, DocumentBuilder<Object>>
documentBuilders,
- Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+ Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders, boolean
sync) {
workspace = new Workspace( documentBuilders, lockableDirectoryProviders );
worker = new LuceneWorker( workspace );
+ this.sync = sync;
}
public void add(Work work) {
@@ -42,24 +47,49 @@
}
}
+ //TODO implement parallel batch workers (one per Directory)
public void performWork() {
- try {
- //use of index reader
- for ( Work work : queue ) {
- if ( work instanceof DeleteWork ) worker.performWork( work );
- }
- workspace.clean(); //close readers
- for ( Work work : queue ) {
- if ( work instanceof AddWork ) worker.performWork( work );
- }
+ BatchWorker batchWorker = new BatchWorker( queue, workspace, worker );
+ if (sync) {
+ batchWorker.run();
}
- finally {
- workspace.clean();
- queue.clear();
+ else {
+ //TODO pool threads?
+ Thread thread = new Thread(batchWorker);
+ thread.start();
}
}
public void cancelWork() {
queue.clear();
}
+
+    /**
+     * Executes a list of work in two passes: every DeleteWork first, then every AddWork.
+     * Work of any other type present in the list is not executed by this class.
+     * <p>
+     * Declared static: it only uses the objects received through its constructor,
+     * so it does not need a reference to the enclosing queue.
+     */
+    private static class BatchWorker implements Runnable {
+        private final List<Work> queue;
+        private final Workspace workspace;
+        private final LuceneWorker worker;
+
+        /**
+         * @param queue work to execute; the list is used as is (no copy) and is cleared once run() ends
+         * @param workspace workspace cleaned between the two passes and at the end
+         * @param worker worker each unit of work is delegated to
+         */
+        public BatchWorker(List<Work> queue, Workspace workspace, LuceneWorker worker) {
+            this.queue = queue;
+            this.workspace = workspace;
+            this.worker = worker;
+        }
+
+        /**
+         * Performs the deletions, cleans the workspace, then performs the additions.
+         * The workspace is cleaned and the list emptied even if a unit of work fails.
+         */
+        public void run() {
+            try {
+                //use of index reader
+                for ( Work work : queue ) {
+                    if ( work instanceof DeleteWork ) worker.performWork( work );
+                }
+                workspace.clean(); //close readers
+                for ( Work work : queue ) {
+                    if ( work instanceof AddWork ) worker.performWork( work );
+                }
+            }
+            finally {
+                workspace.clean();
+                queue.clear();
+            }
+        }
+    }
}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -18,10 +18,16 @@
private boolean consumed;
private WeakIdentityHashMap queuePerTransaction;
+ /**
+ * out of transaction work
+ */
public PostTransactionWorkQueueSynchronization(WorkQueue workQueue) {
this.workQueue = workQueue;
}
+ /**
+ * in transaction work
+ */
public PostTransactionWorkQueueSynchronization(WorkQueue workQueue, WeakIdentityHashMap
queuePerTransaction) {
this(workQueue);
this.queuePerTransaction = queuePerTransaction;
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -0,0 +1,63 @@
+//$Id: $
+package org.hibernate.search.backend.impl;
+
+import java.util.Properties;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.Status;
+
+import org.hibernate.search.backend.Worker;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.backend.WorkQueue;
+import org.hibernate.search.util.WeakIdentityHashMap;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.event.EventSource;
+import org.hibernate.Transaction;
+
+/**
+ * Queues work per transaction: the work is added to a synchronization
+ * registered on the current transaction.
+ * If out of transaction, the work is executed right away.
+ * <p>
+ * When <code>hibernate.search.worker.type</code> is set to <code>async</code>
+ * the work is executed in a separate thread; with any other value, or none,
+ * it is executed synchronously.
+ *
+ * @author Emmanuel Bernard
+ */
+public class TransactionalWorker implements Worker {
+    //not a synchronized map since for a given transaction, we have not concurrent access
+    //NOTE(review): the map itself is shared by all the transactions going through this worker,
+    //confirm WeakIdentityHashMap tolerates concurrent access for different keys
+    protected WeakIdentityHashMap queuePerTransaction = new WeakIdentityHashMap();
+    private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+    private Map<Class, DocumentBuilder<Object>> documentBuilders;
+    private boolean sync;
+
+    /**
+     * Queues the work on the current transaction, or executes it immediately
+     * when the session has no transaction in progress.
+     *
+     * @param work work to perform
+     * @param session session the work originates from
+     */
+    public void performWork(Work work, EventSource session) {
+        if ( session.isTransactionInProgress() ) {
+            Transaction transaction = session.getTransaction();
+            PostTransactionWorkQueueSynchronization txSync = (PostTransactionWorkQueueSynchronization)
+                    queuePerTransaction.get( transaction );
+            if ( txSync == null || txSync.isConsumed() ) {
+                //first work for this transaction (or previous queue already consumed): start a new queue
+                WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders, lockableDirectoryProviders, sync );
+                txSync = new PostTransactionWorkQueueSynchronization( workQueue, queuePerTransaction );
+                transaction.registerSynchronization( txSync );
+                queuePerTransaction.put( transaction, txSync );
+            }
+            txSync.add( work );
+        }
+        else {
+            WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders, lockableDirectoryProviders, sync );
+            //named differently from the sync field to avoid shadowing it
+            PostTransactionWorkQueueSynchronization immediateSync =
+                    new PostTransactionWorkQueueSynchronization( workQueue );
+            immediateSync.add( work );
+            //no transaction: trigger the completion callback ourselves
+            immediateSync.afterCompletion( Status.STATUS_COMMITTED );
+        }
+    }
+
+    /**
+     * Stores the collaborators and reads the execution mode.
+     *
+     * @param props worker properties; the <code>type</code> setting is read either under its
+     *              full name (<code>hibernate.search.worker.type</code>) or under its short name (<code>type</code>)
+     * @param documentBuilders document builders, keyed by class
+     * @param lockableDirectoryProviders directory providers with their associated locks
+     */
+    public void initialize(Properties props, Map<Class, DocumentBuilder<Object>> documentBuilders,
+            Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+        this.documentBuilders = documentBuilders;
+        this.lockableDirectoryProviders = lockableDirectoryProviders;
+        String type = props.getProperty( Environment.WORKER_PREFIX + "type" );
+        if ( type == null ) {
+            //the properties may be handed over with the worker prefix removed from the keys
+            type = props.getProperty( "type" );
+        }
+        //default to sync if none defined
+        this.sync = ! "async".equalsIgnoreCase( type );
+    }
+}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/event/FullTextIndexEventListener.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/event/FullTextIndexEventListener.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/event/FullTextIndexEventListener.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -6,7 +6,6 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.WeakHashMap;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.Status;
@@ -35,6 +34,8 @@
import org.hibernate.search.backend.UpdateWork;
import org.hibernate.search.backend.Work;
import org.hibernate.search.backend.WorkQueue;
+import org.hibernate.search.backend.Worker;
+import org.hibernate.search.backend.WorkerFactory;
import org.hibernate.search.backend.impl.BatchLuceneWorkQueue;
import org.hibernate.search.backend.impl.PostTransactionWorkQueueSynchronization;
import org.hibernate.search.engine.DocumentBuilder;
@@ -58,9 +59,9 @@
public class FullTextIndexEventListener implements PostDeleteEventListener,
PostInsertEventListener,
PostUpdateEventListener, Initializable {
protected ReflectionManager reflectionManager;
- //not a synchronized map since for a given transaction, we have not concurrent access
- protected WeakIdentityHashMap queuePerTransaction;
+ protected Worker worker;
+
//FIXME keeping this here is a bad decision since you might want to search indexes wo
maintain it
@Deprecated
public Map<Class, DocumentBuilder<Object>> getDocumentBuilders() {
@@ -80,7 +81,6 @@
if ( initialized ) return;
//yuk
reflectionManager = ( (AnnotationConfiguration) cfg
).createExtendedMappings().getReflectionManager();
- queuePerTransaction = new WeakIdentityHashMap();
Class analyzerClass;
String analyzerClassName = cfg.getProperty( Environment.ANALYZER_CLASS );
@@ -136,6 +136,9 @@
for ( DocumentBuilder builder : documentBuilders.values() ) {
builder.postInitialize( indexedClasses );
}
+ WorkerFactory workerFactory = new WorkerFactory();
+ workerFactory.configure( cfg, documentBuilders, lockableDirectoryProviders );
+ worker = workerFactory.createWorker();
initialized = true;
}
@@ -169,24 +172,7 @@
}
private void processWork(Work work, AbstractEvent event) {
- if ( event.getSession().isTransactionInProgress() ) {
- Transaction transaction = event.getSession().getTransaction();
- PostTransactionWorkQueueSynchronization sync =
(PostTransactionWorkQueueSynchronization)
- queuePerTransaction.get( transaction );
- if ( sync == null || sync.isConsumed() ) {
- WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders,
lockableDirectoryProviders );
- sync = new PostTransactionWorkQueueSynchronization( workQueue, queuePerTransaction
);
- transaction.registerSynchronization( sync );
- queuePerTransaction.put(transaction, sync);
- }
- sync.add( work );
- }
- else {
- WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders,
lockableDirectoryProviders );
- PostTransactionWorkQueueSynchronization sync = new
PostTransactionWorkQueueSynchronization( workQueue );
- sync.add( work );
- sync.afterCompletion( Status.STATUS_COMMITTED );
- }
+ worker.performWork( work, event.getSession() );
}
public Map<DirectoryProvider, ReentrantLock> getLockableDirectoryProviders() {
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java 2007-01-04
10:42:06 UTC (rev 11016)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java 2007-01-06
17:12:57 UTC (rev 11017)
@@ -112,7 +112,8 @@
private PostTransactionWorkQueueSynchronization createWorkQueueSync(
Map<Class, DocumentBuilder<Object>> documentBuilders,
Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
- WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders,
lockableDirectoryProviders );
+ //FIXME should be harmonized with the WorkerFactory?
+ WorkQueue workQueue = new BatchLuceneWorkQueue( documentBuilders,
lockableDirectoryProviders, true );
return new PostTransactionWorkQueueSynchronization( workQueue );
}