Author: epbernard
Date: 2007-02-22 22:45:39 -0500 (Thu, 22 Feb 2007)
New Revision: 11233
Added:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/BackendQueueProcessorFactory.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueingProcessor.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessor.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessorFactory.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java
Removed:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueWorker.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java
Modified:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/Environment.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/SearchFactory.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkerFactory.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSHibernateSearchController.java
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/util/FileHelperTest.java
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/WorkerTestCase.java
Log:
HSEARCH-16
HSEARCH-9
Modified:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/Environment.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/Environment.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/Environment.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -16,5 +16,7 @@
public static final String ANALYZER_CLASS = "hibernate.search.analyzer";
public static final String WORKER_PREFIX = "hibernate.search.worker.";
- public static final String WORKER_IMPL = WORKER_PREFIX + "impl";
+ public static final String WORKER_SCOPE = WORKER_PREFIX + "scope";
+ public static final String WORKER_BACKEND = WORKER_PREFIX + "backend";
+ public static final String WORKER_PROCESS = WORKER_PREFIX + "process";
}
Modified:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/SearchFactory.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/SearchFactory.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/SearchFactory.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -18,6 +18,7 @@
import org.hibernate.search.annotations.Indexed;
import org.hibernate.search.backend.Worker;
import org.hibernate.search.backend.WorkerFactory;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
import org.hibernate.search.engine.DocumentBuilder;
import org.hibernate.search.store.DirectoryProvider;
import org.hibernate.search.store.DirectoryProviderFactory;
@@ -38,7 +39,17 @@
private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders =
new HashMap<DirectoryProvider, ReentrantLock>();
private Worker worker;
+ private BackendQueueProcessorFactory backendQueueProcessorFactory;
+
+ public BackendQueueProcessorFactory getBackendQueueProcessorFactory() {
+ return backendQueueProcessorFactory;
+ }
+
+ public void setBackendQueueProcessorFactory(BackendQueueProcessorFactory
backendQueueProcessorFactory) {
+ this.backendQueueProcessorFactory = backendQueueProcessorFactory;
+ }
+
public SearchFactory(Configuration cfg) {
//yuk
ReflectionManager reflectionManager = getReflectionManager( cfg );
@@ -100,6 +111,7 @@
WorkerFactory workerFactory = new WorkerFactory();
workerFactory.configure( cfg, this );
worker = workerFactory.createWorker();
+
}
//code doesn't have to be multithreaded because SF creation is not.
Added:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/BackendQueueProcessorFactory.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/BackendQueueProcessorFactory.java
(rev 0)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/BackendQueueProcessorFactory.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -0,0 +1,24 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.Properties;
+import java.util.List;
+
+import org.hibernate.search.SearchFactory;
+
+/**
+ * Build stateful backend processor
+ * Must have a no arg constructor
+ * The factory typically prepare or pool the resources needed by the queue processor
+ *
+ * @author Emmanuel Bernard
+ */
+public interface BackendQueueProcessorFactory {
+ void initialize(Properties props, SearchFactory searchFactory);
+
+ /**
+ * Return a runnable implementation responsible for processing the queue to a given
backend
+ */
+
+ Runnable getProcessor(List<Work> queue);
+}
Deleted:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueWorker.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueWorker.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -1,22 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend;
-
-import java.util.Properties;
-import java.util.List;
-
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.SearchFactory;
-
-/**
- * Execute the work for a given queue
- *
- * @author Emmanuel Bernard
- */
-public interface QueueWorker extends Runnable {
- void run();
-
- void initialize(Properties props, SearchFactory searchFactory);
-
- void setQueue(List<Work> queue);
-
-}
Copied:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueingProcessor.java
(from rev 11171,
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java)
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueingProcessor.java
(rev 0)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/QueueingProcessor.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -0,0 +1,32 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.List;
+
+import org.hibernate.search.backend.Work;
+
+/**
+ * Pile work operations
+ * No thread safety has to be implemented, the queue being scoped already
+ * The implementation must be "stateless" wrt the queue through (ie not store
the queue state)
+ *
+ * @author Emmanuel Bernard
+ */
+public interface QueueingProcessor {
+ /**
+ * Add a work
+ */
+ void add(Work work, List<Work> queue);
+
+ /**
+ * Execute works
+ * @param queue
+ */
+ void performWork(List<Work> queue);
+
+ /**
+ * Rollback works
+ * @param queue
+ */
+ void cancelWork(List<Work> queue);
+}
Deleted:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkQueue.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -1,26 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend;
-
-import org.hibernate.search.backend.Work;
-
-/**
- * Set of work operations
- *
- * @author Emmanuel Bernard
- */
-public interface WorkQueue {
- /**
- * Add a work
- */
- void add(Work work);
-
- /**
- * Execute works
- */
- void performWork();
-
- /**
- * Rollback works
- */
- void cancelWork();
-}
Modified:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkerFactory.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkerFactory.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/WorkerFactory.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -36,7 +36,8 @@
for ( Map.Entry entry : props.entrySet() ) {
String key = (String) entry.getKey();
if ( key.startsWith( Environment.WORKER_PREFIX ) ) {
- workerProperties.setProperty( key.substring( Environment.WORKER_PREFIX.length() ),
(String) entry.getValue() );
+ //key.substring( Environment.WORKER_PREFIX.length() )
+ workerProperties.setProperty( key, (String) entry.getValue() );
}
}
return workerProperties;
@@ -44,7 +45,7 @@
public Worker createWorker() {
Properties props = getProperties( cfg );
- String impl = props.getProperty( Environment.WORKER_IMPL );
+ String impl = props.getProperty( Environment.WORKER_SCOPE );
Worker worker;
if ( StringHelper.isEmpty( impl ) ) {
worker = new TransactionalWorker();
Deleted:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -1,71 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend.impl;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.Workspace;
-import org.hibernate.search.backend.AddWork;
-import org.hibernate.search.backend.QueueWorker;
-import org.hibernate.search.SearchFactory;
-
-/**
- * @author Emmanuel Bernard
-*/
-public class BatchedQueueWorker implements QueueWorker {
- private List<Work> queue;
- private SearchFactory searchFactory;
-
- public void run() {
- Workspace workspace;
- LuceneWorker worker;
- workspace = new Workspace( searchFactory );
- worker = new LuceneWorker( workspace );
- try {
- deadlockFreeQueue(queue, workspace);
- for ( Work work : queue ) {
- worker.performWork( work );
- }
- }
- finally {
- workspace.clean();
- queue.clear();
- }
- }
-
- public void initialize(Properties props, SearchFactory searchFactory) {
- this.searchFactory = searchFactory;
- }
-
- public void setQueue(List<Work> queue) {
- this.queue = queue;
- }
-
- /**
- * one must lock the directory providers in the exact same order to avoid
- * dead lock between concurrent threads or processes
- * To achieve that, the work will be done per directory provider
- */
- private void deadlockFreeQueue(List<Work> queue, final Workspace workspace) {
- Collections.sort( queue, new Comparator<Work>() {
- public int compare(Work o1, Work o2) {
- long h1 = getWorkHashCode( o1, workspace );
- long h2 = getWorkHashCode( o2, workspace );
- return h1 < h2 ?
- -1 :
- h1 == h2 ?
- 0 :
- 1;
- }
- } );
- }
-
- private long getWorkHashCode(Work work, Workspace workspace) {
- long h = workspace.getDocumentBuilder( work.getEntity() ).hashCode() * 2;
- if (work instanceof AddWork ) h+=1; //addwork after deleteWork
- return h;
- }
-}
Copied:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
(from rev 11171,
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java)
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java
(rev 0)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueingProcessor.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -0,0 +1,113 @@
+//$Id: $
+package org.hibernate.search.backend.impl;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.hibernate.annotations.common.util.StringHelper;
+import org.hibernate.search.Environment;
+import org.hibernate.search.SearchException;
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.DeleteWork;
+import org.hibernate.search.backend.QueueingProcessor;
+import org.hibernate.search.backend.UpdateWork;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.backend.impl.jms.JMSBackendQueueProcessorFactory;
+import org.hibernate.search.backend.impl.lucene.LuceneBackendQueueProcessorFactory;
+import org.hibernate.util.ReflectHelper;
+
+/**
+ * Batch work until #performWork is called.
+ * The work is then executed synchronously or asynchronously
+ *
+ * @author Emmanuel Bernard
+ */
+public class BatchedQueueingProcessor implements QueueingProcessor {
+ private boolean sync;
+ private ExecutorService executorService;
+ private BackendQueueProcessorFactory backendQueueProcessorFactory;
+
+ public BatchedQueueingProcessor(SearchFactory searchFactory,
+ Properties properties) {
+ //default to sync if none defined
+ this.sync = !"async".equalsIgnoreCase( properties.getProperty(
Environment.WORKER_PROCESS ) );
+
+ int min = Integer.parseInt(
+ properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.min",
"0" )
+ );
+ int max = Integer.parseInt(
+ properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.max",
"0" ).trim()
+ );
+ if ( max == 0 ) max = Integer.MAX_VALUE;
+ if ( !sync ) {
+ executorService =
+ new ThreadPoolExecutor( min, max, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>() );
+ }
+ String backend = properties.getProperty( Environment.WORKER_BACKEND );
+ if ( StringHelper.isEmpty( backend ) || "lucene".equalsIgnoreCase( backend )
) {
+ backendQueueProcessorFactory = new LuceneBackendQueueProcessorFactory();
+ }
+ else if ( "jms".equalsIgnoreCase( backend ) ) {
+ backendQueueProcessorFactory = new JMSBackendQueueProcessorFactory();
+ }
+ else {
+ try {
+ Class processorFactoryClass = ReflectHelper.classForName( backend,
BatchedQueueingProcessor.class );
+ backendQueueProcessorFactory = (BackendQueueProcessorFactory)
processorFactoryClass.newInstance();
+ }
+ catch (ClassNotFoundException e) {
+ throw new SearchException( "Unable to find processor class: " + backend, e
);
+ }
+ catch (IllegalAccessException e) {
+ throw new SearchException( "Unable to instanciate processor class: " +
backend, e );
+ }
+ catch (InstantiationException e) {
+ throw new SearchException( "Unable to instanciate processor class: " +
backend, e );
+ }
+ }
+ backendQueueProcessorFactory.initialize( properties, searchFactory );
+ searchFactory.setBackendQueueProcessorFactory( backendQueueProcessorFactory );
+ }
+
+ public void add(Work work, List<Work> queue) {
+ //TODO optimize by getting rid of dupe works
+ if ( work instanceof UpdateWork ) {
+ //split in 2 to optimize the process (reader first, writer next
+ queue.add( new DeleteWork( work.getId(), work.getEntity() ) );
+ queue.add( new AddWork( work.getId(), work.getEntity(), work.getDocument() ) );
+ }
+ else {
+ queue.add( work );
+ }
+ }
+
+ //TODO implements parallel batchWorkers (one per Directory)
+ public void performWork(List<Work> queue) {
+ Runnable processor = backendQueueProcessorFactory.getProcessor( queue );
+ if ( sync ) {
+ processor.run();
+ }
+ else {
+ executorService.execute( processor );
+ }
+ }
+
+ public void cancelWork(List<Work> queue) {
+ queue.clear();
+ }
+
+ @Override
+ public void finalize() throws Throwable {
+ super.finalize();
+ //gracefully stop
+ //TODO move to the SF close lifecycle
+ if ( executorService != null && !executorService.isShutdown() )
executorService.shutdown();
+ }
+
+}
Deleted:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -1,80 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ExecutorService;
-
-import org.hibernate.search.SearchFactory;
-import org.hibernate.search.backend.WorkQueue;
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.UpdateWork;
-import org.hibernate.search.backend.DeleteWork;
-import org.hibernate.search.backend.AddWork;
-import org.hibernate.search.Environment;
-
-/**
- * Batch work until #performWork is called.
- * The work is then executed synchronously or asynchronously
- *
- * @author Emmanuel Bernard
- */
-public class BatchedWorkQueue implements WorkQueue {
- private List<Work> queue = new ArrayList<Work>();
- private boolean sync;
- private ExecutorService executorService;
- private Properties properties;
- private SearchFactory searchFactory;
-
- public BatchedWorkQueue(SearchFactory searchFactory,
- Properties properties) {
- this.searchFactory = searchFactory;
- this.properties = properties;
- //default to sync if none defined
- this.sync = ! "async".equalsIgnoreCase( properties.getProperty(
Environment.WORKER_PREFIX + "type") );
- int min = Integer.parseInt(
- properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.min",
"0" )
- );
- int max = Integer.parseInt(
- properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.max",
"0" ).trim()
- );
- if (max == 0) max = Integer.MAX_VALUE;
- if ( ! sync) {
- executorService = new ThreadPoolExecutor( min, max, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>() );
- }
- }
-
- public void add(Work work) {
- //TODO optimize by getting rid of dupe works
- if ( work instanceof UpdateWork ) {
- //split in 2 to optimize the process (reader first, writer next
- queue.add( new DeleteWork( work.getId(), work.getEntity() ) );
- queue.add( new AddWork( work.getId(), work.getEntity(), work.getDocument() ) );
- }
- else {
- queue.add( work );
- }
- }
-
- //TODO implements parallel batchWorkers (one per Directory)
- public void performWork() {
- BatchedQueueWorker batchWorker = new BatchedQueueWorker();
- batchWorker.initialize( properties, searchFactory );
- batchWorker.setQueue( queue );
- if (sync) {
- batchWorker.run();
- }
- else {
- executorService.execute( batchWorker );
- }
- }
-
- public void cancelWork() {
- queue.clear();
- }
-
-}
Deleted:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -1,116 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend.impl;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.index.TermDocs;
-import org.hibernate.annotations.common.AssertionFailure;
-import org.hibernate.search.engine.DocumentBuilder;
-import org.hibernate.search.backend.Workspace;
-import org.hibernate.search.backend.DeleteWork;
-import org.hibernate.search.backend.AddWork;
-import org.hibernate.search.backend.UpdateWork;
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.SearchException;
-
-/**
- * Stateless implementation that perform a work
- *
- * @author Emmanuel Bernard
- */
-public class LuceneWorker {
- private Workspace workspace;
- private static Log log = LogFactory.getLog( LuceneWorker.class );
-
- public LuceneWorker(Workspace workspace) {
- this.workspace = workspace;
- }
-
- public void performWork(Work work) {
- if ( AddWork.class.isAssignableFrom( work.getClass() ) ) {
- performWork( (AddWork) work );
- }
- else if ( UpdateWork.class.isAssignableFrom( work.getClass() ) ) {
- performWork( (UpdateWork) work );
- }
- else if ( DeleteWork.class.isAssignableFrom( work.getClass() ) ) {
- performWork( (DeleteWork) work );
- }
- else {
- throw new AssertionFailure( "Unknown work type: " + work.getClass() );
- }
- }
-
- public void performWork(AddWork work) {
- Class entity = work.getEntity();
- Serializable id = work.getId();
- Document document = work.getDocument();
- add( entity, id, document );
- }
-
- private void add(Class entity, Serializable id, Document document) {
- if ( log.isTraceEnabled() )
- log.trace( "add to Lucene index: " + entity + "#" + id + ":
" + document );
- IndexWriter writer = workspace.getIndexWriter( entity );
- try {
- writer.addDocument( document );
- }
- catch (IOException e) {
- throw new SearchException( "Unable to add to Lucene index: " + entity +
"#" + id, e );
- }
- }
-
- public void performWork(UpdateWork work) {
- Class entity = work.getEntity();
- Serializable id = work.getId();
- Document document = work.getDocument();
- remove( entity, id );
- add( entity, id, document );
- }
-
- public void performWork(DeleteWork work) {
- Class entity = work.getEntity();
- Serializable id = work.getId();
- remove( entity, id );
- }
-
- private void remove(Class entity, Serializable id) {
- log.trace( "remove from Lucene index: " + entity + "#" + id );
- DocumentBuilder builder = workspace.getDocumentBuilder( entity );
- Term term = builder.getTerm( id );
- IndexReader reader = workspace.getIndexReader( entity );
- TermDocs termDocs = null;
- try {
- //TODO is there a faster way?
- //TODO include TermDocs into the workspace?
- termDocs = reader.termDocs( term );
- String entityName = entity.getName();
- while ( termDocs.next() ) {
- int docIndex = termDocs.doc();
- if ( entityName.equals( reader.document( docIndex ).get(
DocumentBuilder.CLASS_FIELDNAME ) ) ) {
- //remove only the one of the right class
- //loop all to remove all the matches (defensive code)
- reader.deleteDocument( docIndex );
- }
- }
- }
- catch (Exception e) {
- throw new SearchException( "Unable to remove from Lucene index: " + entity +
"#" + id, e );
- }
- finally {
- if (termDocs != null) try {
- termDocs.close();
- }
- catch (IOException e) {
- log.warn( "Unable to close termDocs properly", e);
- }
- }
- }
-}
Modified:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/PostTransactionWorkQueueSynchronization.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -1,10 +1,12 @@
//$Id: $
package org.hibernate.search.backend.impl;
+import java.util.List;
+import java.util.ArrayList;
import javax.transaction.Status;
import javax.transaction.Synchronization;
-import org.hibernate.search.backend.WorkQueue;
+import org.hibernate.search.backend.QueueingProcessor;
import org.hibernate.search.backend.Work;
import org.hibernate.search.util.WeakIdentityHashMap;
@@ -14,27 +16,21 @@
* @author Emmanuel Bernard
*/
public class PostTransactionWorkQueueSynchronization implements Synchronization {
- private WorkQueue workQueue;
+ private QueueingProcessor queueingProcessor;
private boolean consumed;
private WeakIdentityHashMap queuePerTransaction;
+ private List<Work> queue = new ArrayList<Work>();
/**
- * out of transaction work
- */
- public PostTransactionWorkQueueSynchronization(WorkQueue workQueue) {
- this.workQueue = workQueue;
- }
-
- /**
* in transaction work
*/
- public PostTransactionWorkQueueSynchronization(WorkQueue workQueue, WeakIdentityHashMap
queuePerTransaction) {
- this(workQueue);
+ public PostTransactionWorkQueueSynchronization(QueueingProcessor queueingProcessor,
WeakIdentityHashMap queuePerTransaction) {
+ this.queueingProcessor = queueingProcessor;
this.queuePerTransaction = queuePerTransaction;
}
public void add(Work work) {
- workQueue.add( work );
+ queueingProcessor.add( work, queue );
}
public boolean isConsumed() {
@@ -47,10 +43,10 @@
public void afterCompletion(int i) {
try {
if ( Status.STATUS_COMMITTED == i ) {
- workQueue.performWork();
+ queueingProcessor.performWork(queue);
}
else {
- workQueue.cancelWork();
+ queueingProcessor.cancelWork(queue);
}
}
finally {
Modified:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -2,17 +2,15 @@
package org.hibernate.search.backend.impl;
import java.util.Properties;
-import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.transaction.Status;
+import java.util.List;
+import java.util.ArrayList;
import org.hibernate.search.backend.Worker;
import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.WorkQueue;
+import org.hibernate.search.backend.QueueingProcessor;
+import org.hibernate.search.backend.impl.BatchedQueueingProcessor;
import org.hibernate.search.util.WeakIdentityHashMap;
-import org.hibernate.search.engine.DocumentBuilder;
import org.hibernate.search.SearchFactory;
-import org.hibernate.search.store.DirectoryProvider;
import org.hibernate.event.EventSource;
import org.hibernate.Transaction;
@@ -27,35 +25,29 @@
*/
public class TransactionalWorker implements Worker {
//not a synchronized map since for a given transaction, we have not concurrent access
- protected WeakIdentityHashMap queuePerTransaction = new WeakIdentityHashMap();
- private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
- private Map<Class, DocumentBuilder<Object>> documentBuilders;
- private Properties properties;
- private SearchFactory searchFactory;
+ protected WeakIdentityHashMap synchronizationPerTransaction = new
WeakIdentityHashMap();
+ private QueueingProcessor queueingProcessor;
public void performWork(Work work, EventSource session) {
if ( session.isTransactionInProgress() ) {
Transaction transaction = session.getTransaction();
PostTransactionWorkQueueSynchronization txSync =
(PostTransactionWorkQueueSynchronization)
- queuePerTransaction.get( transaction );
+ synchronizationPerTransaction.get( transaction );
if ( txSync == null || txSync.isConsumed() ) {
- WorkQueue workQueue = new BatchedWorkQueue( searchFactory, properties );
- txSync = new PostTransactionWorkQueueSynchronization( workQueue, queuePerTransaction
);
+ txSync = new PostTransactionWorkQueueSynchronization( queueingProcessor,
synchronizationPerTransaction );
transaction.registerSynchronization( txSync );
- queuePerTransaction.put(transaction, txSync);
+ synchronizationPerTransaction.put(transaction, txSync);
}
txSync.add( work );
}
else {
- WorkQueue workQueue = new BatchedWorkQueue( searchFactory, properties );
- PostTransactionWorkQueueSynchronization sync = new
PostTransactionWorkQueueSynchronization( workQueue );
- sync.add( work );
- sync.afterCompletion( Status.STATUS_COMMITTED );
+ List<Work> queue = new ArrayList<Work>(2); //one work can be split
+ queueingProcessor.add( work, queue );
+ queueingProcessor.performWork( queue );
}
}
public void initialize(Properties props, SearchFactory searchFactory) {
- this.searchFactory = searchFactory;
- this.properties = props;
+ this.queueingProcessor = new BatchedQueueingProcessor( searchFactory, props );
}
}
Copied:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessor.java
(from rev 11171,
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java)
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessor.java
(rev 0)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessor.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -0,0 +1,51 @@
+//$Id: $
+package org.hibernate.search.backend.impl.jms;
+
+import java.io.Serializable;
+import java.util.List;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+
+import org.hibernate.HibernateException;
+import org.hibernate.search.backend.Work;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class JMSBackendQueueProcessor implements Runnable {
+ private List<Work> queue;
+ private JMSBackendQueueProcessorFactory factory;
+
+ public JMSBackendQueueProcessor(List<Work> queue,
+ JMSBackendQueueProcessorFactory jmsBackendQueueProcessorFactory) {
+ this.queue = queue;
+ this.factory = jmsBackendQueueProcessorFactory;
+ }
+
+ public void run() {
+ factory.prepareJMSTools();
+ QueueConnection cnn;
+ QueueSender sender;
+ QueueSession session;
+ try {
+ cnn = factory.getJMSFactory().createQueueConnection();
+ //TODO make transacted parameterized
+ session = cnn.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
+
+ ObjectMessage message = session.createObjectMessage();
+ message.setObject( (Serializable) this.queue );
+
+ sender = session.createSender( factory.getJmsQueue() );
+ sender.send( message );
+
+ session.close();
+ cnn.close();
+ }
+ catch (JMSException e) {
+ throw new HibernateException( "Unable to send Search work to JMS queue: " +
factory.getJmsQueueName(), e );
+ }
+ }
+}
Added:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessorFactory.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessorFactory.java
(rev 0)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSBackendQueueProcessorFactory.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -0,0 +1,107 @@
+//$Id: $
+package org.hibernate.search.backend.impl.jms;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import javax.jms.Queue;
+import javax.jms.QueueConnectionFactory;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.hibernate.HibernateException;
+import org.hibernate.search.Environment;
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.Work;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class JMSBackendQueueProcessorFactory implements BackendQueueProcessorFactory {
+ private String jmsQueueName;
+ private String jmsConnectionFactoryName;
+ private static final String JNDI_PREFIX = Environment.WORKER_PREFIX +
"jndi.";
+ private Properties properties;
+ private Queue jmsQueue;
+ private QueueConnectionFactory factory;
+
+ public void initialize(Properties props, SearchFactory searchFactory) {
+ this.properties = props;
+ this.jmsConnectionFactoryName = props.getProperty( Environment.WORKER_PREFIX +
"jms.connection_factory" );
+ this.jmsQueueName = props.getProperty( Environment.WORKER_PREFIX +
"jms.queue" );
+ prepareJMSTools();
+ }
+
+ public Runnable getProcessor(List<Work> queue) {
+ return new JMSBackendQueueProcessor(queue, this);
+ }
+
+
+ public QueueConnectionFactory getJMSFactory() {
+ return factory;
+ }
+
+ public Queue getJmsQueue() {
+ return jmsQueue;
+ }
+
+
+ public String getJmsQueueName() {
+ return jmsQueueName;
+ }
+
+ public void prepareJMSTools() {
+ if (jmsQueue != null && factory != null) return;
+ try {
+ InitialContext initialContext = getInitialContext(properties);
+ jmsQueue = (Queue) initialContext.lookup( jmsQueueName );
+ factory = (QueueConnectionFactory) initialContext.lookup( jmsConnectionFactoryName );
+ }
+ catch (NamingException e) {
+ throw new HibernateException("Unable to lookup Search queue and connection
factory", e);
+ }
+ }
+
+ private InitialContext getInitialContext(Properties properties) throws NamingException
{
+ Properties jndiProps = getJndiProperties( properties );
+ if (jndiProps.size() == 0) {
+ return new InitialContext( );
+ }
+ else {
+ return new InitialContext( jndiProps );
+ }
+ }
+
+ public static Properties getJndiProperties(Properties properties) {
+
+ HashSet specialProps = new HashSet();
+ specialProps.add( JNDI_PREFIX + "class" );
+ specialProps.add( JNDI_PREFIX + "url" );
+
+ Iterator iter = properties.keySet().iterator();
+ Properties result = new Properties();
+ while ( iter.hasNext() ) {
+ String prop = (String) iter.next();
+ if ( prop.indexOf(JNDI_PREFIX) > -1 && !specialProps.contains(prop) ) {
+ result.setProperty(
+ prop.substring( JNDI_PREFIX.length()+1 ),
+ properties.getProperty(prop)
+ );
+ }
+ }
+
+ String jndiClass = properties.getProperty(JNDI_PREFIX + "class");
+ String jndiURL = properties.getProperty(JNDI_PREFIX + "url");
+ // we want to be able to just use the defaults,
+ // if JNDI environment properties are not supplied
+ // so don't put null in anywhere
+ if (jndiClass != null) result.put( Context.INITIAL_CONTEXT_FACTORY, jndiClass);
+ if (jndiURL != null) result.put(Context.PROVIDER_URL, jndiURL);
+
+ return result;
+ }
+
+}
Modified:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSHibernateSearchController.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSHibernateSearchController.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSHibernateSearchController.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -2,23 +2,20 @@
package org.hibernate.search.backend.impl.jms;
import java.util.List;
-import java.util.Properties;
-import javax.jms.MessageListener;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.JMSException;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
-import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.QueueWorker;
-import org.hibernate.search.backend.impl.BatchedQueueWorker;
-import org.hibernate.search.util.ContextHelper;
-import org.hibernate.search.SearchFactory;
+import org.apache.commons.logging.LogFactory;
import org.hibernate.Session;
import org.hibernate.engine.SessionImplementor;
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.util.ContextHelper;
/**
* @author Emmanuel Bernard
@@ -29,7 +26,7 @@
protected abstract Session getSession();
public void onMessage(Message message) {
- if (! (message instanceof ObjectMessage) ) {
+ if ( !( message instanceof ObjectMessage ) ) {
log.error( "Incorrect message type: " + message.getClass() );
return;
}
@@ -42,30 +39,29 @@
log.error( "Unable to retrieve object from message: " + message.getClass(),
e );
return;
}
- catch( ClassCastException e ) {
+ catch (ClassCastException e) {
log.error( "Illegal object retrieved from message", e );
return;
}
- QueueWorker worker = getWorker( queue );
+ Runnable worker = getWorker( queue );
worker.run();
}
- private QueueWorker getWorker(List<Work> queue) {
+ private Runnable getWorker(List<Work> queue) {
//FIXME casting sucks becasue we do not control what get session from
SearchFactory factory = ContextHelper.getSearchFactory( (SessionImplementor)
getSession() );
- QueueWorker worker = new BatchedQueueWorker();
- worker.initialize( new Properties(), factory );
- worker.setQueue( queue );
- return worker;
+ return factory.getBackendQueueProcessorFactory().getProcessor( queue );
}
@PostConstruct
public void initialize() {
//init the source copy process
-
+ //TODO actually this is probably wrong since this is now part of the DP
}
+
@PreDestroy
public void shutdown() {
//stop the source copy process
+ //TODO actually this is probably wrong since this is now part of the DP
}
}
Deleted:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -1,125 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend.impl.jms;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.io.Serializable;
-
-import javax.jms.QueueSender;
-import javax.jms.QueueConnection;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.ObjectMessage;
-import javax.jms.JMSException;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import javax.naming.Context;
-
-import org.hibernate.search.backend.QueueWorker;
-import org.hibernate.search.backend.Work;
-import org.hibernate.search.SearchFactory;
-import org.hibernate.search.Environment;
-import org.hibernate.HibernateException;
-
-/**
- * @author Emmanuel Bernard
- */
-public class JMSQueueWorker implements QueueWorker {
- private List<Work> queue;
- private String jmsQueueName;
- private String jmsConnectionFactoryName;
- private static final String JNDI_PREFIX = Environment.WORKER_PREFIX +
"jndi.";
- private Properties properties;
- private Queue jmsQueue;
- private QueueConnectionFactory factory;
-
- public void run() {
- resetJMSTools();
- QueueConnection cnn;
- QueueSender sender;
- QueueSession session;
- try {
- cnn = factory.createQueueConnection();
- //TODO make transacted parameterized
- session = cnn.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
-
- ObjectMessage message = session.createObjectMessage( );
- message.setObject( (Serializable) this.queue );
-
- sender = session.createSender( jmsQueue );
- sender.send( message );
-
- session.close();
- cnn.close();
- }
- catch( JMSException e ) {
- throw new HibernateException("Unable to send Search work to JMS queue: " +
jmsQueueName, e);
- }
- }
-
- private InitialContext getInitialContext(Properties properties) throws NamingException
{
- Properties jndiProps = getJndiProperties( properties );
- if (jndiProps.size() == 0) {
- return new InitialContext( );
- }
- else {
- return new InitialContext( jndiProps );
- }
- }
-
- public void initialize(Properties props, SearchFactory searchFactory) {
- this.properties = props;
- this.jmsConnectionFactoryName = props.getProperty( Environment.WORKER_PREFIX +
"jms.connection_factory" );
- this.jmsQueueName = props.getProperty( Environment.WORKER_PREFIX +
"jms.queue" );
- resetJMSTools();
-
- }
-
- private void resetJMSTools() {
- if (jmsQueue != null && factory != null) return;
- try {
- InitialContext initialContext = getInitialContext(properties);
- jmsQueue = (Queue) initialContext.lookup( jmsQueueName );
- factory = (QueueConnectionFactory) initialContext.lookup( jmsConnectionFactoryName );
- }
- catch (NamingException e) {
- throw new HibernateException("Unable to lookup Search queue and connection
factory", e);
- }
- }
-
- public void setQueue(List<Work> queue) {
- this.queue = queue;
- }
-
- public static Properties getJndiProperties(Properties properties) {
-
- HashSet specialProps = new HashSet();
- specialProps.add( JNDI_PREFIX + "class" );
- specialProps.add( JNDI_PREFIX + "url" );
-
- Iterator iter = properties.keySet().iterator();
- Properties result = new Properties();
- while ( iter.hasNext() ) {
- String prop = (String) iter.next();
- if ( prop.indexOf(JNDI_PREFIX) > -1 && !specialProps.contains(prop) ) {
- result.setProperty(
- prop.substring( JNDI_PREFIX.length()+1 ),
- properties.getProperty(prop)
- );
- }
- }
-
- String jndiClass = properties.getProperty(JNDI_PREFIX + "class");
- String jndiURL = properties.getProperty(JNDI_PREFIX + "url");
- // we want to be able to just use the defaults,
- // if JNDI environment properties are not supplied
- // so don't put null in anywhere
- if (jndiClass != null) result.put( Context.INITIAL_CONTEXT_FACTORY, jndiClass);
- if (jndiURL != null) result.put(Context.PROVIDER_URL, jndiURL);
-
- return result;
- }
-}
Copied:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
(from rev 11171,
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java)
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java
(rev 0)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessor.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -0,0 +1,69 @@
+//$Id: $
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.backend.Workspace;
+
+/**
+ * Apply the operations to Lucene directories
+ * avoiding deadlocks
+ *
+ * @author Emmanuel Bernard
+ */
+public class LuceneBackendQueueProcessor implements Runnable {
+ private List<Work> queue;
+ private SearchFactory searchFactory;
+
+ public LuceneBackendQueueProcessor(List<Work> queue, SearchFactory searchFactory)
{
+ this.queue = queue;
+ this.searchFactory = searchFactory;
+ }
+
+ public void run() {
+ Workspace workspace;
+ LuceneWorker worker;
+ workspace = new Workspace( searchFactory );
+ worker = new LuceneWorker( workspace );
+ try {
+ deadlockFreeQueue(queue, workspace);
+ for ( Work work : queue ) {
+ worker.performWork( work );
+ }
+ }
+ finally {
+ workspace.clean();
+ queue.clear();
+ }
+ }
+
+ /**
+ * one must lock the directory providers in the exact same order to avoid
+ * dead lock between concurrent threads or processes
+ * To achieve that, the work will be done per directory provider
+ */
+ private void deadlockFreeQueue(List<Work> queue, final Workspace workspace) {
+ Collections.sort( queue, new Comparator<Work>() {
+ public int compare(Work o1, Work o2) {
+ long h1 = getWorkHashCode( o1, workspace );
+ long h2 = getWorkHashCode( o2, workspace );
+ return h1 < h2 ?
+ -1 :
+ h1 == h2 ?
+ 0 :
+ 1;
+ }
+ } );
+ }
+
+ private long getWorkHashCode(Work work, Workspace workspace) {
+ long h = workspace.getDocumentBuilder( work.getEntity() ).hashCode() * 2;
+ if (work instanceof AddWork ) h+=1; //addwork after deleteWork
+ return h;
+ }
+}
Added:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java
(rev 0)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneBackendQueueProcessorFactory.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -0,0 +1,24 @@
+//$Id: $
+package org.hibernate.search.backend.impl.lucene;
+
+import java.util.Properties;
+import java.util.List;
+
+import org.hibernate.search.SearchFactory;
+import org.hibernate.search.backend.BackendQueueProcessorFactory;
+import org.hibernate.search.backend.Work;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class LuceneBackendQueueProcessorFactory implements BackendQueueProcessorFactory
{
+ private SearchFactory searchFactory;
+
+ public void initialize(Properties props, SearchFactory searchFactory) {
+ this.searchFactory = searchFactory;
+ }
+
+ public Runnable getProcessor(List<Work> queue) {
+ return new LuceneBackendQueueProcessor( queue, searchFactory );
+ }
+}
Copied:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java
(from rev 11171,
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/LuceneWorker.java)
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java
(rev 0)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/backend/impl/lucene/LuceneWorker.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -0,0 +1,116 @@
+//$Id: $
+package org.hibernate.search.backend.impl.lucene;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.hibernate.annotations.common.AssertionFailure;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.DeleteWork;
+import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.UpdateWork;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.SearchException;
+
+/**
+ * Stateless implementation that perform a work
+ *
+ * @author Emmanuel Bernard
+ */
+public class LuceneWorker {
+ private Workspace workspace;
+ private static Log log = LogFactory.getLog( LuceneWorker.class );
+
+ public LuceneWorker(Workspace workspace) {
+ this.workspace = workspace;
+ }
+
+ public void performWork(Work work) {
+ if ( AddWork.class.isAssignableFrom( work.getClass() ) ) {
+ performWork( (AddWork) work );
+ }
+ else if ( UpdateWork.class.isAssignableFrom( work.getClass() ) ) {
+ performWork( (UpdateWork) work );
+ }
+ else if ( DeleteWork.class.isAssignableFrom( work.getClass() ) ) {
+ performWork( (DeleteWork) work );
+ }
+ else {
+ throw new AssertionFailure( "Unknown work type: " + work.getClass() );
+ }
+ }
+
+ public void performWork(AddWork work) {
+ Class entity = work.getEntity();
+ Serializable id = work.getId();
+ Document document = work.getDocument();
+ add( entity, id, document );
+ }
+
+ private void add(Class entity, Serializable id, Document document) {
+ if ( log.isTraceEnabled() )
+ log.trace( "add to Lucene index: " + entity + "#" + id + ":
" + document );
+ IndexWriter writer = workspace.getIndexWriter( entity );
+ try {
+ writer.addDocument( document );
+ }
+ catch (IOException e) {
+ throw new SearchException( "Unable to add to Lucene index: " + entity +
"#" + id, e );
+ }
+ }
+
+ public void performWork(UpdateWork work) {
+ Class entity = work.getEntity();
+ Serializable id = work.getId();
+ Document document = work.getDocument();
+ remove( entity, id );
+ add( entity, id, document );
+ }
+
+ public void performWork(DeleteWork work) {
+ Class entity = work.getEntity();
+ Serializable id = work.getId();
+ remove( entity, id );
+ }
+
+ private void remove(Class entity, Serializable id) {
+ log.trace( "remove from Lucene index: " + entity + "#" + id );
+ DocumentBuilder builder = workspace.getDocumentBuilder( entity );
+ Term term = builder.getTerm( id );
+ IndexReader reader = workspace.getIndexReader( entity );
+ TermDocs termDocs = null;
+ try {
+ //TODO is there a faster way?
+ //TODO include TermDocs into the workspace?
+ termDocs = reader.termDocs( term );
+ String entityName = entity.getName();
+ while ( termDocs.next() ) {
+ int docIndex = termDocs.doc();
+ if ( entityName.equals( reader.document( docIndex ).get(
DocumentBuilder.CLASS_FIELDNAME ) ) ) {
+ //remove only the one of the right class
+ //loop all to remove all the matches (defensive code)
+ reader.deleteDocument( docIndex );
+ }
+ }
+ }
+ catch (Exception e) {
+ throw new SearchException( "Unable to remove from Lucene index: " + entity +
"#" + id, e );
+ }
+ finally {
+ if (termDocs != null) try {
+ termDocs.close();
+ }
+ catch (IOException e) {
+ log.warn( "Unable to close termDocs properly", e);
+ }
+ }
+ }
+}
Modified:
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/impl/FullTextSessionImpl.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/java/org/hibernate/search/impl/FullTextSessionImpl.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -48,8 +48,8 @@
import org.hibernate.search.SearchFactory;
import org.hibernate.search.backend.UpdateWork;
import org.hibernate.search.backend.Work;
-import org.hibernate.search.backend.WorkQueue;
-import org.hibernate.search.backend.impl.BatchedWorkQueue;
+import org.hibernate.search.backend.QueueingProcessor;
+import org.hibernate.search.backend.impl.BatchedQueueingProcessor;
import org.hibernate.search.backend.impl.PostTransactionWorkQueueSynchronization;
import org.hibernate.search.FullTextSession;
import org.hibernate.stat.SessionStatistics;
@@ -95,7 +95,7 @@
Serializable id = session.getIdentifier( entity );
Document doc = builder.getDocument( entity, id );
UpdateWork work = new UpdateWork( id, entity.getClass(), doc );
- processWork( work, searchFactory );
+ searchFactory.getWorker().performWork( work, session );
}
//TODO
//need to add elements in a queue kept at the Session level
@@ -106,30 +106,6 @@
// this is an open discussion
}
- private void processWork(Work work, SearchFactory searchFactory) {
- if ( session.isTransactionInProgress() ) {
- if ( postTransactionWorkQueueSynch == null ||
postTransactionWorkQueueSynch.isConsumed() ) {
- postTransactionWorkQueueSynch = createWorkQueueSync(searchFactory);
- session.getTransaction().registerSynchronization( postTransactionWorkQueueSynch );
- }
- postTransactionWorkQueueSynch.add( work );
- }
- else {
- //no transaction work right away
- PostTransactionWorkQueueSynchronization sync =
- createWorkQueueSync( searchFactory );
- sync.add( work );
- sync.afterCompletion( Status.STATUS_COMMITTED );
- }
- }
-
- private PostTransactionWorkQueueSynchronization createWorkQueueSync(
- SearchFactory searchFactory) {
- //FIXME should be harmonized with the WorkerFactory?
- WorkQueue workQueue = new BatchedWorkQueue( searchFactory, new Properties() );
- return new PostTransactionWorkQueueSynchronization( workQueue );
- }
-
public Query createSQLQuery(String sql, String returnAlias, Class returnClass) {
return session.createSQLQuery( sql, returnAlias, returnClass );
}
Modified:
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/util/FileHelperTest.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/util/FileHelperTest.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/util/FileHelperTest.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -52,7 +52,7 @@
public void testSynchronize() throws Exception {
File src = new File("./filehelpersrc");
- File dest = new File("./filehelpertest");
+ File dest = new File("./filehelperdest");
FileHelper.synchronize( src, dest, true );
File test = new File(dest, "b");
assertTrue( test.exists() );
Modified:
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -18,8 +18,8 @@
protected void configure(Configuration cfg) {
cfg.setProperty( "hibernate.search.default.directory_provider",
RAMDirectoryProvider.class.getName() );
cfg.setProperty( Environment.ANALYZER_CLASS, StopAnalyzer.class.getName() );
- cfg.setProperty( Environment.WORKER_IMPL, "transactional" );
- cfg.setProperty( Environment.WORKER_PREFIX + "type", "async" );
+ cfg.setProperty( Environment.WORKER_SCOPE, "transaction" );
+ cfg.setProperty( Environment.WORKER_PROCESS, "async" );
cfg.setProperty( Environment.WORKER_PREFIX + "thread_pool.min", "1"
);
cfg.setProperty( Environment.WORKER_PREFIX + "thread_pool.max",
"10" );
FullTextIndexEventListener del = new FullTextIndexEventListener();
Modified:
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -18,8 +18,8 @@
protected void configure(Configuration cfg) {
cfg.setProperty( "hibernate.search.default.directory_provider",
RAMDirectoryProvider.class.getName() );
cfg.setProperty( Environment.ANALYZER_CLASS, StopAnalyzer.class.getName() );
- cfg.setProperty( Environment.WORKER_IMPL, "transactional" );
- cfg.setProperty( Environment.WORKER_PREFIX + "type", "sync" );
+ cfg.setProperty( Environment.WORKER_SCOPE, "transaction" );
+ cfg.setProperty( Environment.WORKER_PREFIX, "sync" );
FullTextIndexEventListener del = new FullTextIndexEventListener();
cfg.getEventListeners().setPostDeleteEventListeners( new PostDeleteEventListener[]{del}
);
cfg.getEventListeners().setPostUpdateEventListeners( new PostUpdateEventListener[]{del}
);
Modified:
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/WorkerTestCase.java
===================================================================
---
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/WorkerTestCase.java 2007-02-22
20:07:09 UTC (rev 11232)
+++
branches/Branch_3_2/HibernateExt/search/src/test/org/hibernate/search/test/worker/WorkerTestCase.java 2007-02-23
03:45:39 UTC (rev 11233)
@@ -27,11 +27,11 @@
*/
public class WorkerTestCase extends SearchTestCase {
- protected void setUp() throws Exception {
+ protected void setUp() throws Exception {
File sub = getBaseIndexDir();
sub.mkdir();
File[] files = sub.listFiles();
- for (File file : files) {
+ for ( File file : files ) {
if ( file.isDirectory() ) {
delete( file );
}
@@ -64,22 +64,23 @@
}
}
- public void testConcurrency() throws Exception {
- int nThreads = 15;
- ExecutorService es = Executors.newFixedThreadPool( nThreads );
+ public void testConcurrency() throws Exception {
+ int nThreads = 15;
+ ExecutorService es = Executors.newFixedThreadPool( nThreads );
Work work = new Work( getSessions() );
ReverseWork reverseWork = new ReverseWork( getSessions() );
- long start = System.currentTimeMillis();
- int iteration = 200;
- for (int i = 0 ; i < iteration; i++) {
+ long start = System.currentTimeMillis();
+ int iteration = 100;
+ for ( int i = 0; i < iteration; i++ ) {
es.execute( work );
es.execute( reverseWork );
}
- while(work.count < 199) {
+ while ( work.count < iteration - 1 ) {
Thread.sleep( 20 );
}
- System.out.println( 3*iteration + " iterations (4 tx per iteration) in "
+ nThreads + " threads: " + (System.currentTimeMillis() - start) );
- }
+ System.out.println( iteration + " iterations (8 tx per iteration) in " +
nThreads + " threads: " + ( System
+ .currentTimeMillis() - start ) );
+ }
protected class Work implements Runnable {
private SessionFactory sf;
@@ -90,7 +91,7 @@
}
public void run() {
- Session s = sf.openSession( );
+ Session s = sf.openSession();
Transaction tx = s.beginTransaction();
Employee ee = new Employee();
ee.setName( "Emmanuel" );
@@ -101,7 +102,7 @@
tx.commit();
s.close();
- s = sf.openSession( );
+ s = sf.openSession();
tx = s.beginTransaction();
ee = (Employee) s.get( Employee.class, ee.getId() );
ee.setName( "Emmanuel2" );
@@ -110,25 +111,30 @@
tx.commit();
s.close();
- s = sf.openSession( );
+// try {
+// Thread.sleep( 50 );
+// }
+// catch (InterruptedException e) {
+// e.printStackTrace(); //To change body of catch statement use File | Settings |
File Templates.
+// }
+
+ s = sf.openSession();
tx = s.beginTransaction();
- FullTextSession fts = new FullTextSessionImpl(s);
- QueryParser parser = new QueryParser( "id", new StopAnalyzer() );
- Query query;
- try
- {
- query = parser.parse( "name:emmanuel2" );
- }
- catch (ParseException e)
- {
- throw new RuntimeException(e);
- }
- boolean results = fts.createFullTextQuery( query ).list().size() > 0;
- if (!results) throw new RuntimeException("No results!");
- tx.commit();
+ FullTextSession fts = new FullTextSessionImpl( s );
+ QueryParser parser = new QueryParser( "id", new StopAnalyzer() );
+ Query query;
+ try {
+ query = parser.parse( "name:emmanuel2" );
+ }
+ catch (ParseException e) {
+ throw new RuntimeException( e );
+ }
+ boolean results = fts.createFullTextQuery( query ).list().size() > 0;
+ //if ( !results ) throw new RuntimeException( "No results!" );
+ tx.commit();
s.close();
- s = sf.openSession( );
+ s = sf.openSession();
tx = s.beginTransaction();
ee = (Employee) s.get( Employee.class, ee.getId() );
s.delete( ee );
@@ -148,7 +154,7 @@
}
public void run() {
- Session s = sf.openSession( );
+ Session s = sf.openSession();
Transaction tx = s.beginTransaction();
Employer er = new Employer();
er.setName( "RH" );
@@ -159,7 +165,7 @@
tx.commit();
s.close();
- s = sf.openSession( );
+ s = sf.openSession();
tx = s.beginTransaction();
er = (Employer) s.get( Employer.class, er.getId() );
er.setName( "RH2" );
@@ -168,7 +174,7 @@
tx.commit();
s.close();
- s = sf.openSession( );
+ s = sf.openSession();
tx = s.beginTransaction();
er = (Employer) s.get( Employer.class, er.getId() );
s.delete( er );
@@ -179,7 +185,7 @@
}
}
- protected void configure(org.hibernate.cfg.Configuration cfg) {
+ protected void configure(org.hibernate.cfg.Configuration cfg) {
File sub = getBaseIndexDir();
cfg.setProperty( "hibernate.search.default.indexBase", sub.getAbsolutePath()
);
cfg.setProperty( "hibernate.search.Clock.directory_provider",
FSDirectoryProvider.class.getName() );
@@ -190,8 +196,8 @@
cfg.getEventListeners().setPostInsertEventListeners( new PostInsertEventListener[]{del}
);
}
- protected Class[] getMappings() {
- return new Class[] {
+ protected Class[] getMappings() {
+ return new Class[]{
Employee.class,
Employer.class
};