Author: epbernard
Date: 2007-01-08 14:19:08 -0500 (Mon, 08 Jan 2007)
New Revision: 11025
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/QueueWorker.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkQueue.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/jms/
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java
Removed:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkQueue.java
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/engine/DocumentBuilder.java
Log:
ANN-523 JMS Worker wo test
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/QueueWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/QueueWorker.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/QueueWorker.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -0,0 +1,26 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import java.util.Properties;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.store.DirectoryProvider;
+
+/**
+ * Execute the work for a given queue
+ *
+ * @author Emmanuel Bernard
+ */
+public interface QueueWorker extends Runnable {
+ void run();
+
+ void initialize(Properties props, Map<Class, DocumentBuilder<Object>>
documentBuilders,
+ Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders);
+
+ void setQueue(List<Work> queue);
+
+}
Deleted:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkQueue.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkQueue.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkQueue.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -1,24 +0,0 @@
-//$Id: $
-package org.hibernate.search.backend;
-
-/**
- * Set of work operations
- *
- * @author Emmanuel Bernard
- */
-public interface WorkQueue {
- /**
- * Add a work
- */
- void add(Work work);
-
- /**
- * Execute works
- */
- void performWork();
-
- /**
- * Rollback works
- */
- void cancelWork();
-}
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkQueue.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkQueue.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/WorkQueue.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -0,0 +1,26 @@
+//$Id: $
+package org.hibernate.search.backend;
+
+import org.hibernate.search.backend.Work;
+
+/**
+ * Set of work operations
+ *
+ * @author Emmanuel Bernard
+ */
+public interface WorkQueue {
+ /**
+ * Add a work
+ */
+ void add(Work work);
+
+ /**
+ * Execute works
+ */
+ void performWork();
+
+ /**
+ * Rollback works
+ */
+ void cancelWork();
+}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/Worker.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -15,7 +15,6 @@
* Perform work for a given session. This implementation has to be multi threaded
* @author Emmanuel Bernard
*/
-//TODO merge worker and workQueue to do a list of workers through work delegation
public interface Worker {
void performWork(Work work, EventSource session);
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedQueueWorker.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -0,0 +1,77 @@
+//$Id: $
+package org.hibernate.search.backend.impl;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.backend.Workspace;
+import org.hibernate.search.backend.AddWork;
+import org.hibernate.search.backend.QueueWorker;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.engine.DocumentBuilder;
+
+/**
+ * @author Emmanuel Bernard
+*/
+class BatchedQueueWorker implements QueueWorker {
+ private List<Work> queue;
+ private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+ private Map<Class, DocumentBuilder<Object>> documentBuilders;
+
+ public void run() {
+ Workspace workspace;
+ LuceneWorker worker;
+ workspace = new Workspace( documentBuilders, lockableDirectoryProviders );
+ worker = new LuceneWorker( workspace );
+ try {
+ deadlockFreeQueue(queue, workspace);
+ for ( Work work : queue ) {
+ worker.performWork( work );
+ }
+ }
+ finally {
+ workspace.clean();
+ queue.clear();
+ }
+ }
+
+ public void initialize(Properties props, Map<Class, DocumentBuilder<Object>>
documentBuilders,
+ Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+ this.documentBuilders = documentBuilders;
+ this.lockableDirectoryProviders = lockableDirectoryProviders;
+ }
+
+ public void setQueue(List<Work> queue) {
+ this.queue = queue;
+ }
+
+ /**
+ * one must lock the directory providers in the exact same order to avoid
+ * dead lock between concurrent threads or processes
+ * To achieve that, the work will be done per directory provider
+ */
+ private void deadlockFreeQueue(List<Work> queue, final Workspace workspace) {
+ Collections.sort( queue, new Comparator<Work>() {
+ public int compare(Work o1, Work o2) {
+ long h1 = getWorkHashCode( o1, workspace );
+ long h2 = getWorkHashCode( o2, workspace );
+ return h1 < h2 ?
+ -1 :
+ h1 == h2 ?
+ 0 :
+ 1;
+ }
+ } );
+ }
+
+ private long getWorkHashCode(Work work, Workspace workspace) {
+ long h = workspace.getDocumentBuilder( work.getEntity() ).hashCode() * 2;
+ if (work instanceof AddWork ) h+=1; //addwork after deleteWork
+ return h;
+ }
+}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/BatchedWorkQueue.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -4,11 +4,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.Properties;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
@@ -16,14 +13,11 @@
import org.hibernate.search.engine.DocumentBuilder;
import org.hibernate.search.store.DirectoryProvider;
-import org.hibernate.search.backend.impl.LuceneWorker;
import org.hibernate.search.backend.WorkQueue;
-import org.hibernate.search.backend.Workspace;
import org.hibernate.search.backend.Work;
import org.hibernate.search.backend.UpdateWork;
import org.hibernate.search.backend.DeleteWork;
import org.hibernate.search.backend.AddWork;
-import org.hibernate.search.backend.Worker;
import org.hibernate.search.Environment;
/**
@@ -74,7 +68,9 @@
//TODO implements parallel batchWorkers (one per Directory)
public void performWork() {
- BatchWorker batchWorker = new BatchWorker( queue, documentBuilders,
lockableDirectoryProviders, properties );
+ BatchedQueueWorker batchWorker = new BatchedQueueWorker();
+ batchWorker.initialize( properties, documentBuilders, lockableDirectoryProviders );
+ batchWorker.setQueue( queue );
if (sync) {
batchWorker.run();
}
@@ -87,59 +83,4 @@
queue.clear();
}
- private class BatchWorker implements Runnable {
- private List<Work> queue;
- private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
- private Map<Class, DocumentBuilder<Object>> documentBuilders;
-
-
- public BatchWorker(List<Work> queue, Map<Class,
DocumentBuilder<Object>> documentBuilders,
- Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders,
Properties properties) {
- this.queue = queue;
- this.documentBuilders = documentBuilders;
- this.lockableDirectoryProviders = lockableDirectoryProviders;
- }
-
- public void run() {
- Workspace workspace;
- LuceneWorker worker;
- workspace = new Workspace( documentBuilders, lockableDirectoryProviders );
- worker = new LuceneWorker( workspace );
- try {
- deadlockFreeQueue(queue, workspace);
- for ( Work work : queue ) {
- worker.performWork( work );
- }
- }
- finally {
- workspace.clean();
- queue.clear();
- }
- }
-
- /**
- * one must lock the directory providers in the exact same order to avoid
- * dead lock between concurrent threads or processes
- * To achieve that, the work will be done per directory provider
- */
- private void deadlockFreeQueue(List<Work> queue, final Workspace workspace) {
- Collections.sort( queue, new Comparator<Work>() {
- public int compare(Work o1, Work o2) {
- long h1 = getWorkHashCode( o1, workspace );
- long h2 = getWorkHashCode( o2, workspace );
- return h1 < h2 ?
- -1 :
- h1 == h2 ?
- 0 :
- 1;
- }
- } );
- }
-
- private long getWorkHashCode(Work work, Workspace workspace) {
- long h = workspace.getDocumentBuilder( work.getEntity() ).hashCode() * 2;
- if (work instanceof AddWork) h+=1; //addwork after deleteWork
- return h;
- }
- }
}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/TransactionalWorker.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -12,7 +12,6 @@
import org.hibernate.search.util.WeakIdentityHashMap;
import org.hibernate.search.engine.DocumentBuilder;
import org.hibernate.search.store.DirectoryProvider;
-import org.hibernate.search.Environment;
import org.hibernate.event.EventSource;
import org.hibernate.Transaction;
Added:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/backend/impl/jms/JMSQueueWorker.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -0,0 +1,133 @@
+//$Id: $
+package org.hibernate.search.backend.impl.jms;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantLock;
+import java.io.Serializable;
+
+import javax.jms.QueueSender;
+import javax.jms.QueueConnection;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.ObjectMessage;
+import javax.jms.JMSException;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.Context;
+
+import org.hibernate.search.backend.QueueWorker;
+import org.hibernate.search.backend.Work;
+import org.hibernate.search.engine.DocumentBuilder;
+import org.hibernate.search.store.DirectoryProvider;
+import org.hibernate.search.Environment;
+import org.hibernate.HibernateException;
+
+/**
+ * @author Emmanuel Bernard
+ */
+public class JMSQueueWorker implements QueueWorker {
+ private List<Work> queue;
+ private Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders;
+ private Map<Class, DocumentBuilder<Object>> documentBuilders;
+ private String jmsQueueName;
+ private String jmsConnectionFactoryName;
+ private static final String JNDI_PREFIX = Environment.WORKER_PREFIX +
"jndi.";
+ private Properties properties;
+ private Queue jmsQueue;
+ private QueueConnectionFactory factory;
+
+ public void run() {
+ resetJMSTools();
+ QueueConnection cnn = null;
+ QueueSender sender = null;
+ QueueSession session = null;
+ try {
+ cnn = factory.createQueueConnection();
+ //TODO make transacted parameterized
+ session = cnn.createQueueSession( false, QueueSession.AUTO_ACKNOWLEDGE );
+
+ ObjectMessage message = session.createObjectMessage( );
+ message.setObject( (Serializable) this.queue );
+
+ sender = session.createSender( jmsQueue );
+ sender.send( message );
+
+ session.close();
+ cnn.close();
+ }
+ catch( JMSException e ) {
+ throw new HibernateException("Unable to send Search work to JMS queue: " +
jmsQueueName, e);
+ }
+ }
+
+ private InitialContext getInitialContext(Properties properties) throws NamingException
{
+ Properties jndiProps = getJndiProperties( properties );
+ if (jndiProps.size() == 0) {
+ return new InitialContext( );
+ }
+ else {
+ return new InitialContext( jndiProps );
+ }
+ }
+
+ public void initialize(Properties props, Map<Class, DocumentBuilder<Object>>
documentBuilders,
+ Map<DirectoryProvider, ReentrantLock> lockableDirectoryProviders) {
+ this.documentBuilders = documentBuilders;
+ this.lockableDirectoryProviders = lockableDirectoryProviders;
+ this.properties = props;
+ this.jmsConnectionFactoryName = props.getProperty( Environment.WORKER_PREFIX +
"jms.connection_factory" );
+ this.jmsQueueName = props.getProperty( Environment.WORKER_PREFIX +
"jms.queue" );
+ resetJMSTools();
+
+ }
+
+ private void resetJMSTools() {
+ if (jmsQueue != null && factory != null) return;
+ try {
+ InitialContext initialContext = getInitialContext(properties);
+ jmsQueue = (Queue) initialContext.lookup( jmsQueueName );
+ factory = (QueueConnectionFactory) initialContext.lookup( jmsConnectionFactoryName );
+ }
+ catch (NamingException e) {
+ throw new HibernateException("Unable to lookup Search queue and connection
factory", e);
+ }
+ }
+
+ public void setQueue(List<Work> queue) {
+ this.queue = queue;
+ }
+
+ public static Properties getJndiProperties(Properties properties) {
+
+ HashSet specialProps = new HashSet();
+ specialProps.add( JNDI_PREFIX + "class" );
+ specialProps.add( JNDI_PREFIX + "url" );
+
+ Iterator iter = properties.keySet().iterator();
+ Properties result = new Properties();
+ while ( iter.hasNext() ) {
+ String prop = (String) iter.next();
+ if ( prop.indexOf(JNDI_PREFIX) > -1 && !specialProps.contains(prop) ) {
+ result.setProperty(
+ prop.substring( JNDI_PREFIX.length()+1 ),
+ properties.getProperty(prop)
+ );
+ }
+ }
+
+ String jndiClass = properties.getProperty(JNDI_PREFIX + "class");
+ String jndiURL = properties.getProperty(JNDI_PREFIX + "url");
+ // we want to be able to just use the defaults,
+ // if JNDI environment properties are not supplied
+ // so don't put null in anywhere
+ if (jndiClass != null) result.put( Context.INITIAL_CONTEXT_FACTORY, jndiClass);
+ if (jndiURL != null) result.put(Context.PROVIDER_URL, jndiURL);
+
+ return result;
+ }
+}
Modified:
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/engine/DocumentBuilder.java
===================================================================
---
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/engine/DocumentBuilder.java 2007-01-08
19:13:17 UTC (rev 11024)
+++
branches/Branch_3_2/HibernateExt/metadata/src/java/org/hibernate/search/engine/DocumentBuilder.java 2007-01-08
19:19:08 UTC (rev 11025)
@@ -195,7 +195,7 @@
return Field.Index.NO_NORMS;
case TOKENIZED:
return Field.Index.TOKENIZED;
- case UN_TOKENISED:
+ case UN_TOKENIZED:
return Field.Index.UN_TOKENIZED;
default:
throw new AssertionFailure( "Unexpected Index: " + index );