[hibernate-commits] Hibernate SVN: r11022 - in branches/Branch_3_2/HibernateExt/metadata/src: java/org/hibernate/search/backend/impl java/org/hibernate/search/impl test/org/hibernate/search/test test/org/hibernate/search/test/worker

hibernate-commits at lists.jboss.org hibernate-commits at lists.jboss.org
Mon Jan 8 07:23:48 EST 2007


Author: epbernard
Date: 2007-01-08 07:23:31 -0500 (Mon, 08 Jan 2007)
New Revision: 11022

Added:
   branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/
   branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java
   branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employee.java
   branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employer.java
   branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java
   branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/WorkerTestCase.java
Modified:
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
   branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
   branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/FSDirectoryTest.java
Log:
ANN-520 asynchronous worker queue

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -6,7 +6,13 @@
 import java.util.Map;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Properties;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorService;
 
 import org.hibernate.search.engine.DocumentBuilder;
 import org.hibernate.search.store.DirectoryProvider;
@@ -17,6 +23,8 @@
 import org.hibernate.search.backend.UpdateWork;
 import org.hibernate.search.backend.DeleteWork;
 import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.Worker;
+import org.hibernate.search.Environment;
 
 /**
  * Batch work until #performWork is called.
@@ -27,14 +35,29 @@
 public class BatchedWorkQueue implements WorkQueue {
 	private List<Work> queue = new ArrayList<Work>();
 	private boolean sync;
-	Map<Class, DocumentBuilder<Object>> documentBuilders;
-	Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+	private Map<Class, DocumentBuilder<Object>> documentBuilders;
+	private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+	private ExecutorService executorService;
+	private Properties properties;
 
 	public BatchedWorkQueue(Map<Class, DocumentBuilder<Object>> documentBuilders,
-								Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders, boolean sync) {
+								Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders,
+								Properties properties) {
 		this.documentBuilders = documentBuilders;
 		this.lockableDirectoryProviders = lockableDirectoryProviders;
-		this.sync = sync;
+		this.properties = properties;
+		//default to sync if none defined
+		this.sync = ! "async".equalsIgnoreCase( properties.getProperty( Environment.WORKER_PREFIX + "type") );
+		int min = Integer.parseInt(
+				properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.min", "0" )
+		);
+		int max = Integer.parseInt(
+				properties.getProperty( Environment.WORKER_PREFIX + "thread_pool.max", "0" ).trim()
+		);
+		if (max == 0) max = Integer.MAX_VALUE;
+		if ( ! sync) {
+			executorService = new ThreadPoolExecutor( min, max, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() );
+		}
 	}
 
 	public void add(Work work) {
@@ -51,14 +74,12 @@
 
 	//TODO implements parallel batchWorkers (one per Directory)
 	public void performWork() {
-		BatchWorker batchWorker = new BatchWorker( queue, documentBuilders, lockableDirectoryProviders );
+		BatchWorker batchWorker = new BatchWorker( queue, documentBuilders, lockableDirectoryProviders, properties );
 		if (sync) {
 			batchWorker.run();
 		}
 		else {
-			//TODO pool threads?
-			Thread thread = new Thread(batchWorker);
-			thread.start();
+			executorService.execute( batchWorker );
 		}
 	}
 
@@ -73,7 +94,7 @@
 
 
 		public BatchWorker(List<Work> queue, Map<Class, DocumentBuilder<Object>> documentBuilders,
-						   Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+						   Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders, Properties properties) {
 			this.queue = queue;
 			this.documentBuilders = documentBuilders;
 			this.lockableDirectoryProviders = lockableDirectoryProviders;

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -30,7 +30,7 @@
 	protected WeakIdentityHashMap queuePerTransaction = new WeakIdentityHashMap();
 	private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
 	private Map<Class, DocumentBuilder<Object>> documentBuilders;
-	private boolean sync;
+	private Properties properties;
 
 	public void performWork(Work work, EventSource session) {
 		if ( session.isTransactionInProgress() ) {
@@ -38,7 +38,7 @@
 			PostTransactionWorkQueueSynchronization txSync = (PostTransactionWorkQueueSynchronization)
 					queuePerTransaction.get( transaction );
 			if ( txSync == null || txSync.isConsumed() ) {
-				WorkQueue workQueue = new BatchedWorkQueue( documentBuilders, lockableDirectoryProviders, sync );
+				WorkQueue workQueue = new BatchedWorkQueue( documentBuilders, lockableDirectoryProviders, properties );
 				txSync = new PostTransactionWorkQueueSynchronization( workQueue, queuePerTransaction );
 				transaction.registerSynchronization( txSync );
 				queuePerTransaction.put(transaction, txSync);
@@ -46,7 +46,7 @@
 			txSync.add( work );
 		}
 		else {
-			WorkQueue workQueue = new BatchedWorkQueue( documentBuilders, lockableDirectoryProviders, sync );
+			WorkQueue workQueue = new BatchedWorkQueue( documentBuilders, lockableDirectoryProviders, properties );
 			PostTransactionWorkQueueSynchronization sync = new PostTransactionWorkQueueSynchronization( workQueue );
 			sync.add( work );
 			sync.afterCompletion( Status.STATUS_COMMITTED );
@@ -57,7 +57,6 @@
 						   Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
 		this.documentBuilders = documentBuilders;
 		this.lockableDirectoryProviders = lockableDirectoryProviders;
-		//default to sync if none defined
-		this.sync = ! "async".equalsIgnoreCase( props.getProperty( Environment.WORKER_PREFIX + "type") );
+		this.properties = props;
 	}
 }

Modified: branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/impl/FullTextSessionImpl.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -7,6 +7,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
@@ -113,7 +114,7 @@
 			Map<Class, DocumentBuilder<Object>> documentBuilders,
 			Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
 		//FIXME should be harmonized with the WorkerFactory?
-		WorkQueue workQueue = new BatchedWorkQueue( documentBuilders, lockableDirectoryProviders, true );
+		WorkQueue workQueue = new BatchedWorkQueue( documentBuilders, lockableDirectoryProviders, new Properties() );
 		return new PostTransactionWorkQueueSynchronization( workQueue );
 	}
 

Modified: branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/FSDirectoryTest.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/FSDirectoryTest.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/FSDirectoryTest.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -158,7 +158,7 @@
 			assertEquals( "Hibernate in Action", hits.doc( 0 ).get( "title" ) );
 		}
 		finally {
-			if ( searcher != null ) searcher.close();
+			searcher.close();
 		}
 
 

Added: branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/AsyncWorkerTest.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -0,0 +1,38 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+
+import org.hibernate.search.test.TestCase;
+import org.hibernate.search.store.RAMDirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.search.event.FullTextIndexEventListener;
+import org.hibernate.cfg.Configuration;
+import org.hibernate.event.PostDeleteEventListener;
+import org.hibernate.event.PostUpdateEventListener;
+import org.hibernate.event.PostInsertEventListener;
+import org.hibernate.SessionFactory;
+import org.hibernate.Session;
+import org.hibernate.Transaction;
+import org.apache.lucene.analysis.StopAnalyzer;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class AsyncWorkerTest extends WorkerTestCase {
+
+	protected void configure(Configuration cfg) {
+		cfg.setProperty( "hibernate.search.default.directory_provider", RAMDirectoryProvider.class.getName() );
+		cfg.setProperty( Environment.ANALYZER_CLASS, StopAnalyzer.class.getName() );
+		cfg.setProperty( Environment.WORKER_IMPL, "transactional" );
+		cfg.setProperty( Environment.WORKER_PREFIX + "type", "async" );
+		cfg.setProperty( Environment.WORKER_PREFIX + "thread_pool.min", "1" );
+		cfg.setProperty( Environment.WORKER_PREFIX + "thread_pool.max", "10" );
+		FullTextIndexEventListener del = new FullTextIndexEventListener();
+		cfg.getEventListeners().setPostDeleteEventListeners( new PostDeleteEventListener[]{del} );
+		cfg.getEventListeners().setPostUpdateEventListeners( new PostUpdateEventListener[]{del} );
+		cfg.getEventListeners().setPostInsertEventListeners( new PostInsertEventListener[]{del} );
+	}
+
+}

Added: branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employee.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employee.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employee.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -0,0 +1,43 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.GeneratedValue;
+
+import org.hibernate.search.annotations.Indexed;
+import org.hibernate.search.annotations.DocumentId;
+import org.hibernate.search.annotations.Field;
+import org.hibernate.search.annotations.Index;
+
+/**
+ * @author Emmanuel Bernard
+ */
+ at Entity
+ at Indexed(index="employee")
+public class Employee {
+	@Id
+	@GeneratedValue
+	@DocumentId
+	private long id;
+
+	@Field(index = Index.TOKENIZED )
+	private String name;
+
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+}

