[jboss-svn-commits] JBL Code SVN: r23273 - in labs/jbossrules/trunk/drools-core/src: main/java/org/drools/common and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Oct 2 15:37:50 EDT 2008


Author: tirelli
Date: 2008-10-02 15:37:50 -0400 (Thu, 02 Oct 2008)
New Revision: 23273

Modified:
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/RuleBaseConfiguration.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/InputMarshaller.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/OutputMarshaller.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java
Log:
Refactoring working memory to use a single shared thread-pool among all rulebase partitions, allowing for fine grained control of pool size

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/RuleBaseConfiguration.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/RuleBaseConfiguration.java	2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/RuleBaseConfiguration.java	2008-10-02 19:37:50 UTC (rev 23273)
@@ -121,6 +121,7 @@
     // the rulebase into multiple partitions that can be evaluated
     // in parallel by using multiple internal threads
     private boolean                        multithread;
+    private int  	                       maxThreads;
 
     private ConflictResolver               conflictResolver;
 
@@ -159,6 +160,7 @@
         out.writeObject( processNodeInstanceFactoryRegistry );
         out.writeBoolean( advancedProcessRuleIntegration );
         out.writeBoolean( multithread );
+        out.writeInt( maxThreads );
     }
 
     public void readExternal(ObjectInput in) throws IOException,
@@ -184,6 +186,7 @@
         processNodeInstanceFactoryRegistry = (NodeInstanceFactoryRegistry) in.readObject();
         advancedProcessRuleIntegration = in.readBoolean();
         multithread = in.readBoolean();
