[jboss-svn-commits] JBL Code SVN: r23273 - in labs/jbossrules/trunk/drools-core/src: main/java/org/drools/common and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Oct 2 15:37:50 EDT 2008
Author: tirelli
Date: 2008-10-02 15:37:50 -0400 (Thu, 02 Oct 2008)
New Revision: 23273
Modified:
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/RuleBaseConfiguration.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/InputMarshaller.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/OutputMarshaller.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java
Log:
Refactoring working memory to use a single shared thread-pool among all rulebase partitions, allowing for fine grained control of pool size
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/RuleBaseConfiguration.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/RuleBaseConfiguration.java 2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/RuleBaseConfiguration.java 2008-10-02 19:37:50 UTC (rev 23273)
@@ -121,6 +121,7 @@
// the rulebase into multiple partitions that can be evaluated
// in parallel by using multiple internal threads
private boolean multithread;
+ private int maxThreads;
private ConflictResolver conflictResolver;
@@ -159,6 +160,7 @@
out.writeObject( processNodeInstanceFactoryRegistry );
out.writeBoolean( advancedProcessRuleIntegration );
out.writeBoolean( multithread );
+ out.writeInt( maxThreads );
}
public void readExternal(ObjectInput in) throws IOException,
@@ -184,6 +186,7 @@
processNodeInstanceFactoryRegistry = (NodeInstanceFactoryRegistry) in.readObject();
advancedProcessRuleIntegration = in.readBoolean();
multithread = in.readBoolean();
+ maxThreads = in.readInt();
}
/**
@@ -308,7 +311,10 @@
"false" ) ).booleanValue() );
setMultithreadEvaluation( Boolean.valueOf( this.chainedProperties.getProperty( "drools.multithreadEvaluation",
- "false" ) ).booleanValue() );
+ "false" ) ).booleanValue() );
+
+ setMaxThreads( Integer.parseInt( this.chainedProperties.getProperty( "drools.maxThreads",
+ "-1" ) ) );
}
/**
@@ -516,6 +522,30 @@
public boolean isMultithreadEvaluation() {
return this.multithread;
}
+
+ /**
+ * If multi-thread evaluation is enabled, this parameter configures the
+ * maximum number of threads each session can use for concurrent Rete
+ * propagation.
+ *
+ * @param maxThreads the maximum number of threads to use. If 0 or a
+ * negative number is set, the engine will use number
+ * of threads equal to the number of partitions in the
+ * rule base. Default number of threads is 0.
+ */
+ public void setMaxThreads( final int maxThreads ) {
+ this.maxThreads = maxThreads;
+ }
+
+ /**
+ * Returns the configured number of maximum threads to use for concurrent
+ * propagation when multi-thread evaluation is enabled. Default is zero.
+ *
+ * @return
+ */
+ public int getMaxThreads() {
+ return this.maxThreads;
+ }
private void initProcessNodeInstanceFactoryRegistry() {
this.processNodeInstanceFactoryRegistry = new NodeInstanceFactoryRegistry();
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java 2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java 2008-10-02 19:37:50 UTC (rev 23273)
@@ -31,8 +31,10 @@
import java.util.Queue;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -187,6 +189,8 @@
protected Map<RuleBasePartitionId, PartitionTaskManager> partitionManagers;
+ protected transient AtomicReference<java.util.concurrent.ExecutorService> threadPool = new AtomicReference<java.util.concurrent.ExecutorService>();
+
private Map<InternalFactHandle, PropagationContext> modifyContexts;
// ------------------------------------------------------------
@@ -321,21 +325,30 @@
*/
public void startPartitionManagers() {
if ( this.ruleBase.getConfiguration().isMultithreadEvaluation() ) {
- for ( PartitionTaskManager task : this.partitionManagers.values() ) {
- task.startService();
+ int maxThreads = (this.ruleBase.getConfiguration().getMaxThreads() > 0) ? this.ruleBase.getConfiguration().getMaxThreads() : this.ruleBase.getPartitionIds().size();
+ if( this.threadPool.compareAndSet( null, Executors.newFixedThreadPool( maxThreads ) ) ) {
+ for ( PartitionTaskManager task : this.partitionManagers.values() ) {
+ task.setPool( this.threadPool.get() );
+ }
}
}
}
public void stopPartitionManagers() {
if ( this.ruleBase.getConfiguration().isMultithreadEvaluation() ) {
- for ( PartitionTaskManager task : this.partitionManagers.values() ) {
- // what to do here? should we simply wait for a timeout and give
- // up?
- task.stopServiceAndWait();
+ java.util.concurrent.ExecutorService service = this.threadPool.get();
+ if( this.threadPool.compareAndSet( service, null ) ) {
+ service.shutdown();
+ for ( PartitionTaskManager task : this.partitionManagers.values() ) {
+ task.setPool( null );
+ }
}
}
}
+
+ public boolean isPartitionManagersActive() {
+ return this.threadPool.get() != null;
+ }
private void initTransient() {
this.entryPointNode = this.ruleBase.getRete().getEntryPointNode( this.entryPoint );
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/InputMarshaller.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/InputMarshaller.java 2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/InputMarshaller.java 2008-10-02 19:37:50 UTC (rev 23273)
@@ -98,6 +98,7 @@
public static ReteooStatefulSession readSession(ReteooStatefulSession session,
MarshallerReaderContext context) throws IOException,
ClassNotFoundException {
+ boolean multithread = context.readBoolean();
int handleId = context.readInt();
long handleCounter = context.readLong();
long propagationCounter = context.readLong();
@@ -129,6 +130,10 @@
readWorkItems( context );
readTimers( context );
+
+ if( multithread ) {
+ session.startPartitionManagers();
+ }
return session;
}
@@ -146,6 +151,9 @@
int id,
ExecutorService executor) throws IOException,
ClassNotFoundException {
+
+ boolean multithread = context.readBoolean();
+
FactHandleFactory handleFactory = context.ruleBase.newFactHandleFactory( context.readInt(),
context.readLong() );
@@ -190,6 +198,10 @@
readTimers( context );
+ if( multithread ) {
+ session.startPartitionManagers();
+ }
+
return session;
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/OutputMarshaller.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/OutputMarshaller.java 2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/OutputMarshaller.java 2008-10-02 19:37:50 UTC (rev 23273)
@@ -69,6 +69,15 @@
public class OutputMarshaller {
public static void writeSession(MarshallerWriteContext context) throws IOException {
ReteooWorkingMemory wm = (ReteooWorkingMemory) context.wm;
+
+ final boolean multithread = wm.isPartitionManagersActive();
+ // is multi-thread active?
+ if( multithread ) {
+ context.writeBoolean( true );
+ wm.stopPartitionManagers();
+ } else {
+ context.writeBoolean( false );
+ }
context.writeInt( wm.getFactHandleFactory().getId() );
context.writeLong( wm.getFactHandleFactory().getRecency() );
@@ -100,6 +109,10 @@
writeWorkItems( context );
writeTimers( context );
+
+ if( multithread ) {
+ wm.startPartitionManagers();
+ }
}
public static void writeAgenda(MarshallerWriteContext context) throws IOException {
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java 2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java 2008-10-02 19:37:50 UTC (rev 23273)
@@ -16,163 +16,88 @@
package org.drools.reteoo;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.spi.PropagationContext;
-
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
/**
- * A class to control the worker thread for each rulebase partition.
- * It contains an internal Single Thread Pool that ensures thread
- * respawn and a task that ensures no more than a single thread is
- * executing it concurrently.
+ * A class to control the tasks for a given rulebase partition.
+ * It requires a thread pool that is created in the working
+ * memory and injected in here.
*
* @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
*/
public class PartitionTaskManager {
- private ExecutorService pool = null;
private PartitionTask task = null;
+ private AtomicReference<ExecutorService> pool = new AtomicReference<ExecutorService>();
public PartitionTaskManager( final InternalWorkingMemory workingMemory ) {
this.task = new PartitionTask( workingMemory );
}
/**
- * Starts the service
+ * Sets the thread pool to be used by this partition
+ * @param pool
*/
- public synchronized void startService() {
- if( !isRunning() ) {
- // I'm not sure we should create and destroy the pool every service start/stop,
- // but for now, lets do that. Later we can reevaluate if that is needed.
- this.pool = Executors.newSingleThreadExecutor();
- this.pool.execute( this.task );
- }
- }
-
- /**
- * Nicely requests the service to stop. This method will not wait
- * for the service to finish.
- */
- public synchronized boolean stopService() {
- boolean result = true;
- if( isRunning() ) {
- this.task.shutdown();
- // I'm not sure we should create and destroy the pool every service start/stop,
- // but for now, lets do that. Later we can reevaluate if that is needed.
- this.pool.shutdown();
- this.pool = null;
- }
- return result;
- }
-
- /**
- * Nicely requests the service to stop. This method will wait up to
- * the given timeout for the service to finish and will return.
- *
- * @return true in case the services finished, false otherwise
- */
- public synchronized boolean stopService( final long timeout, final TimeUnit unit ) {
- boolean result = true;
- if( isRunning() ) {
- this.task.shutdown();
- // I'm not sure we should create and destroy the pool every service start/stop,
- // but for now, lets do that. Later we can reevaluate if that is needed.
- this.pool.shutdown();
- try {
- result = this.pool.awaitTermination( timeout, unit );
- } catch( InterruptedException e ) {
- result = false;
+ public void setPool(ExecutorService pool) {
+ if( pool != null && this.pool.compareAndSet( null, pool ) ) {
+ int size = this.task.queue.size();
+ for( int i = 0; i < size; i++ ) {
+ this.pool.get().execute( this.task );
}
- this.pool = null;
+ } else {
+ this.pool.set( pool );
}
- return result;
}
- /**
- * Nicely requests the service to stop. This method will wait until
- * the service finishes or an InterruptedException is generated
- * and will return.
- *
- * @return true in case the services finished, false otherwise
- */
- public synchronized boolean stopServiceAndWait() {
- boolean result = true;
- if( isRunning() ) {
- this.task.shutdown();
- // I'm not sure we should create and destroy the pool every service start/stop,
- // but for now, lets do that. Later we can reevaluate if that is needed.
- this.pool.shutdown();
- try {
- while( !this.pool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
- ;
- }
- result = this.pool.isTerminated();
- } catch( InterruptedException e ) {
- result = false;
- }
- this.pool = null;
- }
- return result;
- }
/**
- * Checks if the task is running.
- *
- * @return true if the task is running. false otherwise.
- */
- public synchronized boolean isRunning() {
- return pool != null && !pool.isTerminated();
- }
-
- /**
* Adds the given action to the processing queue
*
* @param action the action to be processed
* @return true if the action was successfully added to the processing queue. false otherwise.
*/
public boolean enqueue( final Action action ) {
- return this.task.enqueue( action );
+ boolean result = this.task.enqueue( action );
+ assert result : "result must be true";
+ ExecutorService service = this.pool.get();
+ if( service != null ) {
+ service.execute( this.task );
+ }
+ return result;
}
/**
* A worker task that keeps processing the nodes queue.
- * The task uses a blocking queue and keeps processing
- * nodes while there are nodes in the queue and it is not
- * shutdown. If the queue is emptied, the class will wait
- * until a new node is added.
+ * The task uses a non-blocking queue and is re-submitted
+ * for execution for each element that is added to the queue.
*/
public static class PartitionTask implements Runnable {
// the queue with the nodes that need to be processed
- private BlockingQueue<Action> queue;
+ private Queue<Action> queue;
// the working memory reference
private InternalWorkingMemory workingMemory;
- // a flag to nicely shutdown the thread
- private volatile AtomicBoolean shutdown;
-
- // the actual thread that is running
- private Thread runner;
-
-
/**
* Constructor
*
* @param workingMemory the working memory reference that is used for node processing
*/
public PartitionTask( final InternalWorkingMemory workingMemory ) {
- this.queue = new LinkedBlockingQueue<Action>();
- this.shutdown = new AtomicBoolean( false );
+ this.queue = new ConcurrentLinkedQueue<Action>();
this.workingMemory = workingMemory;
- this.runner = null;
}
/**
@@ -181,51 +106,13 @@
* @see Runnable
*/
public void run() {
- // this task can not be shared among multiple threads
- if( checkAndSetRunning() ) {
- return;
+ Action action = queue.poll();
+ if( action != null ) {
+ action.execute( workingMemory );
}
-
- while( !shutdown.get() ) {
- try {
- // this is a blocking call
- if( Thread.currentThread().isInterrupted() ) {
- cancel();
- break;
- }
- Action action = queue.take();
- action.execute( workingMemory );
-
- } catch( InterruptedException e ) {
- cancel();
- }
- }
}
/**
- * Requests this task to shutdown
- */
- public void shutdown() {
- synchronized( this ) {
- if( this.runner != null ) {
- this.runner.interrupt();
- }
- }
- this.cancel();
- }
-
- /**
- * Returns true if this task is currently executing
- *
- * @return true if the task is currently executing
- */
- public boolean isRunning() {
- synchronized( this ) {
- return !shutdown.get() && this.runner != null;
- }
- }
-
- /**
* Adds the given action to the processing queue returning true if the action
* was correctly added or false otherwise.
*
@@ -233,41 +120,8 @@
* @return true if the node was successfully added to the queue. false otherwise.
*/
public boolean enqueue( final Action action ) {
- return this.queue.offer( action );
+ return this.queue.add( action );
}
-
- /**
- * Cancels current execution and cleans up used resources
- */
- private void cancel() {
- // if the blocking call was interrupted, then check for the cancelation flag
- shutdown.set( true );
- // cleaning up cache reference
- synchronized( this ) {
- this.runner = null;
- }
- }
-
- /**
- * Checks if the task is already running in a different thread. If it is not
- * running yet, caches current thread reference.
- *
- * @return true if the task is already running in a different thread. false otherwise.
- */
- private boolean checkAndSetRunning() {
- synchronized( this ) {
- if( this.runner == null && !Thread.currentThread().isInterrupted() ) {
- // if it is not running yet, cache the thread reference
- this.runner = Thread.currentThread();
- this.shutdown.set( false );
- } else {
- // there can be only one thread executing each instance of PartitionTask
- return true;
- }
- }
- return false;
- }
-
}
/**
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java 2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java 2008-10-02 19:37:50 UTC (rev 23273)
@@ -15,16 +15,15 @@
*/
package org.drools.reteoo;
-import java.util.concurrent.TimeUnit;
-import java.io.ObjectOutput;
-import java.io.IOException;
-import java.io.ObjectInput;
-
+import junit.framework.Assert;
import junit.framework.TestCase;
import org.drools.RuleBase;
import org.drools.RuleBaseFactory;
import org.drools.common.InternalWorkingMemory;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.lib.concurrent.DeterministicScheduler;
/**
* Test case for PartitionTaskManager
@@ -32,14 +31,14 @@
* @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
*/
public class PartitionTaskManagerTest extends TestCase {
- private MockAction action;
+ Mockery context = new Mockery();
private PartitionTaskManager manager;
+ private InternalWorkingMemory workingMemory;
@Override
public void setUp() {
RuleBase rulebase = RuleBaseFactory.newRuleBase();
- InternalWorkingMemory workingMemory = (InternalWorkingMemory) rulebase.newStatefulSession();
- action = new MockAction();
+ workingMemory = (InternalWorkingMemory) rulebase.newStatefulSession();
manager = new PartitionTaskManager( workingMemory );
}
@@ -48,61 +47,79 @@
}
- public void testStartStopService() throws InterruptedException {
- assertFalse( manager.isRunning() );
- manager.startService();
- Thread.sleep( 1000 );
- assertTrue( manager.isRunning() );
- manager.stopService();
- Thread.sleep( 1000 );
- assertFalse( manager.isRunning() );
+ public void testEnqueueBeforeSettingExecutor() throws InterruptedException {
+ final PartitionTaskManager.Action action = context.mock( PartitionTaskManager.Action.class );
+ // set expectations for the scenario
+ context.checking( new Expectations() {{
+ oneOf( action ).execute( workingMemory );
+ }});
+
+ manager.enqueue( action );
+
+ // this is a jmock helper class that implements the ExecutorService interface
+ DeterministicScheduler pool = new DeterministicScheduler();
+ // set the pool
+ manager.setPool( pool );
+
+ // executes all pending actions using current thread
+ pool.runUntilIdle();
+
+ // check expectations
+ context.assertIsSatisfied();
}
- public void testNodeCallbacks() throws InterruptedException {
- // should be possible to enqueue before starting the service,
- // even if that should never happen
+ public void testFireCorrectly() throws InterruptedException {
+ // creates a mock action
+ final PartitionTaskManager.Action action = context.mock( PartitionTaskManager.Action.class );
+
+ // this is a jmock helper class that implements the ExecutorService interface
+ DeterministicScheduler pool = new DeterministicScheduler();
+ // set the pool
+ manager.setPool( pool );
+
+ // set expectations for the scenario
+ context.checking( new Expectations() {{
+ oneOf( action ).execute( workingMemory );
+ }});
+
+ // fire scenario
manager.enqueue( action );
- manager.startService();
+
+ // executes all pending actions using current thread
+ pool.runUntilIdle();
+
+ // check expectations
+ context.assertIsSatisfied();
+ }
+
+ public void testActionCallbacks() throws InterruptedException {
+ // creates a mock action
+ final PartitionTaskManager.Action action = context.mock( PartitionTaskManager.Action.class );
+ // this is a jmock helper class that implements the ExecutorService interface
+ DeterministicScheduler pool = new DeterministicScheduler();
+
+ // set expectations for the scenario
+ context.checking( new Expectations() {{
+ exactly(5).of( action ).execute( workingMemory );
+ }});
+
+ // enqueue before pool
manager.enqueue( action );
- // give the engine some time
- Thread.sleep( 1000 );
- assertTrue( manager.stopService( 10, TimeUnit.SECONDS ) );
- assertEquals( 2, action.getCallbackCounter() );
- // should be possible to enqueue after the stop,
- // but callback must not be executed
manager.enqueue( action );
+
+ // set the pool
+ manager.setPool( pool );
+
+ // enqueue after setting the pool
manager.enqueue( action );
manager.enqueue( action );
- // making sure the service is not processing the nodes
- Thread.sleep( 1000 );
- assertEquals( 2, action.getCallbackCounter() );
- // restarting service
- manager.startService();
- // making sure the service had time to process the nodes
- Thread.sleep( 1000 );
- assertTrue( manager.stopService( 10, TimeUnit.SECONDS ) );
- assertEquals( 5, action.getCallbackCounter() );
+ manager.enqueue( action );
+
+ // executes all pending actions using current thread
+ pool.runUntilIdle();
+
+ // check expectations
+ context.assertIsSatisfied();
}
- public static class MockAction implements PartitionTaskManager.Action {
- private volatile long callbackCounter = 0;
-
- public synchronized long getCallbackCounter() {
- return this.callbackCounter;
- }
-
- public void execute( InternalWorkingMemory workingMemory ) {
- synchronized( this ) {
- callbackCounter++;
- }
- }
-
- public void writeExternal( ObjectOutput out ) throws IOException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- }
}
More information about the jboss-svn-commits
mailing list