Author: epbernard
Date: 2007-01-08 07:23:31 -0500 (Mon, 08 Jan 2007)
New Revision: 11022
Added:
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employee.java
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employer.java
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/WorkerTestCase.java
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/FSDirectoryTest.java
Log:
ANN-520 asynchronous worker queue
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -6,7 +6,13 @@
import java.util.Map;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorService;
import org.hibernate.search.engine.DocumentBuilder;
import org.hibernate.search.store.DirectoryProvider;
@@ -17,6 +23,8 @@
import org.hibernate.search.backend.UpdateWork;
import org.hibernate.search.backend.DeleteWork;
import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.Worker;
+import org.hibernate.search.Environment;
/**
* Batch work until #performWork is called.
@@ -27,14 +35,29 @@
public class BatchedWorkQueue implements WorkQueue {
private List<Work> queue = new ArrayList<Work>();
private boolean sync;
- Map<Class, DocumentBuilder<Object>> documentBuilders;
- Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+ private Map<Class, DocumentBuilder<Object>> documentBuilders;
+ private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+ private ExecutorService executorService;
+ private Properties properties;
public BatchedWorkQueue(Map<Class, DocumentBuilder<Object>>
documentBuilders,
- Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders, boolean
sync) {
+ Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders,
+ Properties properties) {
this.documentBuilders = documentBuilders;
this.lockableDirectoryProviders = lockableDirectoryProviders;
- this.sync = sync;
+ this.properties = properties;
+ //default to sync if none defined
+ this.sync = ! "async".equalsIgnoreCase( properties.getProperty(
Environment.WORKER_PREFIX + "type") );
+ int min = Integer.parseInt(
+ properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.min",
"0" )
+ );
+ int max = Integer.parseInt(
+ properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.max",
"0" ).trim()
+ );
+ if (max == 0) max = Integer.MAX_VALUE;
+ if ( ! sync) {
+ executorService = new ThreadPoolExecutor( min, max, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>() );
+ }
}
public void add(Work work) {
@@ -51,14 +74,12 @@
//TODO implements parallel batchWorkers (one per Directory)
public void performWork() {
- BatchWorker batchWorker = new BatchWorker( queue, documentBuilders,
lockableDirectoryProviders );
+ BatchWorker batchWorker = new BatchWorker( queue, documentBuilders,
lockableDirectoryProviders, properties );
if (sync) {
batchWorker.run();
}
else {
- //TODO pool threads?
- Thread thread = new Thread(batchWorker);
- thread.start();
+ executorService.execute( batchWorker );
}
}
@@ -73,7 +94,7 @@
public BatchWorker(List<Work> queue, Map<Class,
DocumentBuilder<Object>> documentBuilders,
- Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+ Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders,
Properties properties) {
this.queue = queue;
this.documentBuilders = documentBuilders;
this.lockableDirectoryProviders = lockableDirectoryProviders;
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -30,7 +30,7 @@
protected WeakIdentityHashMap queuePerTransaction = new WeakIdentityHashMap();
private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
private Map<Class, DocumentBuilder<Object>> documentBuilders;
- private boolean sync;
+ private Properties properties;
public void performWork(Work work, EventSource session) {
if ( session.isTransactionInProgress() ) {
@@ -38,7 +38,7 @@
PostTransactionWorkQueueSynchronization txSync =
(PostTransactionWorkQueueSynchronization)
queuePerTransaction.get( transaction );
if ( txSync == null || txSync.isConsumed() ) {
- WorkQueue workQueue = new BatchedWorkQueue( documentBuilders,
lockableDirectoryProviders, sync );
+ WorkQueue workQueue = new BatchedWorkQueue( documentBuilders,
lockableDirectoryProviders, properties );
txSync = new PostTransactionWorkQueueSynchronization( workQueue, queuePerTransaction
);
transaction.registerSynchronization( txSync );
queuePerTransaction.put(transaction, txSync);
@@ -46,7 +46,7 @@
txSync.add( work );
}
else {
- WorkQueue workQueue = new BatchedWorkQueue( documentBuilders,
lockableDirectoryProviders, sync );
+ WorkQueue workQueue = new BatchedWorkQueue( documentBuilders,
lockableDirectoryProviders, properties );
PostTransactionWorkQueueSynchronization sync = new
PostTransactionWorkQueueSynchronization( workQueue );
sync.add( work );
sync.afterCompletion( Status.STATUS_COMMITTED );
@@ -57,7 +57,6 @@
Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
this.documentBuilders = documentBuilders;
this.lockableDirectoryProviders = lockableDirectoryProviders;
- //default to sync if none defined
- this.sync = ! "async".equalsIgnoreCase( props.getProperty(
Environment.WORKER_PREFIX + "type") );
+ this.properties = props;
}
}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -7,6 +7,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.Status;
@@ -113,7 +114,7 @@
Map<Class, DocumentBuilder<Object>> documentBuilders,
Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
//FIXME should be harmonized with the WorkerFactory?
- WorkQueue workQueue = new BatchedWorkQueue( documentBuilders,
lockableDirectoryProviders, true );
+ WorkQueue workQueue = new BatchedWorkQueue( documentBuilders,
lockableDirectoryProviders, new Properties() );
return new PostTransactionWorkQueueSynchronization( workQueue );
}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/FSDirectoryTest.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/FSDirectoryTest.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/FSDirectoryTest.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -158,7 +158,7 @@
assertEquals( "Hibernate in Action", hits.doc( 0 ).get( "title" )
);
}
finally {
- if ( searcher != null ) searcher.close();
+ searcher.close();
}
Added:
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -0,0 +1,38 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+
+import org.hibernate.search.test.TestCase;
+import org.hibernate.search.store.RAMDirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.search.event.FullTextIndexEventListener;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.event.PostDeleteEventListener;
+import org.hibernate.event.PostUpdateEventListener;
+import org.hibernate.event.PostInsertEventListener;
+import org.hibernate.SessionFactory;
+import org.hibernate.Session;
+import org.hibernate.Transaction;
+import org.apache.lucene.analysis.StopAnalyzer;
+
+/**
+ * Runs the tests inherited from {@link WorkerTestCase} against a configuration
+ * whose worker type is set to "async".
+ *
+ * The {@code configure} method sets, on the supplied Configuration:
+ * - the default directory provider to RAMDirectoryProvider,
+ * - the analyzer class to StopAnalyzer,
+ * - the worker implementation to "transactional",
+ * - the worker type to "async", with "thread_pool.min" = "1" and
+ *   "thread_pool.max" = "10" (all three keys are prefixed with
+ *   Environment.WORKER_PREFIX),
+ * and registers one shared FullTextIndexEventListener instance as the only
+ * post-delete, post-update and post-insert event listener.
+ *
+ * @author Emmanuel Bernard
+ */
+public class AsyncWorkerTest extends WorkerTestCase {
+
+ protected void configure(Configuration cfg) {
+ cfg.setProperty( "hibernate.search.default.directory_provider",
RAMDirectoryProvider.class.getName() );
+ cfg.setProperty( Environment.ANALYZER_CLASS, StopAnalyzer.class.getName() );
+ cfg.setProperty( Environment.WORKER_IMPL, "transactional" );
+ cfg.setProperty( Environment.WORKER_PREFIX + "type", "async" );
+ cfg.setProperty( Environment.WORKER_PREFIX + "thread_pool.min", "1"
);
+ cfg.setProperty( Environment.WORKER_PREFIX + "thread_pool.max",
"10" );
+ FullTextIndexEventListener del = new FullTextIndexEventListener();
+ cfg.getEventListeners().setPostDeleteEventListeners( new PostDeleteEventListener[]{del}
);
+ cfg.getEventListeners().setPostUpdateEventListeners( new PostUpdateEventListener[]{del}
);
+ cfg.getEventListeners().setPostInsertEventListeners( new PostInsertEventListener[]{del}
);
+ }
+
+}
Added:
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employee.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employee.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employee.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -0,0 +1,43 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.GeneratedValue;
+
+import org.hibernate.search.annotations.Indexed;
+import org.hibernate.search.annotations.DocumentId;
+import org.hibernate.search.annotations.Field;
+import org.hibernate.search.annotations.Index;
+
+/**
+ * Test entity used by the worker tests in this package.
+ *
+ * It is mapped as a persistent entity and indexed under the index name
+ * "employee". The {@code id} field is the generated identifier and is also
+ * marked as the document id; the {@code name} field is indexed as a
+ * tokenized field. The class only exposes plain getters and setters for
+ * these two fields.
+ *
+ * @author Emmanuel Bernard
+ */
+@Entity
+@Indexed(index="employee")
+public class Employee {
+ @Id
+ @GeneratedValue
+ @DocumentId
+ private long id;
+
+ @Field(index = Index.TOKENIZED )
+ private String name;
+
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+}
Added:
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employer.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employer.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employer.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -0,0 +1,43 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.GeneratedValue;
+
+import org.hibernate.search.annotations.Indexed;
+import org.hibernate.search.annotations.Field;
+import org.hibernate.search.annotations.Index;
+import org.hibernate.search.annotations.DocumentId;
+
+/**
+ * Test entity used by the worker tests in this package.
+ *
+ * It is mapped as a persistent entity and indexed under the index name
+ * "employer", i.e. a different index name than the one declared on
+ * {@link Employee}. The {@code id} field is the generated identifier and is
+ * also marked as the document id; the {@code name} field is indexed as a
+ * tokenized field. The class only exposes plain getters and setters for
+ * these two fields.
+ *
+ * @author Emmanuel Bernard
+ */
+@Entity
+@Indexed(index="employer")
+public class Employer {
+ @Id
+ @GeneratedValue
+ @DocumentId
+ private long id;
+
+ @Field(index = Index.TOKENIZED )
+ private String name;
+
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+}
Added:
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -0,0 +1,28 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import org.hibernate.cfg.Configuration;
+import org.hibernate.search.store.RAMDirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.search.event.FullTextIndexEventListener;
+import org.hibernate.event.PostDeleteEventListener;
+import org.hibernate.event.PostUpdateEventListener;
+import org.hibernate.event.PostInsertEventListener;
+import org.apache.lucene.analysis.StopAnalyzer;
+
+/**
+ * Runs the tests inherited from {@link WorkerTestCase} against a configuration
+ * whose worker type is set to "sync".
+ *
+ * The {@code configure} method sets, on the supplied Configuration:
+ * - the default directory provider to RAMDirectoryProvider,
+ * - the analyzer class to StopAnalyzer,
+ * - the worker implementation to "transactional",
+ * - the worker type (Environment.WORKER_PREFIX + "type") to "sync",
+ * and registers one shared FullTextIndexEventListener instance as the only
+ * post-delete, post-update and post-insert event listener. No thread pool
+ * properties are set here.
+ *
+ * @author Emmanuel Bernard
+ */
+public class SyncWorkerTest extends WorkerTestCase {
+
+ protected void configure(Configuration cfg) {
+ cfg.setProperty( "hibernate.search.default.directory_provider",
RAMDirectoryProvider.class.getName() );
+ cfg.setProperty( Environment.ANALYZER_CLASS, StopAnalyzer.class.getName() );
+ cfg.setProperty( Environment.WORKER_IMPL, "transactional" );
+ cfg.setProperty( Environment.WORKER_PREFIX + "type", "sync" );
+ FullTextIndexEventListener del = new FullTextIndexEventListener();
+ cfg.getEventListeners().setPostDeleteEventListeners( new PostDeleteEventListener[]{del}
);
+ cfg.getEventListeners().setPostUpdateEventListeners( new PostUpdateEventListener[]{del}
);
+ cfg.getEventListeners().setPostInsertEventListeners( new PostInsertEventListener[]{del}
);
+ }
+}
Added:
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/WorkerTestCase.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/WorkerTestCase.java 2007-01-08
12:10:39 UTC (rev 11021)
+++
branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/WorkerTestCase.java 2007-01-08
12:23:31 UTC (rev 11022)
@@ -0,0 +1,124 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.hibernate.search.test.TestCase;
+import org.hibernate.search.store.RAMDirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.search.event.FullTextIndexEventListener;
+import org.hibernate.SessionFactory;
+import org.hibernate.Session;
+import org.hibernate.Transaction;
+import org.hibernate.event.PostDeleteEventListener;
+import org.hibernate.event.PostUpdateEventListener;
+import org.hibernate.event.PostInsertEventListener;
+import org.hibernate.cfg.Configuration;
+import org.apache.lucene.analysis.StopAnalyzer;
+
+/**
+ * Base test case that exercises indexing work from several threads at once.
+ * The mapped classes are {@link Employee} and {@link Employer} (see
+ * {@code getMappings}).
+ *
+ * {@code testConcurrency} creates a fixed pool of 15 threads, then submits
+ * one shared {@code Work} instance and one shared {@code ReverseWork}
+ * instance 200 times each. It then polls {@code work.count} every 20 ms and
+ * returns once the counter is no longer below 199.
+ *
+ * {@code Work.run} performs three transactions, each in its own session:
+ * 1. persist a new Employee ("Emmanuel") and then a new Employer ("RH"),
+ * 2. reload both by id and rename them to "Emmanuel2" and "RH2",
+ * 3. reload both by id and delete them,
+ * and finally increments {@code count}.
+ *
+ * {@code ReverseWork.run} performs the same three transactions but touches
+ * the Employer before the Employee in each of them, and keeps no counter.
+ *
+ * NOTE(review): the same {@code Work} instance is run by multiple pool
+ * threads and {@code count++} on a volatile int is a read-modify-write, not
+ * an atomic increment; if increments are lost the polling loop could wait
+ * longer than intended - confirm whether an atomic counter is wanted.
+ * NOTE(review): the loop exits at count >= 199 although 200 runs are
+ * submitted, completion of {@code ReverseWork} is never awaited, and the
+ * executor is never shut down - confirm this is intentional.
+ * NOTE(review): {@code getSessions()} is not defined in this class;
+ * presumably it is inherited from the TestCase superclass - verify.
+ *
+ * @author Emmanuel Bernard
+ */
+public class WorkerTestCase extends TestCase {
+
+ public void testConcurrency() throws Exception {
+ ExecutorService es = Executors.newFixedThreadPool( 15 );
+ Work work = new Work( getSessions() );
+ ReverseWork reverseWork = new ReverseWork( getSessions() );
+ for (int i = 0 ; i < 200 ; i++) {
+ es.execute( work );
+ es.execute( reverseWork );
+ }
+ while(work.count < 199) {
+ Thread.sleep( 20 );
+ }
+ }
+
+ protected class Work implements Runnable {
+ private SessionFactory sf;
+ public volatile int count = 0;
+
+ public Work(SessionFactory sf) {
+ this.sf = sf;
+ }
+
+ public void run() {
+ Session s = sf.openSession( );
+ Transaction tx = s.beginTransaction();
+ Employee ee = new Employee();
+ ee.setName( "Emmanuel" );
+ s.persist( ee );
+ Employer er = new Employer();
+ er.setName( "RH" );
+ s.persist( er );
+ tx.commit();
+ s.close();
+
+ s = sf.openSession( );
+ tx = s.beginTransaction();
+ ee = (Employee) s.get( Employee.class, ee.getId() );
+ ee.setName( "Emmanuel2" );
+ er = (Employer) s.get( Employer.class, er.getId() );
+ er.setName( "RH2" );
+ tx.commit();
+ s.close();
+
+ s = sf.openSession( );
+ tx = s.beginTransaction();
+ ee = (Employee) s.get( Employee.class, ee.getId() );
+ s.delete( ee );
+ er = (Employer) s.get( Employer.class, er.getId() );
+ s.delete( er );
+ tx.commit();
+ s.close();
+ count++;
+ }
+ }
+
+ protected class ReverseWork implements Runnable {
+ private SessionFactory sf;
+
+ public ReverseWork(SessionFactory sf) {
+ this.sf = sf;
+ }
+
+ public void run() {
+ Session s = sf.openSession( );
+ Transaction tx = s.beginTransaction();
+ Employer er = new Employer();
+ er.setName( "RH" );
+ s.persist( er );
+ Employee ee = new Employee();
+ ee.setName( "Emmanuel" );
+ s.persist( ee );
+ tx.commit();
+ s.close();
+
+ s = sf.openSession( );
+ tx = s.beginTransaction();
+ er = (Employer) s.get( Employer.class, er.getId() );
+ er.setName( "RH2" );
+ ee = (Employee) s.get( Employee.class, ee.getId() );
+ ee.setName( "Emmanuel2" );
+ tx.commit();
+ s.close();
+
+ s = sf.openSession( );
+ tx = s.beginTransaction();
+ er = (Employer) s.get( Employer.class, er.getId() );
+ s.delete( er );
+ ee = (Employee) s.get( Employee.class, ee.getId() );
+ s.delete( ee );
+ tx.commit();
+ s.close();
+ }
+ }
+
+ protected Class[] getMappings() {
+ return new Class[] {
+ Employee.class,
+ Employer.class
+ };
+ }
+}