+        maxThreads = in.readInt();
     }
 
     /**
@@ -308,7 +311,10 @@
                                                                                                 "false" ) ).booleanValue() );
 
         setMultithreadEvaluation( Boolean.valueOf( this.chainedProperties.getProperty( "drools.multithreadEvaluation",
-                                                                                    "false" ) ).booleanValue() );
+                                                                                       "false" ) ).booleanValue() );
+
+         setMaxThreads( Integer.parseInt( this.chainedProperties.getProperty( "drools.maxThreads",
+                                                                              "-1" ) ) );
     }
 
     /**
@@ -516,6 +522,30 @@
     public boolean isMultithreadEvaluation() {
         return this.multithread;
     }
+    
+    /**
+     * If multi-thread evaluation is enabled, this parameter configures the 
+     * maximum number of threads each session can use for concurrent Rete
+     * propagation. 
+     * 
+     * @param maxThreads the maximum number of threads to use. If 0 or a 
+     *                   negative number is set, the engine will use number
+     *                   of threads equal to the number of partitions in the
+     *                   rule base. Default number of threads is 0. 
+     */
+    public void setMaxThreads( final int maxThreads ) {
+    	this.maxThreads = maxThreads;
+    }
+    
+    /**
+     * Returns the configured number of maximum threads to use for concurrent
+     * propagation when multi-thread evaluation is enabled. Default is zero.
+     * 
+     * @return
+     */
+    public int getMaxThreads() {
+    	return this.maxThreads;
+    }
 
     private void initProcessNodeInstanceFactoryRegistry() {
         this.processNodeInstanceFactoryRegistry = new NodeInstanceFactoryRegistry();

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java	2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java	2008-10-02 19:37:50 UTC (rev 23273)
@@ -31,8 +31,10 @@
 import java.util.Queue;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -187,6 +189,8 @@
 
     protected Map<RuleBasePartitionId, PartitionTaskManager> partitionManagers;
 
+    protected transient AtomicReference<java.util.concurrent.ExecutorService> threadPool = new AtomicReference<java.util.concurrent.ExecutorService>();
+
     private Map<InternalFactHandle, PropagationContext>      modifyContexts;
 
     // ------------------------------------------------------------
@@ -321,21 +325,30 @@
      */
     public void startPartitionManagers() {
         if ( this.ruleBase.getConfiguration().isMultithreadEvaluation() ) {
-            for ( PartitionTaskManager task : this.partitionManagers.values() ) {
-                task.startService();
+            int maxThreads = (this.ruleBase.getConfiguration().getMaxThreads() > 0) ? this.ruleBase.getConfiguration().getMaxThreads() : this.ruleBase.getPartitionIds().size();
+            if( this.threadPool.compareAndSet( null, Executors.newFixedThreadPool( maxThreads ) ) ) {
+                for ( PartitionTaskManager task : this.partitionManagers.values() ) {
+                    task.setPool( this.threadPool.get() );
+                }
             }
         }
     }
 
     public void stopPartitionManagers() {
         if ( this.ruleBase.getConfiguration().isMultithreadEvaluation() ) {
-            for ( PartitionTaskManager task : this.partitionManagers.values() ) {
-                // what to do here? should we simply wait for a timeout and give
-                // up?
-                task.stopServiceAndWait();
+            java.util.concurrent.ExecutorService service = this.threadPool.get();
+            if( this.threadPool.compareAndSet( service, null ) ) {
+                service.shutdown();
+                for ( PartitionTaskManager task : this.partitionManagers.values() ) {
+                    task.setPool( null );
+                }
             }
         }
     }
+    
+    public boolean isPartitionManagersActive() {
+        return this.threadPool.get() != null;
+    }
 
     private void initTransient() {
         this.entryPointNode = this.ruleBase.getRete().getEntryPointNode( this.entryPoint );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/InputMarshaller.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/InputMarshaller.java	2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/InputMarshaller.java	2008-10-02 19:37:50 UTC (rev 23273)
@@ -98,6 +98,7 @@
     public static ReteooStatefulSession readSession(ReteooStatefulSession session,
                                                     MarshallerReaderContext context) throws IOException,
                                                                                     ClassNotFoundException {
+        boolean multithread = context.readBoolean();
         int handleId = context.readInt();
         long handleCounter = context.readLong();
         long propagationCounter = context.readLong();
@@ -129,6 +130,10 @@
         readWorkItems( context );
 
         readTimers( context );
+        
+        if( multithread ) {
+            session.startPartitionManagers();
+        }
 
         return session;
     }
@@ -146,6 +151,9 @@
                                                     int id,
                                                     ExecutorService executor) throws IOException,
                                                                              ClassNotFoundException {
+
+        boolean multithread = context.readBoolean();
+        
         FactHandleFactory handleFactory = context.ruleBase.newFactHandleFactory( context.readInt(),
                                                                                  context.readLong() );
 
@@ -190,6 +198,10 @@
 
         readTimers( context );
 
+        if( multithread ) {
+            session.startPartitionManagers();
+        }
+
         return session;
     }
 

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/OutputMarshaller.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/OutputMarshaller.java	2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/marshalling/OutputMarshaller.java	2008-10-02 19:37:50 UTC (rev 23273)
@@ -69,6 +69,15 @@
 public class OutputMarshaller {
     public static void writeSession(MarshallerWriteContext context) throws IOException {
         ReteooWorkingMemory wm = (ReteooWorkingMemory) context.wm;
+        
+        final boolean multithread = wm.isPartitionManagersActive(); 
+        // is multi-thread active?
+        if( multithread ) {
+            context.writeBoolean( true );
+            wm.stopPartitionManagers();
+        } else {
+            context.writeBoolean( false );
+        }
 
         context.writeInt( wm.getFactHandleFactory().getId() );
         context.writeLong( wm.getFactHandleFactory().getRecency() );
@@ -100,6 +109,10 @@
         writeWorkItems( context );
 
         writeTimers( context );
+        
+        if( multithread ) {
+            wm.startPartitionManagers();
+        }
     }
 
     public static void writeAgenda(MarshallerWriteContext context) throws IOException {

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java	2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java	2008-10-02 19:37:50 UTC (rev 23273)
@@ -16,163 +16,88 @@
 
 package org.drools.reteoo;
 
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.spi.PropagationContext;
-
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
 /**
- * A class to control the worker thread for each rulebase partition.
- * It contains an internal Single Thread Pool that ensures thread
- * respawn and a task that ensures no more than a single thread is
- * executing it concurrently.
+ * A class to control the tasks for a given rulebase partition.
+ * It requires a thread pool that is created in the working 
+ * memory and injected in here.
  *
  * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
  */
 public class PartitionTaskManager {
 
-    private ExecutorService pool = null;
     private PartitionTask task = null;
+    private AtomicReference<ExecutorService> pool = new AtomicReference<ExecutorService>();
 
     public PartitionTaskManager( final InternalWorkingMemory workingMemory ) {
         this.task = new PartitionTask( workingMemory );
     }
 
     /**
-     * Starts the service
+     * Sets the thread pool to be used by this partition
+     * @param pool
      */
-    public synchronized void startService() {
-        if( !isRunning() ) {
-            // I'm not sure we should create and destroy the pool every service start/stop,
-            // but for now, lets do that. Later we can reevaluate if that is needed.
-            this.pool = Executors.newSingleThreadExecutor();
-            this.pool.execute( this.task );
-        }
-    }
-
-    /**
-     * Nicely requests the service to stop. This method will not wait
-     * for the service to finish.
-     */
-    public synchronized boolean stopService() {
-        boolean result = true;
-        if( isRunning() ) {
-            this.task.shutdown();
-            // I'm not sure we should create and destroy the pool every service start/stop,
-            // but for now, lets do that. Later we can reevaluate if that is needed.
-            this.pool.shutdown();
-            this.pool = null;
-        }
-        return result;
-    }
-
-    /**
-     * Nicely requests the service to stop. This method will wait up to
-     * the given timeout for the service to finish and will return.
-     *
-     * @return true in case the services finished, false otherwise
-     */
-    public synchronized boolean stopService( final long timeout, final TimeUnit unit ) {
-        boolean result = true;
-        if( isRunning() ) {
-            this.task.shutdown();
-            // I'm not sure we should create and destroy the pool every service start/stop,
-            // but for now, lets do that. Later we can reevaluate if that is needed.
-            this.pool.shutdown();
-            try {
-                result = this.pool.awaitTermination( timeout, unit );
-            } catch( InterruptedException e ) {
-                result = false;
+    public void setPool(ExecutorService pool) {
+        if( pool != null && this.pool.compareAndSet( null, pool ) ) {
+            int size = this.task.queue.size();
+            for( int i = 0; i < size; i++ ) {
+                this.pool.get().execute( this.task );
             }
-            this.pool = null;
+        } else {
+            this.pool.set( pool );
         }
-        return result;
     }
 
-    /**
-     * Nicely requests the service to stop. This method will wait until
-     * the service finishes or an InterruptedException is generated
-     * and will return.
-     *
-     * @return true in case the services finished, false otherwise
-     */
-    public synchronized boolean stopServiceAndWait() {
-        boolean result = true;
-        if( isRunning() ) {
-            this.task.shutdown();
-            // I'm not sure we should create and destroy the pool every service start/stop,
-            // but for now, lets do that. Later we can reevaluate if that is needed.
-            this.pool.shutdown();
-            try {
-                while( !this.pool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
-                    ;
-                }
-                result = this.pool.isTerminated();
-            } catch( InterruptedException e ) {
-                result = false;
-            }
-            this.pool = null;
-        }
-        return result;
-    }
 
     /**
-     * Checks if the task is running.
-     *
-     * @return true if the task is running. false otherwise.
-     */
-    public synchronized boolean isRunning() {
-        return pool != null && !pool.isTerminated();
-    }
-
-    /**
      * Adds the given action to the processing queue
      *
      * @param action the action to be processed
      * @return true if the action was successfully added to the processing queue. false otherwise.
      */
     public boolean enqueue( final Action action ) {
-        return this.task.enqueue( action );
+        boolean result = this.task.enqueue( action );
+        assert result : "result must be true";
+        ExecutorService service = this.pool.get(); 
+        if(  service != null ) {
+            service.execute( this.task );
+        } 
+        return result;
     }
 
     /**
      * A worker task that keeps processing the nodes queue.
-     * The task uses a blocking queue and keeps processing
-     * nodes while there are nodes in the queue and it is not
-     * shutdown. If the queue is emptied, the class will wait
-     * until a new node is added.
+     * The task uses a non-blocking queue and is re-submitted
+     * for execution for each element that is added to the queue.
      */
     public static class PartitionTask implements Runnable {
 
         // the queue with the nodes that need to be processed
-        private BlockingQueue<Action> queue;
+        private Queue<Action> queue;
 
         // the working memory reference
         private InternalWorkingMemory workingMemory;
 
-        // a flag to nicely shutdown the thread
-        private volatile AtomicBoolean shutdown;
-
-        // the actual thread that is running
-        private Thread runner;
-
-
         /**
          * Constructor
          *
          * @param workingMemory the working memory reference that is used for node processing
          */
         public PartitionTask( final InternalWorkingMemory workingMemory ) {
-            this.queue = new LinkedBlockingQueue<Action>();
-            this.shutdown = new AtomicBoolean( false );
+            this.queue = new ConcurrentLinkedQueue<Action>();
             this.workingMemory = workingMemory;
-            this.runner = null;
         }
 
         /**
@@ -181,51 +106,13 @@
          * @see Runnable
          */
         public void run() {
-            // this task can not be shared among multiple threads
-            if( checkAndSetRunning() ) {
-                return;
+            Action action = queue.poll();
+            if( action != null ) {
+                action.execute( workingMemory );
             }
-
-            while( !shutdown.get() ) {
-                try {
-                    // this is a blocking call
-                    if( Thread.currentThread().isInterrupted() ) {
-                        cancel();
-                        break;
-                    }
-                    Action action = queue.take();
-                    action.execute( workingMemory );
-
-                } catch( InterruptedException e ) {
-                    cancel();
-                }
-            }
         }
 
         /**
-         * Requests this task to shutdown
-         */
-        public void shutdown() {
-            synchronized( this ) {
-                if( this.runner != null ) {
-                    this.runner.interrupt();
-                }
-            }
-            this.cancel();
-        }
-
-        /**
-         * Returns true if this task is currently executing
-         *
-         * @return true if the task is currently executing
-         */
-        public boolean isRunning() {
-            synchronized( this ) {
-                return !shutdown.get() && this.runner != null;
-            }
-        }
-
-        /**
          * Adds the given action to the processing queue returning true if the action
          * was correctly added or false otherwise.
          *
@@ -233,41 +120,8 @@
          * @return true if the node was successfully added to the queue. false otherwise.
          */
         public boolean enqueue( final Action action ) {
-            return this.queue.offer( action );
+            return this.queue.add( action );
         }
-
-        /**
-         * Cancels current execution and cleans up used resources
-         */
-        private void cancel() {
-            // if the blocking call was interrupted, then check for the cancelation flag
-            shutdown.set( true );
-            // cleaning up cache reference
-            synchronized( this ) {
-                this.runner = null;
-            }
-        }
-
-        /**
-         * Checks if the task is already running in a different thread. If it is not
-         * running yet, caches current thread reference.
-         *
-         * @return true if the task is already running in a different thread. false otherwise.
-         */
-        private boolean checkAndSetRunning() {
-            synchronized( this ) {
-                if( this.runner == null && !Thread.currentThread().isInterrupted() ) {
-                    // if it is not running yet, cache the thread reference
-                    this.runner = Thread.currentThread();
-                    this.shutdown.set( false );
-                } else {
-                    // there can be only one thread executing each instance of PartitionTask
-                    return true;
-                }
-            }
-            return false;
-        }
-
     }
 
     /**

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java	2008-10-02 18:36:04 UTC (rev 23272)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java	2008-10-02 19:37:50 UTC (rev 23273)
@@ -15,16 +15,15 @@
  */
 package org.drools.reteoo;
 
-import java.util.concurrent.TimeUnit;
-import java.io.ObjectOutput;
-import java.io.IOException;
-import java.io.ObjectInput;
-
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.drools.RuleBase;
 import org.drools.RuleBaseFactory;
 import org.drools.common.InternalWorkingMemory;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.lib.concurrent.DeterministicScheduler;
 
 /**
  * Test case for PartitionTaskManager
@@ -32,14 +31,14 @@
  * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
  */
 public class PartitionTaskManagerTest extends TestCase {
-    private MockAction action;
+    Mockery context = new Mockery();
     private PartitionTaskManager manager;
+    private InternalWorkingMemory workingMemory;
 
     @Override
     public void setUp() {
         RuleBase rulebase = RuleBaseFactory.newRuleBase();
-        InternalWorkingMemory workingMemory = (InternalWorkingMemory) rulebase.newStatefulSession();
-        action = new MockAction();
+        workingMemory = (InternalWorkingMemory) rulebase.newStatefulSession();
         manager = new PartitionTaskManager( workingMemory );
     }
 
@@ -48,61 +47,79 @@
 
     }
 
-    public void testStartStopService() throws InterruptedException {
-        assertFalse( manager.isRunning() );
-        manager.startService();
-        Thread.sleep( 1000 );
-        assertTrue( manager.isRunning() );
-        manager.stopService();
-        Thread.sleep( 1000 );
-        assertFalse( manager.isRunning() );
+    public void testEnqueueBeforeSettingExecutor() throws InterruptedException {
+        final PartitionTaskManager.Action action = context.mock( PartitionTaskManager.Action.class );
+        // set expectations for the scenario
+        context.checking( new Expectations() {{
+            oneOf( action ).execute( workingMemory );
+        }});
+
+        manager.enqueue( action );
+
+        // this is a jmock helper class that implements the ExecutorService interface
+        DeterministicScheduler pool = new DeterministicScheduler();
+        // set the pool
+        manager.setPool( pool );
+
+        // executes all pending actions using current thread
+        pool.runUntilIdle();
+
+        // check expectations
+        context.assertIsSatisfied();
     }
 
-    public void testNodeCallbacks() throws InterruptedException {
-        // should be possible to enqueue before starting the service,
-        // even if that should never happen
+    public void testFireCorrectly() throws InterruptedException {
+        // creates a mock action
+        final PartitionTaskManager.Action action = context.mock( PartitionTaskManager.Action.class );
+        
+        // this is a jmock helper class that implements the ExecutorService interface
+        DeterministicScheduler pool = new DeterministicScheduler();
+        // set the pool
+        manager.setPool( pool );
+        
+        // set expectations for the scenario
+        context.checking( new Expectations() {{
+            oneOf( action ).execute( workingMemory );
+        }});
+        
+        // fire scenario
         manager.enqueue( action );
-        manager.startService();
+        
+        // executes all pending actions using current thread
+        pool.runUntilIdle();
+        
+        // check expectations
+        context.assertIsSatisfied();
+    }
+
+    public void testActionCallbacks() throws InterruptedException {
+        // creates a mock action
+        final PartitionTaskManager.Action action = context.mock( PartitionTaskManager.Action.class );
+        // this is a jmock helper class that implements the ExecutorService interface
+        DeterministicScheduler pool = new DeterministicScheduler();
+        
+        // set expectations for the scenario
+        context.checking( new Expectations() {{
+            exactly(5).of( action ).execute( workingMemory );
+        }});
+        
+        // enqueue before pool
         manager.enqueue( action );
-        // give the engine some time
-        Thread.sleep( 1000 ); 
-        assertTrue( manager.stopService( 10, TimeUnit.SECONDS ) );
-        assertEquals( 2, action.getCallbackCounter() );
-        // should be possible to enqueue after the stop,
-        // but callback must not be executed
         manager.enqueue( action );
+
+        // set the pool
+        manager.setPool( pool );
+        
+        // enqueue after setting the pool
         manager.enqueue( action );
         manager.enqueue( action );
-        // making sure the service is not processing the nodes
-        Thread.sleep( 1000 );
-        assertEquals( 2, action.getCallbackCounter() );
-        // restarting service
-        manager.startService();
-        // making sure the service had time to process the nodes
-        Thread.sleep( 1000 );
-        assertTrue( manager.stopService( 10, TimeUnit.SECONDS ) );
-        assertEquals( 5, action.getCallbackCounter() );
+        manager.enqueue( action );
+        
+        // executes all pending actions using current thread
+        pool.runUntilIdle();
+        
+        // check expectations
+        context.assertIsSatisfied();
     }
 
-    public static class MockAction implements PartitionTaskManager.Action {
-        private volatile long callbackCounter = 0;
-
-        public synchronized long getCallbackCounter() {
-            return this.callbackCounter;
-        }
-
-        public void execute( InternalWorkingMemory workingMemory ) {
-            synchronized( this ) {
-                callbackCounter++;
-            }
-        }
-
-        public void writeExternal( ObjectOutput out ) throws IOException {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-    }
 }




More information about the jboss-svn-commits mailing list