Added: branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employer.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employer.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/Employer.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -0,0 +1,43 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.GeneratedValue;
+
+import org.hibernate.search.annotations.Indexed;
+import org.hibernate.search.annotations.Field;
+import org.hibernate.search.annotations.Index;
+import org.hibernate.search.annotations.DocumentId;
+
+/**
+ * @author Emmanuel Bernard
+ */
+ at Entity
+ at Indexed(index="employer")
+public class Employer {
+	@Id
+	@GeneratedValue
+	@DocumentId
+	private long id;
+
+	@Field(index = Index.TOKENIZED )
+	private String name;
+
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+}

Added: branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/SyncWorkerTest.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -0,0 +1,28 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import org.hibernate.cfg.Configuration;
+import org.hibernate.search.store.RAMDirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.search.event.FullTextIndexEventListener;
+import org.hibernate.event.PostDeleteEventListener;
+import org.hibernate.event.PostUpdateEventListener;
+import org.hibernate.event.PostInsertEventListener;
+import org.apache.lucene.analysis.StopAnalyzer;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class SyncWorkerTest extends WorkerTestCase {
+
+	protected void configure(Configuration cfg) {
+		cfg.setProperty( "hibernate.search.default.directory_provider", RAMDirectoryProvider.class.getName() );
+		cfg.setProperty( Environment.ANALYZER_CLASS, StopAnalyzer.class.getName() );
+		cfg.setProperty( Environment.WORKER_IMPL, "transactional" );
+		cfg.setProperty( Environment.WORKER_PREFIX + "type", "sync" );
+		FullTextIndexEventListener del = new FullTextIndexEventListener();
+		cfg.getEventListeners().setPostDeleteEventListeners( new PostDeleteEventListener[]{del} );
+		cfg.getEventListeners().setPostUpdateEventListeners( new PostUpdateEventListener[]{del} );
+		cfg.getEventListeners().setPostInsertEventListeners( new PostInsertEventListener[]{del} );
+	}
+}

Added: branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/WorkerTestCase.java
===================================================================
--- branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/WorkerTestCase.java	2007-01-08 12:10:39 UTC (rev 11021)
+++ branches/Branch_3_2/HibernateExt/metadata/src/test/org/hibernate/search/test/worker/WorkerTestCase.java	2007-01-08 12:23:31 UTC (rev 11022)
@@ -0,0 +1,124 @@
+//$Id: $
+package org.hibernate.search.test.worker;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.hibernate.search.test.TestCase;
+import org.hibernate.search.store.RAMDirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.search.event.FullTextIndexEventListener;
+import org.hibernate.SessionFactory;
+import org.hibernate.Session;
+import org.hibernate.Transaction;
+import org.hibernate.event.PostDeleteEventListener;
+import org.hibernate.event.PostUpdateEventListener;
+import org.hibernate.event.PostInsertEventListener;
+import org.hibernate.cfg.Configuration;
+import org.apache.lucene.analysis.StopAnalyzer;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class WorkerTestCase extends TestCase {
+
+	public void testConcurrency() throws Exception {
+		ExecutorService es = Executors.newFixedThreadPool( 15 );
+		Work work = new Work( getSessions() );
+		ReverseWork reverseWork = new ReverseWork( getSessions() );
+		for (int i = 0 ; i < 200 ; i++) {
+			es.execute( work );
+			es.execute( reverseWork );
+		}
+		while(work.count < 199) {
+			Thread.sleep( 20 );
+		}
+	}
+
+	protected class Work implements Runnable {
+		private SessionFactory sf;
+		public volatile int count = 0;
+
+		public Work(SessionFactory sf) {
+			this.sf = sf;
+		}
+
+		public void run() {
+			Session s = sf.openSession( );
+			Transaction tx = s.beginTransaction();
+			Employee ee = new Employee();
+			ee.setName( "Emmanuel" );
+			s.persist( ee );
+			Employer er = new Employer();
+			er.setName( "RH" );
+			s.persist( er );
+			tx.commit();
+			s.close();
+
+			s = sf.openSession( );
+			tx = s.beginTransaction();
+			ee = (Employee) s.get( Employee.class, ee.getId() );
+			ee.setName( "Emmanuel2" );
+			er = (Employer) s.get( Employer.class, er.getId() );
+			er.setName( "RH2" );
+			tx.commit();
+			s.close();
+
+			s = sf.openSession( );
+			tx = s.beginTransaction();
+			ee = (Employee) s.get( Employee.class, ee.getId() );
+			s.delete( ee );
+			er = (Employer) s.get( Employer.class, er.getId() );
+			s.delete( er );
+			tx.commit();
+			s.close();
+			count++;
+		}
+	}
+
+	protected class ReverseWork implements Runnable {
+		private SessionFactory sf;
+
+		public ReverseWork(SessionFactory sf) {
+			this.sf = sf;
+		}
+
+		public void run() {
+			Session s = sf.openSession( );
+			Transaction tx = s.beginTransaction();
+			Employer er = new Employer();
+			er.setName( "RH" );
+			s.persist( er );
+			Employee ee = new Employee();
+			ee.setName( "Emmanuel" );
+			s.persist( ee );
+			tx.commit();
+			s.close();
+
+			s = sf.openSession( );
+			tx = s.beginTransaction();
+			er = (Employer) s.get( Employer.class, er.getId() );
+			er.setName( "RH2" );
+			ee = (Employee) s.get( Employee.class, ee.getId() );
+			ee.setName( "Emmanuel2" );
+			tx.commit();
+			s.close();
+
+			s = sf.openSession( );
+			tx = s.beginTransaction();
+			er = (Employer) s.get( Employer.class, er.getId() );
+			s.delete( er );
+			ee = (Employee) s.get( Employee.class, ee.getId() );
+			s.delete( ee );
+			tx.commit();
+			s.close();
+		}
+	}
+
+	protected Class[] getMappings() {
+		return new Class[] {
+				Employee.class,
+				Employer.class
+		};
+	}
+}




More information about the hibernate-commits mailing list