[jboss-svn-commits] JBL Code SVN: r21843 - in labs/jbossrules/trunk: drools-compiler/src/test/java/org/drools/testframework and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sun Aug 24 12:01:02 EDT 2008


Author: tirelli
Date: 2008-08-24 12:00:53 -0400 (Sun, 24 Aug 2008)
New Revision: 21843

Added:
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractObjectSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeObjectSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java
Removed:
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
Modified:
   labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockLeftTupleSink.java
   labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockRightTupleSink.java
   labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/testframework/MockWorkingMemory.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/BaseNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/ConcurrentNodeMemories.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalRuleBase.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalWorkingMemory.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/NetworkNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AlphaNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CollectNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeObjectSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EntryPointNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EvalConditionNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ExistsNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/FromNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/JoinNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftInputAdapterNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/NotNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSinkPropagator.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSource.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectTypeNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PropagationQueuingNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/QueryTerminalNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Rete.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightInputAdapterNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RuleTerminalNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleObjectSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Sink.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/FromBuilder.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/BaseNodeTest.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/CompositeObjectSinkAdapterTest.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/FromNodeTest.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockLeftTupleSink.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSink.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSource.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockRightTupleSink.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockTupleSource.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/QueryTerminalNodeTest.java
Log:
Developing support for multi-thread concurrent partition propagation

Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockLeftTupleSink.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockLeftTupleSink.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockLeftTupleSink.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -26,6 +26,7 @@
 import org.drools.common.BaseNode;
 import org.drools.common.InternalWorkingMemory;
 import org.drools.common.NodeMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 public class MockLeftTupleSink extends LeftTupleSource
@@ -43,11 +44,11 @@
     private LeftTupleSinkNode     nextTupleSinkNode;
 
     public MockLeftTupleSink() {
-        super( 0 );
+        super( 0, RuleBasePartitionId.MAIN_PARTITION, false );
     }
 
     public MockLeftTupleSink(final int id) {
-        super( id );
+        super( id, RuleBasePartitionId.MAIN_PARTITION, false );
     }
 
     public void assertLeftTuple(final LeftTuple tuple,

Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockRightTupleSink.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockRightTupleSink.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockRightTupleSink.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -2,8 +2,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
 
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 public class MockRightTupleSink
@@ -25,6 +29,17 @@
 
     public int getId() {
         return 0;
-    }    
+    }
 
+    public RuleBasePartitionId getPartitionId() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void writeExternal( ObjectOutput out ) throws IOException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
 }

Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/testframework/MockWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/testframework/MockWorkingMemory.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/testframework/MockWorkingMemory.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -18,14 +18,7 @@
 import org.drools.ObjectFilter;
 import org.drools.QueryResults;
 import org.drools.RuleBase;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalRuleBase;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.ObjectStore;
-import org.drools.common.ObjectTypeConfigurationRegistry;
-import org.drools.common.TruthMaintenanceSystem;
-import org.drools.common.WorkingMemoryAction;
+import org.drools.common.*;
 import org.drools.concurrent.ExecutorService;
 import org.drools.event.AgendaEventListener;
 import org.drools.event.AgendaEventSupport;
@@ -41,6 +34,7 @@
 import org.drools.process.instance.timer.TimerManager;
 import org.drools.reteoo.LIANodePropagation;
 import org.drools.reteoo.ObjectTypeConf;
+import org.drools.reteoo.PartitionTaskManager;
 import org.drools.rule.EntryPoint;
 import org.drools.rule.Rule;
 import org.drools.rule.TimeMachine;
@@ -56,7 +50,7 @@
 import org.drools.util.ObjectHashMap;
 
 public class MockWorkingMemory implements InternalWorkingMemory {
-
+                
 	List<Object> facts = new ArrayList<Object>();
 	AgendaEventListener agendaEventListener;
 	TimeMachine timeMachine = new TimeMachine();
@@ -536,6 +530,10 @@
         return null;
     }
 
+    public PartitionTaskManager getPartitionManager( RuleBasePartitionId partitionId ) {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public SessionClock getSessionClock() {
         // TODO Auto-generated method stub
         return null;

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -815,6 +815,10 @@
         return p;
     }
 
+    public List<RuleBasePartitionId> getPartitionIds() {
+        return this.partitionIDs;
+    }
+
     public void addEventListener(final RuleBaseEventListener listener) {
         // since the event support is thread-safe, no need for locks... right?
         this.eventSupport.addEventListener( listener );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -68,12 +68,7 @@
 import org.drools.process.instance.WorkItemManager;
 import org.drools.process.instance.context.variable.VariableScopeInstance;
 import org.drools.process.instance.timer.TimerManager;
-import org.drools.reteoo.EntryPointNode;
-import org.drools.reteoo.InitialFactHandle;
-import org.drools.reteoo.InitialFactHandleDummyObject;
-import org.drools.reteoo.LIANodePropagation;
-import org.drools.reteoo.LeftTuple;
-import org.drools.reteoo.ObjectTypeConf;
+import org.drools.reteoo.*;
 import org.drools.rule.Declaration;
 import org.drools.rule.EntryPoint;
 import org.drools.rule.Rule;
@@ -187,6 +182,8 @@
     
     protected SessionConfiguration                      config;
 
+    protected Map<RuleBasePartitionId, PartitionTaskManager> partitionManagers;
+
     // ------------------------------------------------------------
     // Constructors
     // ------------------------------------------------------------
@@ -278,6 +275,8 @@
                               this );
 
         this.entryPoint = EntryPoint.DEFAULT;
+
+        initPartitionManagers();
         initTransient();
     }
 
@@ -291,6 +290,21 @@
         initTransient();
     }
 
+    /**
+     * Creates the actual partition managers and their tasks for multi-thread processing
+     */
+    private void initPartitionManagers() {
+        if( this.ruleBase.getConfiguration().isPartitionsEnabled() ) {
+
+            // the Map MUST be thread safe
+            this.partitionManagers = new ConcurrentHashMap<RuleBasePartitionId, PartitionTaskManager>();
+
+            for( RuleBasePartitionId partitionId : this.ruleBase.getPartitionIds() ) {
+                this.partitionManagers.put( partitionId, new PartitionTaskManager( this ) );
+            }
+        }
+    }
+
     private void initTransient() {
         this.entryPointNode = this.ruleBase.getRete().getEntryPointNode( this.entryPoint );
         this.typeConfReg = new ObjectTypeConfigurationRegistry( this.ruleBase );
@@ -1671,6 +1685,10 @@
         return (SessionClock) this.getTimerManager().getTimerService();
     }
 
+    public PartitionTaskManager getPartitionManager( final RuleBasePartitionId partitionId ) {
+        return partitionManagers.get( partitionId );
+    }
+
     //    public static class FactHandleInvalidation implements WorkingMemoryAction {
     //        private final InternalFactHandle handle;
     //        

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/BaseNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/BaseNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/BaseNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -36,6 +36,7 @@
 
     protected int id;
     protected RuleBasePartitionId partitionId;
+    protected boolean partitionsEnabled;
 
     public BaseNode() {
 
@@ -47,17 +48,23 @@
      * @param id
      *      The unique id
      */
-    public BaseNode(final int id) {
+    public BaseNode(final int id, final RuleBasePartitionId partitionId, final boolean partitionsEnabled ) {
         super();
         this.id = id;
+        this.partitionId = partitionId;
+        this.partitionsEnabled = partitionsEnabled;
     }
 
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
         id  = in.readInt();
+        partitionId = (RuleBasePartitionId) in.readObject();
+        partitionsEnabled = in.readBoolean();
     }
 
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeInt(id);
+        out.writeObject( partitionId );
+        out.writeBoolean( partitionsEnabled );
     }
 
     /* (non-Javadoc)

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/ConcurrentNodeMemories.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/ConcurrentNodeMemories.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/ConcurrentNodeMemories.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -60,6 +60,12 @@
 
     /**
      * @inheritDoc
+     *
+     * The implementation tries to delay locking as much as possible, by running
+     * some potentialy unsafe opperations out of the critical session. In case it
+     * fails the checks, it will move into the critical sessions and re-check everything
+     * before effectively doing any change on data structures. 
+     *
      * @see org.drools.common.NodeMemories#getNodeMemory(org.drools.common.NodeMemory)
      */
     public Object getNodeMemory( NodeMemory node ) {
@@ -75,6 +81,14 @@
         return memory;
     }
 
+
+    /**
+     * Checks if a memory does not exists for the given node and
+     * creates it.
+     * 
+     * @param node
+     * @return
+     */
     private Object createNodeMemory( NodeMemory node ) {
         try {
             this.lock.lock();

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalRuleBase.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalRuleBase.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -18,6 +18,7 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.List;
 
 import org.drools.FactException;
 import org.drools.FactHandle;
@@ -128,4 +129,10 @@
      * @return
      */
     public RuleBasePartitionId createNewPartitionId();
+
+    /**
+     * Return the list of Partition IDs for this rulebase
+     * @return
+     */
+    List<RuleBasePartitionId> getPartitionIds();
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalWorkingMemory.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalWorkingMemory.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -12,6 +12,7 @@
 import org.drools.process.instance.ProcessInstance;
 import org.drools.process.instance.ProcessInstanceManager;
 import org.drools.reteoo.LIANodePropagation;
+import org.drools.reteoo.PartitionTaskManager;
 import org.drools.rule.Rule;
 import org.drools.rule.TimeMachine;
 import org.drools.spi.Activation;
@@ -99,4 +100,14 @@
     public InternalFactHandle getInitialFactHandle();       
     
     public TimerService getTimerService();
+
+    /**
+     * Returns the PartitionTaskManager for the given partition ID
+     * in case the rulebase has partitions enabled
+     *
+     * @param partitionId the ID of the partition for which the task manager is assigned
+     *
+     * @return the PartitionTaskManager
+     */
+    public PartitionTaskManager getPartitionManager( RuleBasePartitionId partitionId );
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/NetworkNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/NetworkNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/NetworkNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -36,4 +36,12 @@
      *      unique int value
      */
     public int getId();
+
+    /**
+     * Returns the partition ID to which this node belongs to
+     *
+     * @return
+     */
+    public RuleBasePartitionId getPartitionId();
+
 }
\ No newline at end of file

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -30,7 +30,7 @@
 
     private final String                    id;
 
-    public RuleBasePartitionId(String id) {
+    public RuleBasePartitionId( final String id ) {
         this.id = id;
     }
 

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractObjectSinkAdapter.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractObjectSinkAdapter.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+
+import java.io.Externalizable;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+/**
+ * An abstract super class for ObjectSinks
+ *
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public abstract class AbstractObjectSinkAdapter implements ObjectSinkPropagator, Externalizable {
+    protected RuleBasePartitionId partitionId;
+
+    protected AbstractObjectSinkAdapter( RuleBasePartitionId partitionId ) {
+        this.partitionId = partitionId;
+    }
+
+    public void writeExternal( ObjectOutput out ) throws IOException {
+        out.writeObject( this.partitionId );
+    }
+
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+        this.partitionId = (RuleBasePartitionId) in.readObject();
+    }
+
+    /**
+     * Returns the partition to which this propagator belongs to
+     *
+     * @return the ID of the partition
+     */
+    public RuleBasePartitionId getPartitionId() {
+        return this.partitionId;  
+    }
+
+    /**
+     * Sets the partition to which this propagator belongs to
+     * 
+     * @param partitionId
+     */
+    public void setPartitionId( RuleBasePartitionId partitionId ) {
+        this.partitionId = partitionId;
+    }
+}

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -70,6 +70,8 @@
                           final boolean unwrapRightObject,
                           final BuildContext context) {
         super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled(),
                leftInput,
                rightInput,
                sourceBinder,

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AlphaNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AlphaNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AlphaNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -22,11 +22,7 @@
 
 import org.drools.FactException;
 import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
 import org.drools.reteoo.builder.BuildContext;
 import org.drools.rule.ContextEntry;
 import org.drools.spi.AlphaNodeFieldConstraint;
@@ -76,13 +72,14 @@
      * @param id Node's ID
      * @param constraint Node's constraints
      * @param objectSource Node's object source
-     * @param hasMemory true if node shall be configured with local memory. False otherwise.
      */
     public AlphaNode(final int id,
                      final AlphaNodeFieldConstraint constraint,
                      final ObjectSource objectSource,
                      final BuildContext context) {
         super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled(),
                objectSource,
                context.getRuleBase().getConfiguration().getAlphaNodeHashingThreshold() );
         this.constraint = constraint;
@@ -316,5 +313,19 @@
         public int getId() {
             return 0;
         }
+
+        public RuleBasePartitionId getPartitionId() {
+            return this.sink.getPartitionId();
+        }
+
+        public void writeExternal( ObjectOutput out ) throws IOException {
+            // this is a short living adapter class, so no need for serialization
+        }
+
+        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+            // this is a short living adapter class, so no need for serialization
+        }
+
+
     }
 }

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeObjectSinkAdapter.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeObjectSinkAdapter.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+/**
+ * This is an asynchronous implementation of the CompositeObjectSinkAdapter that
+ * is used to propagate facts both between nodes in the same or in different
+ * partitions when partitions are enabled in the rulebase
+ *
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class AsyncCompositeObjectSinkAdapter extends CompositeObjectSinkAdapter {
+
+    public AsyncCompositeObjectSinkAdapter( RuleBasePartitionId partitionId, int alphaNodeHashingThreshold ) {
+        super(partitionId, alphaNodeHashingThreshold );
+    }
+
+    protected void doPropagateAssertObject( InternalFactHandle factHandle, PropagationContext context,
+                                            InternalWorkingMemory workingMemory, ObjectSink sink ) {
+        // composite propagators need to check each node to decide if the propagation
+        // must be asynchronous or may eventually be synchronous
+        if( this.partitionId.equals( sink.getPartitionId() )) {
+            // same partition, so synchronous propagation is fine
+            sink.assertObject( factHandle, context, workingMemory );
+        } else {
+            // different partition, so use asynchronous propagation
+            PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+            manager.enqueue( new PartitionTaskManager.FactAssertAction(factHandle, context, sink ) );
+        }
+    }
+}

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+/**
+ * This is an asynchronous implementation of the SingleObjectSinkAdapter
+ * that is used to propagate facts between different partitions in the
+ * rulebase
+ *
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class AsyncSingleObjectSinkAdapter extends SingleObjectSinkAdapter {
+    public AsyncSingleObjectSinkAdapter( RuleBasePartitionId partitionId, ObjectSink objectSink ) {
+        super( partitionId, objectSink );
+    }
+
+    public void propagateAssertObject( InternalFactHandle factHandle, PropagationContext context, InternalWorkingMemory workingMemory ) {
+        PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+        manager.enqueue( new PartitionTaskManager.FactAssertAction(factHandle, context, this.sink ) );
+    }
+}

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -23,11 +23,7 @@
 import java.util.List;
 
 import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.BetaConstraints;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
 import org.drools.reteoo.AccumulateNode.AccumulateMemory;
 import org.drools.reteoo.CollectNode.CollectMemory;
 import org.drools.rule.Behavior;
@@ -96,11 +92,15 @@
      *            The right input <code>ObjectSource</code>.
      */
     BetaNode(final int id,
+             final RuleBasePartitionId partitionId,
+             final boolean partitionsEnabled,
              final LeftTupleSource leftInput,
              final ObjectSource rightInput,
              final BetaConstraints constraints,
              final Behavior[] behaviors) {
-        super( id );
+        super( id,
+               partitionId,
+               partitionsEnabled );
         this.leftInput = leftInput;
         this.rightInput = rightInput;
         this.constraints = constraints;

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CollectNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CollectNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CollectNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -86,6 +86,8 @@
                        final boolean unwrapRight,
                        final BuildContext context) {
         super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled(),
                leftInput,
                rightInput,
                sourceBinder,

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeObjectSinkAdapter.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeObjectSinkAdapter.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -12,6 +12,7 @@
 import org.drools.common.BaseNode;
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.rule.LiteralConstraint;
 import org.drools.spi.AlphaNodeFieldConstraint;
 import org.drools.spi.Evaluator;
@@ -25,9 +26,7 @@
 import org.drools.util.ObjectHashMap;
 import org.drools.util.ObjectHashMap.ObjectEntry;
 
-public class CompositeObjectSinkAdapter
-    implements
-    ObjectSinkPropagator {
+public class CompositeObjectSinkAdapter extends AbstractObjectSinkAdapter {
 
     //    /** You can override this property via a system property (eg -Ddrools.hashThreshold=4) */
     //    public static final String HASH_THRESHOLD_SYSTEM_PROPERTY = "drools.hashThreshold";
@@ -47,15 +46,17 @@
     private int               alphaNodeHashingThreshold;
 
     public CompositeObjectSinkAdapter() {
-        this( 3 );
+        this( null, 3 );
     }
 
-    public CompositeObjectSinkAdapter(final int alphaNodeHashingThreshold) {
+    public CompositeObjectSinkAdapter(final RuleBasePartitionId partitionId, final int alphaNodeHashingThreshold) {
+        super( partitionId );
         this.alphaNodeHashingThreshold = alphaNodeHashingThreshold;
     }
 
     public void readExternal(ObjectInput in) throws IOException,
                                             ClassNotFoundException {
+        super.readExternal( in );
         otherSinks = (ObjectSinkNodeList) in.readObject();
         hashableSinks = (ObjectSinkNodeList) in.readObject();
         hashedFieldIndexes = (LinkedList) in.readObject();
@@ -64,13 +65,14 @@
     }
 
     public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal( out );
         out.writeObject( otherSinks );
         out.writeObject( hashableSinks );
         out.writeObject( hashedFieldIndexes );
         out.writeObject( hashedSinkMap );
         out.writeInt( alphaNodeHashingThreshold );
     }
-    
+
     public ObjectSinkNodeList getOthers() {
         return this.otherSinks;        
     }
@@ -329,9 +331,7 @@
                 final ObjectSink sink = (ObjectSink) this.hashedSinkMap.get( hashKey );
                 if ( sink != null ) {
                     // The sink exists so propagate
-                    sink.assertObject( factHandle,
-                                       context,
-                                       workingMemory );
+                    doPropagateAssertObject( factHandle, context, workingMemory, sink );
                 }
             }
         }
@@ -339,23 +339,35 @@
         // propagate unhashed
         if ( this.hashableSinks != null ) {
             for ( ObjectSinkNode sink = this.hashableSinks.getFirst(); sink != null; sink = sink.getNextObjectSinkNode() ) {
-                sink.assertObject( factHandle,
-                                   context,
-                                   workingMemory );
+                doPropagateAssertObject( factHandle, context, workingMemory, sink );
             }
         }
 
         if ( this.otherSinks != null ) {
             // propagate others
             for ( ObjectSinkNode sink = this.otherSinks.getFirst(); sink != null; sink = sink.getNextObjectSinkNode() ) {
-                sink.assertObject( factHandle,
-                                   context,
-                                   workingMemory );
+                doPropagateAssertObject( factHandle, context, workingMemory, sink );
             }
         }
 
     }
 
+    /**
+     * This is a Hook method for subclasses to override. Please keep it protected unless you know
+     * what you are doing.
+     * 
+     * @param factHandle
+     * @param context
+     * @param workingMemory
+     * @param sink
+     */
+    protected void doPropagateAssertObject( InternalFactHandle factHandle, PropagationContext context,
+                                            InternalWorkingMemory workingMemory, ObjectSink sink ) {
+        sink.assertObject( factHandle,
+                           context,
+                           workingMemory );
+    }
+
     public BaseNode getMatchingNode(BaseNode candidate) {
         if ( this.otherSinks != null ) {
             for ( ObjectSinkNode sink = this.otherSinks.getFirst(); sink != null; sink = sink.getNextObjectSinkNode() ) {

Deleted: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -1,64 +0,0 @@
-package org.drools.reteoo;
-
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.BaseNode;
-import org.drools.spi.PropagationContext;
-
-import java.io.ObjectOutput;
-import java.io.IOException;
-import java.io.ObjectInput;
-
-public class EmptyObjectSinkAdapter
-    implements
-    ObjectSinkPropagator {
-
-    private static final long                   serialVersionUID = -631743913176779720L;
-
-    private static final EmptyObjectSinkAdapter instance         = new EmptyObjectSinkAdapter();
-
-    private static final ObjectSink[]           SINK_LIST        = new ObjectSink[0];
-
-    public static EmptyObjectSinkAdapter getInstance() {
-        return instance;
-    }
-
-    public EmptyObjectSinkAdapter() {
-    }
-
-    public void readExternal(ObjectInput in) throws IOException,
-                                            ClassNotFoundException {
-    }
-
-    public void writeExternal(ObjectOutput out) throws IOException {
-    }
-
-    public void propagateAssertObject(final InternalFactHandle factHandle,
-                                      final PropagationContext context,
-                                      final InternalWorkingMemory workingMemory) {
-
-    }
-
-    public void propagateRetractObject(final InternalFactHandle handle,
-                                       final PropagationContext context,
-                                       final InternalWorkingMemory workingMemory,
-                                       final boolean useHash) {
-    }
-
-    public BaseNode getMatchingNode(BaseNode candidate) {
-        return null;
-    }
-
-    public ObjectSink[] getSinks() {
-        return SINK_LIST;
-    }
-
-    public int size() {
-        return 0;
-    }
-
-    public boolean equals(Object obj) {
-        return obj instanceof EmptyObjectSinkAdapter;
-    }
-
-}

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,74 @@
+package org.drools.reteoo;
+
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
+import org.drools.spi.PropagationContext;
+
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+public class EmptyObjectSinkAdapter extends AbstractObjectSinkAdapter {
+
+    private static final long                   serialVersionUID = -631743913176779720L;
+
+    private static final EmptyObjectSinkAdapter INSTANCE = new EmptyObjectSinkAdapter();
+
+    private static final ObjectSink[]           SINK_LIST        = new ObjectSink[0];
+
+    public static EmptyObjectSinkAdapter getInstance() {
+        return INSTANCE;
+    }
+
+    public EmptyObjectSinkAdapter() {
+        super( null );
+    }
+
+    public EmptyObjectSinkAdapter( RuleBasePartitionId partitionId ) {
+        super( partitionId );
+    }
+
+    public void readExternal(ObjectInput in) throws IOException,
+                                            ClassNotFoundException {
+        super.readExternal( in );
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal( out );
+    }
+
+    public RuleBasePartitionId getPartitionId() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void propagateAssertObject(final InternalFactHandle factHandle,
+                                      final PropagationContext context,
+                                      final InternalWorkingMemory workingMemory) {
+
+    }
+
+    public void propagateRetractObject(final InternalFactHandle handle,
+                                       final PropagationContext context,
+                                       final InternalWorkingMemory workingMemory,
+                                       final boolean useHash) {
+    }
+
+    public BaseNode getMatchingNode(BaseNode candidate) {
+        return null;
+    }
+
+    public ObjectSink[] getSinks() {
+        return SINK_LIST;
+    }
+
+    public int size() {
+        return 0;
+    }
+
+    public boolean equals(Object obj) {
+        return obj instanceof EmptyObjectSinkAdapter;
+    }
+
+}

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EntryPointNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EntryPointNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EntryPointNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -27,12 +27,7 @@
 
 import org.drools.WorkingMemoryEntryPoint;
 import org.drools.base.ShadowProxy;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.InternalWorkingMemoryEntryPoint;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
 import org.drools.reteoo.builder.BuildContext;
 import org.drools.rule.EntryPoint;
 import org.drools.spi.ObjectType;
@@ -87,14 +82,20 @@
                           final ObjectSource objectSource,
                           final BuildContext context) {
         this( id,
+              context.getPartitionId(),
+              context.getRuleBase().getConfiguration().isPartitionsEnabled(),
               objectSource,
               context.getCurrentEntryPoint() ); // irrelevant for this node, since it overrides sink management
     }
 
     public EntryPointNode(final int id,
+                          final RuleBasePartitionId partitionId,
+                          final boolean partitionsEnabled,
                           final ObjectSource objectSource,
                           final EntryPoint entryPoint) {
         super( id,
+               partitionId,
+               partitionsEnabled,
                objectSource,
                999 ); // irrelevant for this node, since it overrides sink management
         this.entryPoint = entryPoint;

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EvalConditionNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EvalConditionNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EvalConditionNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -91,7 +91,9 @@
                              final LeftTupleSource tupleSource,
                              final EvalCondition eval,
                              final BuildContext context) {
-        super( id );
+        super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled() );
         this.condition = eval;
         this.tupleSource = tupleSource;
         this.tupleMemoryEnabled = context.isTupleMemoryEnabled();

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ExistsNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ExistsNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ExistsNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -71,6 +71,8 @@
                       final Behavior[] behaviors,
                       final BuildContext context) {
         super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled(),
                leftInput,
                rightInput,
                joinNodeBinder,

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/FromNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/FromNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/FromNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -6,6 +6,7 @@
 import java.io.Serializable;
 
 import org.drools.RuleBaseConfiguration;
+import org.drools.reteoo.builder.BuildContext;
 import org.drools.common.BaseNode;
 import org.drools.common.BetaConstraints;
 import org.drools.common.EmptyBetaConstraints;
@@ -48,8 +49,11 @@
                     final LeftTupleSource tupleSource,
                     final AlphaNodeFieldConstraint[] constraints,
                     final BetaConstraints binder,
-                    final boolean tupleMemoryEnabled) {
-        super( id );
+                    final boolean tupleMemoryEnabled,
+                    final BuildContext context ) {
+        super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled() );
         this.dataProvider = dataProvider;
         this.tupleSource = tupleSource;
         this.alphaConstraints = constraints;

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/JoinNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/JoinNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/JoinNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -70,6 +70,8 @@
                     final Behavior[] behaviors,
                     final BuildContext context) {
         super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled(),
                leftInput,
                rightInput,
                binder,

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftInputAdapterNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftInputAdapterNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftInputAdapterNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -17,11 +17,7 @@
  */
 
 import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
 import org.drools.reteoo.builder.BuildContext;
 import org.drools.spi.PropagationContext;
 import org.drools.util.FactEntry;
@@ -70,14 +66,13 @@
      *      The unique id of this node in the current Rete network
      * @param source
      *      The parent node, where Facts are propagated from
-     * @param binder
-     *      An optional binder to filter out propagations. This binder will exist when
-     *      a predicate is used in the first pattern, for instance
      */
     public LeftInputAdapterNode(final int id,
                                 final ObjectSource source,
                                 final BuildContext context) {
-        super( id );
+        super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled() );
         this.objectSource = source;
         this.leftTupleMemoryEnabled = context.isTupleMemoryEnabled();
     }
@@ -273,6 +268,20 @@
         public int getId() {
             return 0;
         }
+
+        public RuleBasePartitionId getPartitionId() {
+            return sink.getPartitionId();
+        }
+
+        public void writeExternal( ObjectOutput out ) throws IOException {
+            // this is a short living adapter class used only during an update operation, and
+            // as so, no need for serialization code
+        }
+
+        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+            // this is a short living adapter class used only during an update operation, and
+            // as so, no need for serialization code
+        }
     }
 
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -24,6 +24,7 @@
 
 import org.drools.common.BaseNode;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 /**
@@ -61,8 +62,8 @@
      *
      * @param id
      */
-    LeftTupleSource(final int id) {
-        super( id );
+    LeftTupleSource(final int id, final RuleBasePartitionId partitionId, final boolean partitionsEnabled ) {
+        super( id, partitionId, partitionsEnabled );
         this.sink = EmptyLeftTupleSinkAdapter.getInstance();
     }
 

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/NotNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/NotNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/NotNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -69,6 +69,8 @@
                    final Behavior[] behaviors,
                    final BuildContext context) {
         super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled(),
                leftInput,
                rightInput,
                joinNodeBinder,

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSinkPropagator.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSinkPropagator.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSinkPropagator.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -5,11 +5,15 @@
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalWorkingMemory;
 import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 public interface ObjectSinkPropagator
     extends
     Externalizable {
+
+    public RuleBasePartitionId getPartitionId();
+    
     public void propagateAssertObject(InternalFactHandle factHandle,
                                       PropagationContext context,
                                       InternalWorkingMemory workingMemory);

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSource.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSource.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-import java.io.Serializable;
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
@@ -25,6 +24,7 @@
 import org.drools.common.BaseNode;
 import org.drools.common.DefaultFactHandle;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 /**
@@ -66,8 +66,12 @@
      *
      * @param id
      */
-    ObjectSource(final int id) {
+    ObjectSource(final int id,
+                 final RuleBasePartitionId partitionId,
+                 final boolean partitionsEnabled) {
         this( id,
+              partitionId,
+              partitionsEnabled,
               null,
               3 );
     }
@@ -78,9 +82,11 @@
      * @param id
      */
     ObjectSource(final int id,
+                 final RuleBasePartitionId partitionId,
+                 final boolean partitionsEnabled,
                  final ObjectSource objectSource,
                  final int alphaNodeHashingThreshold) {
-        super( id );
+        super( id, partitionId, partitionsEnabled );
         this.source = objectSource;
         this.alphaNodeHashingThreshold = alphaNodeHashingThreshold;
         this.sink = EmptyObjectSinkAdapter.getInstance();
@@ -115,9 +121,25 @@
      */
     protected void addObjectSink(final ObjectSink objectSink) {
         if ( this.sink instanceof EmptyObjectSinkAdapter ) {
-            this.sink = new SingleObjectSinkAdapter( objectSink );
+            if( this.partitionsEnabled && ! this.getPartitionId().equals( objectSink.getPartitionId() ) ) {
+                // if partitions are enabled and the next node belongs to a different partition,
+                // we need to use the asynchronous propagator
+                this.sink = new AsyncSingleObjectSinkAdapter( this.getPartitionId(), objectSink );
+            } else {
+                // otherwise, we use the lighter synchronous propagator
+                this.sink = new SingleObjectSinkAdapter( this.getPartitionId(), objectSink );
+            }
         } else if ( this.sink instanceof SingleObjectSinkAdapter ) {
-            final CompositeObjectSinkAdapter sinkAdapter = new CompositeObjectSinkAdapter( this.alphaNodeHashingThreshold );
+            final CompositeObjectSinkAdapter sinkAdapter;
+            if( this.partitionsEnabled ) {
+                // a composite propagator may propagate to both nodes in the same partition
+                // as well as in a different partition, so, if partitions are enabled, we
+                // must use the asynchronous version
+                sinkAdapter = new AsyncCompositeObjectSinkAdapter( this.getPartitionId(), this.alphaNodeHashingThreshold );
+            } else {
+                // if partitions are disabled, then it is safe to use the lighter synchronous propagator
+                sinkAdapter = new CompositeObjectSinkAdapter( this.getPartitionId(), this.alphaNodeHashingThreshold );
+            }
             sinkAdapter.addObjectSink( this.sink.getSinks()[0] );
             sinkAdapter.addObjectSink( objectSink );
             this.sink = sinkAdapter;
@@ -143,7 +165,14 @@
             final CompositeObjectSinkAdapter sinkAdapter = (CompositeObjectSinkAdapter) this.sink;
             sinkAdapter.removeObjectSink( objectSink );
             if ( sinkAdapter.size() == 1 ) {
-                this.sink = new SingleObjectSinkAdapter( sinkAdapter.getSinks()[0] );
+                if( this.partitionsEnabled && ! this.getPartitionId().equals( sinkAdapter.getSinks()[0].getPartitionId() ) ) {
+                    // if partitions are enabled and the next node belongs to a different partition,
+                    // we need to use the asynchronous propagator
+                    this.sink = new AsyncSingleObjectSinkAdapter( this.getPartitionId(), sinkAdapter.getSinks()[0] );
+                } else {
+                    // otherwise, we use the lighter synchronous propagator
+                    this.sink = new SingleObjectSinkAdapter( this.getPartitionId(), sinkAdapter.getSinks()[0] );
+                }
             }
         }
     }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectTypeNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectTypeNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectTypeNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -16,21 +16,14 @@
  * limitations under the License.
  */
 
-import java.io.Serializable;
 import java.io.Externalizable;
-import java.io.ObjectOutput;
 import java.io.IOException;
 import java.io.ObjectInput;
+import java.io.ObjectOutput;
 
 import org.drools.RuleBaseConfiguration;
 import org.drools.base.ClassObjectType;
-import org.drools.common.AbstractRuleBase;
-import org.drools.common.BaseNode;
-import org.drools.common.DroolsObjectInputStream;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
 import org.drools.reteoo.builder.BuildContext;
 import org.drools.rule.Declaration;
 import org.drools.rule.EntryPoint;
@@ -38,8 +31,6 @@
 import org.drools.spi.Constraint;
 import org.drools.spi.ObjectType;
 import org.drools.spi.PropagationContext;
-import org.drools.util.FactEntry;
-import org.drools.util.RightTupleList;
 import org.drools.util.Iterator;
 import org.drools.util.ObjectHashSet;
 import org.drools.util.ObjectHashSet.ObjectEntry;
@@ -104,6 +95,8 @@
                           final ObjectType objectType,
                           final BuildContext context) {
         super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled(),
                source,
                context.getRuleBase().getConfiguration().getAlphaNodeHashingThreshold() );
         this.objectType = objectType;

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.drools.reteoo;
+
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A class to control the worker thread for each rulebase partition.
+ * It contains an internal Single Thread Pool that ensures thread
+ * respawn and a task that ensures no more than a single thread is
+ * executing it concurrently.
+ *
+ * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class PartitionTaskManager {
+
+    private ExecutorService pool = null;
+    private PartitionTask task = null;
+
+    public PartitionTaskManager( final InternalWorkingMemory workingMemory ) {
+        this.task = new PartitionTask( workingMemory );
+    }
+
+    /**
+     * Starts the service
+     */
+    public synchronized void startService() {
+        if( !isRunning() ) {
+            // I'm not sure we should create and destroy the pool every service start/stop,
+            // but for now, lets do that. Later we can reevaluate if that is needed.
+            this.pool = Executors.newSingleThreadExecutor();
+            this.pool.execute( this.task );
+        }
+    }
+
+    /**
+     * Nicely requests the service to stop. This method will not wait
+     * for the service to finish.
+     */
+    public synchronized boolean stopService() {
+        boolean result = true;
+        if( isRunning() ) {
+            this.task.shutdown();
+            // I'm not sure we should create and destroy the pool every service start/stop,
+            // but for now, lets do that. Later we can reevaluate if that is needed.
+            this.pool.shutdown();
+            this.pool = null;
+        }
+        return result;
+    }
+
+    /**
+     * Nicely requests the service to stop. This method will wait up to
+     * the given timeout for the service to finish and will return.
+     *
+     * @return true in case the services finished, false otherwise
+     */
+    public synchronized boolean stopService( final long timeout, final TimeUnit unit ) {
+        boolean result = true;
+        if( isRunning() ) {
+            this.task.shutdown();
+            // I'm not sure we should create and destroy the pool every service start/stop,
+            // but for now, lets do that. Later we can reevaluate if that is needed.
+            this.pool.shutdown();
+            try {
+                result = this.pool.awaitTermination( timeout, unit );
+            } catch( InterruptedException e ) {
+                result = false;
+            }
+            this.pool = null;
+        }
+        return result;
+    }
+
+    /**
+     * Nicely requests the service to stop. This method will wait until
+     * the service finishes or an InterruptedException is generated
+     * and will return.
+     *
+     * @return true in case the services finished, false otherwise
+     */
+    public synchronized boolean stopServiceAndWait() {
+        boolean result = true;
+        if( isRunning() ) {
+            this.task.shutdown();
+            // I'm not sure we should create and destroy the pool every service start/stop,
+            // but for now, lets do that. Later we can reevaluate if that is needed.
+            this.pool.shutdown();
+            try {
+                while( !this.pool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
+                    ;
+                }
+                result = this.pool.isTerminated();
+            } catch( InterruptedException e ) {
+                result = false;
+            }
+            this.pool = null;
+        }
+        return result;
+    }
+
+    /**
+     * Checks if the task is running.
+     *
+     * @return true if the task is running. false otherwise.
+     */
+    public synchronized boolean isRunning() {
+        return pool != null && !pool.isTerminated();
+    }
+
+    /**
+     * Adds the given action to the processing queue
+     *
+     * @param action the action to be processed
+     * @return true if the action was successfully added to the processing queue. false otherwise.
+     */
+    public boolean enqueue( final Action action ) {
+        return this.task.enqueue( action );
+    }
+
+    /**
+     * A worker task that keeps processing the nodes queue.
+     * The task uses a blocking queue and keeps processing
+     * nodes while there are nodes in the queue and it is not
+     * shutdown. If the queue is emptied, the class will wait
+     * until a new node is added.
+     */
+    public static class PartitionTask implements Runnable {
+
+        // the queue with the nodes that need to be processed
+        private BlockingQueue<Action> queue;
+
+        // the working memory reference
+        private InternalWorkingMemory workingMemory;
+
+        // a flag to nicely shutdown the thread
+        private volatile AtomicBoolean shutdown;
+
+        // the actual thread that is running
+        private Thread runner;
+
+
+        /**
+         * Constructor
+         *
+         * @param workingMemory the working memory reference that is used for node processing
+         */
+        public PartitionTask( final InternalWorkingMemory workingMemory ) {
+            this.queue = new LinkedBlockingQueue<Action>();
+            this.shutdown = new AtomicBoolean( false );
+            this.workingMemory = workingMemory;
+            this.runner = null;
+        }
+
+        /**
+         * Default execution method.
+         *
+         * @see Runnable
+         */
+        public void run() {
+            // this task can not be shared among multiple threads
+            if( checkAndSetRunning() ) {
+                return;
+            }
+
+            while( !shutdown.get() ) {
+                try {
+                    // this is a blocking call
+                    if( Thread.currentThread().isInterrupted() ) {
+                        cancel();
+                        break;
+                    }
+                    Action action = queue.take();
+                    action.execute( workingMemory );
+
+                } catch( InterruptedException e ) {
+                    cancel();
+                }
+            }
+        }
+
+        /**
+         * Requests this task to shutdown
+         */
+        public void shutdown() {
+            synchronized( this ) {
+                if( this.runner != null ) {
+                    this.runner.interrupt();
+                }
+            }
+            this.cancel();
+        }
+
+        /**
+         * Returns true if this task is currently executing
+         *
+         * @return true if the task is currently executing
+         */
+        public boolean isRunning() {
+            synchronized( this ) {
+                return !shutdown.get() && this.runner != null;
+            }
+        }
+
+        /**
+         * Adds the given action to the processing queue returning true if the action
+         * was correctly added or false otherwise.
+         *
+         * @param action the action to add to the processing queue
+         * @return true if the node was successfully added to the queue. false otherwise.
+         */
+        public boolean enqueue( final Action action ) {
+            return this.queue.offer( action );
+        }
+
+        /**
+         * Cancels current execution and cleans up used resources
+         */
+        private void cancel() {
+            // if the blocking call was interrupted, then check for the cancelation flag
+            shutdown.set( true );
+            // cleaning up cache reference
+            synchronized( this ) {
+                this.runner = null;
+            }
+        }
+
+        /**
+         * Checks if the task is already running in a different thread. If it is not
+         * running yet, caches current thread reference.
+         *
+         * @return true if the task is already running in a different thread. false otherwise.
+         */
+        private boolean checkAndSetRunning() {
+            synchronized( this ) {
+                if( this.runner == null && !Thread.currentThread().isInterrupted() ) {
+                    // if it is not running yet, cache the thread reference
+                    this.runner = Thread.currentThread();
+                    this.shutdown.set( false );
+                } else {
+                    // there can be only one thread executing each instance of PartitionTask
+                    return true;
+                }
+            }
+            return false;
+        }
+
+    }
+
+    /**
+     * An interface for all actions to be executed by the PartitionTask
+     */
+    public static interface Action extends Externalizable {
+        public abstract void execute( final InternalWorkingMemory workingMemory );
+    }
+
+    /**
+     * An abstract super class for all handle-related actions
+     */
+    public static abstract class FactAction implements Action, Externalizable {
+
+        protected InternalFactHandle handle;
+        protected PropagationContext context;
+        protected ObjectSink         sink;
+
+        public FactAction() {
+        }
+
+        public FactAction( final InternalFactHandle handle, final PropagationContext context,
+                           final ObjectSink sink ) {
+            super();
+            this.handle = handle;
+            this.context = context;
+            this.sink = sink;
+        }
+
+        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+            handle = (InternalFactHandle) in.readObject();
+            context = (PropagationContext) in.readObject();
+            sink = (ObjectSink) in.readObject();
+        }
+
+        public void writeExternal( ObjectOutput out ) throws IOException {
+            out.writeObject( handle );
+            out.writeObject( context );
+            out.writeObject( sink );
+        }
+
+        public abstract void execute( final InternalWorkingMemory workingMemory );
+    }
+
+    static class FactAssertAction extends FactAction {
+        private static final long serialVersionUID = -8478488926430845209L;
+
+        FactAssertAction() {
+        }
+
+        public FactAssertAction( final InternalFactHandle handle, final PropagationContext context,
+                                 final ObjectSink sink ) {
+            super( handle, context, sink );
+        }
+
+        public void execute( final InternalWorkingMemory workingMemory ) {
+            sink.assertObject( this.handle, this.context, workingMemory );
+        }
+    }
+
+}

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PropagationQueuingNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PropagationQueuingNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PropagationQueuingNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -15,46 +15,38 @@
  */
 package org.drools.reteoo;
 
-import java.io.ObjectOutput;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.Externalizable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.drools.RuleBaseConfiguration;
 import org.drools.RuntimeDroolsException;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.WorkingMemoryAction;
+import org.drools.common.*;
 import org.drools.marshalling.MarshallerReaderContext;
 import org.drools.marshalling.MarshallerWriteContext;
 import org.drools.reteoo.builder.BuildContext;
 import org.drools.spi.PropagationContext;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * A node that will add the propagation to the working memory actions queue,
  * in order to allow multiple threads to concurrently assert objects to multiple
  * entry points.
  *
  * @author etirelli
- *
  */
-public class PropagationQueuingNode extends ObjectSource
-    implements
-    ObjectSinkNode,
-    NodeMemory {
+public class PropagationQueuingNode extends ObjectSource implements ObjectSinkNode, NodeMemory {
 
-    private static final long   serialVersionUID        = -615639068150958767L;
+    private static final long serialVersionUID = -615639068150958767L;
 
     // should we make this one configurable?
-    private static final int    PROPAGATION_SLICE_LIMIT = 1000;
+    private static final int PROPAGATION_SLICE_LIMIT = 1000;
 
     private ObjectSinkNode previousObjectSinkNode;
     private ObjectSinkNode nextObjectSinkNode;
-    private PropagateAction     action;
+    private PropagateAction action;
 
     public PropagationQueuingNode() {
     }
@@ -64,29 +56,29 @@
      * propagations until it the engine reaches a safe propagation point,
      * when all the queued facts are propagated.
      *
-     * @param id Node's ID
-     * @param constraint Node's constraints
+     * @param id           Node's ID
      * @param objectSource Node's object source
-     * @param hasMemory true if node shall be configured with local memory. False otherwise.
+     * @param context
      */
-    public PropagationQueuingNode(final int id,
-                                  final ObjectSource objectSource,
-                                  final BuildContext context) {
+    public PropagationQueuingNode( final int id,
+                                   final ObjectSource objectSource,
+                                   final BuildContext context ) {
         super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled(),
                objectSource,
                context.getRuleBase().getConfiguration().getAlphaNodeHashingThreshold() );
         this.action = new PropagateAction( this );
     }
 
-    public void readExternal(ObjectInput in) throws IOException,
-                                            ClassNotFoundException {
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
         super.readExternal( in );
         previousObjectSinkNode = (ObjectSinkNode) in.readObject();
         nextObjectSinkNode = (ObjectSinkNode) in.readObject();
         action = (PropagateAction) in.readObject();
     }
 
-    public void writeExternal(ObjectOutput out) throws IOException {
+    public void writeExternal( ObjectOutput out ) throws IOException {
         super.writeExternal( out );
         out.writeObject( previousObjectSinkNode );
         out.writeObject( nextObjectSinkNode );
@@ -97,21 +89,20 @@
      * @see org.drools.reteoo.ObjectSource#updateSink(org.drools.reteoo.ObjectSink, org.drools.spi.PropagationContext, org.drools.common.InternalWorkingMemory)
      */
     @Override
-    public void updateSink(ObjectSink sink,
-                           PropagationContext context,
-                           InternalWorkingMemory workingMemory) {
+    public void updateSink( ObjectSink sink, PropagationContext context, InternalWorkingMemory workingMemory ) {
 
-        final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory.getNodeMemory( this );
+        final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory
+                .getNodeMemory( this );
 
         // this is just sanity code. We may remove it in the future, but keeping it for now.
-        if ( !memory.isEmpty() ) {
-            throw new RuntimeDroolsException( "Error updating sink. Not safe to update sink as the PropagatingQueueingNode memory is not for node: " + this.toString() );
+        if( !memory.isEmpty() ) {
+            throw new RuntimeDroolsException(
+                    "Error updating sink. Not safe to update sink as the PropagatingQueueingNode memory is not for node: " + this
+                            .toString() );
         }
 
         // as this node is simply a queue, ask object source to update the child sink directly
-        this.source.updateSink( sink,
-                                context,
-                                workingMemory );
+        this.source.updateSink( sink, context, workingMemory );
     }
 
     /**
@@ -126,31 +117,23 @@
      * @see org.drools.common.BaseNode#attach(org.drools.common.InternalWorkingMemory[])
      */
     @Override
-    public void attach(InternalWorkingMemory[] workingMemories) {
+    public void attach( InternalWorkingMemory[] workingMemories ) {
         attach();
         // this node does not require update, so nothing else to do.
     }
 
-    /**
-     * @see org.drools.common.BaseNode#remove(ReteooBuilder, org.drools.common.BaseNode, org.drools.common.InternalWorkingMemory[])
-     */
     @Override
-    protected void doRemove(final RuleRemovalContext context,
-                            final ReteooBuilder builder,
-                            final BaseNode node,
-                            final InternalWorkingMemory[] workingMemories) {
-        if ( !node.isInUse() ) {
+    protected void doRemove( final RuleRemovalContext context, final ReteooBuilder builder, final BaseNode node,
+                             final InternalWorkingMemory[] workingMemories ) {
+        if( !node.isInUse() ) {
             removeObjectSink( (ObjectSink) node );
         }
-        if ( !this.isInUse() ) {
-            for ( int i = 0, length = workingMemories.length; i < length; i++ ) {
+        if( !this.isInUse() ) {
+            for( int i = 0, length = workingMemories.length; i < length; i++ ) {
                 workingMemories[i].clearNodeMemory( this );
             }
         }
-        this.source.remove( context,
-                            builder,
-                            this,
-                            workingMemories );
+        this.source.remove( context, builder, this, workingMemories );
     }
 
     /**
@@ -170,95 +153,82 @@
     /**
      * @see org.drools.reteoo.ObjectSinkNode#setNextObjectSinkNode(org.drools.reteoo.ObjectSinkNode)
      */
-    public void setNextObjectSinkNode(ObjectSinkNode next) {
+    public void setNextObjectSinkNode( ObjectSinkNode next ) {
         this.nextObjectSinkNode = next;
     }
 
     /**
      * @see org.drools.reteoo.ObjectSinkNode#setPreviousObjectSinkNode(org.drools.reteoo.ObjectSinkNode)
      */
-    public void setPreviousObjectSinkNode(ObjectSinkNode previous) {
+    public void setPreviousObjectSinkNode( ObjectSinkNode previous ) {
         this.previousObjectSinkNode = previous;
     }
 
     /**
      * @see org.drools.reteoo.ObjectSink#assertObject(InternalFactHandle, org.drools.spi.PropagationContext, org.drools.common.InternalWorkingMemory)
      */
-    public void assertObject(InternalFactHandle factHandle,
-                             PropagationContext context,
-                             InternalWorkingMemory workingMemory) {
-        final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory.getNodeMemory( this );
-        memory.addAction( new AssertAction( factHandle,
-                                            context ) );
+    public void assertObject( InternalFactHandle factHandle, PropagationContext context,
+                              InternalWorkingMemory workingMemory ) {
+        final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory
+                .getNodeMemory( this );
+        memory.addAction( new AssertAction( factHandle, context ) );
 
         // if not queued yet, we need to queue it up
-        if ( memory.isQueued().compareAndSet( false,
-                                              true ) ) {
+        if( memory.isQueued().compareAndSet( false, true ) ) {
             workingMemory.queueWorkingMemoryAction( this.action );
         }
     }
 
-    /**
-     * @see org.drools.reteoo.ObjectSink#isObjectMemoryEnabled()
-     */
     public boolean isObjectMemoryEnabled() {
         return true;
     }
 
-    /**
-     * @see org.drools.reteoo.ObjectSink#retractObject(org.drools.common.InternalFactHandle, org.drools.spi.PropagationContext, org.drools.common.InternalWorkingMemory)
-     */
-    public void retractObject(InternalFactHandle handle,
-                              PropagationContext context,
-                              InternalWorkingMemory workingMemory) {
-        final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory.getNodeMemory( this );
-        memory.addAction( new RetractAction( handle,
-                                             context ) );
+    public void retractObject( InternalFactHandle handle, PropagationContext context,
+                               InternalWorkingMemory workingMemory ) {
+        final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory
+                .getNodeMemory( this );
+        memory.addAction( new RetractAction( handle, context ) );
 
         // if not queued yet, we need to queue it up
-        if ( memory.isQueued().compareAndSet( false,
-                                              true ) ) {
+        if( memory.isQueued().compareAndSet( false, true ) ) {
             workingMemory.queueWorkingMemoryAction( this.action );
         }
     }
 
     /**
      * Propagate all queued actions (asserts and retracts).
-     *
+     * <p/>
      * This method implementation is based on optimistic behavior to avoid the
      * use of locks. There may eventually be a minimum wasted effort, but overall
      * it will be better than paying for the lock's cost.
      *
      * @param workingMemory
      */
-    public void propagateActions(InternalWorkingMemory workingMemory) {
-        final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory.getNodeMemory( this );
+    public void propagateActions( InternalWorkingMemory workingMemory ) {
+        final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory
+                .getNodeMemory( this );
 
         // first we clear up the action queued flag
         memory.isQueued().set( false );
 
         // we limit the propagation to avoid a hang when this queue is never empty
         Action next = memory.getNext();
-        for ( int counter = 0; next != null && counter < PROPAGATION_SLICE_LIMIT; next = memory.getNext(), counter++ ) {
-            next.execute( this.sink,
-                          workingMemory );
+        for( int counter = 0; next != null && counter < PROPAGATION_SLICE_LIMIT; next = memory.getNext(), counter++ ) {
+            next.execute( this.sink, workingMemory );
         }
 
-        if ( memory.hasNext() ) {
+        if( memory.hasNext() ) {
             // add action to the queue again.
             memory.isQueued().set( true );
             workingMemory.queueWorkingMemoryAction( this.action );
         }
     }
 
-    /**
-     * @see org.drools.reteoo.ObjectSink#setObjectMemoryEnabled(boolean)
-     */
-    public void setObjectMemoryEnabled(boolean objectMemoryOn) {
+    public void setObjectMemoryEnabled( boolean objectMemoryOn ) {
         throw new UnsupportedOperationException( "PropagationQueueingNode must have its node memory enabled." );
     }
 
-    public Object createMemory(RuleBaseConfiguration config) {
+    public Object createMemory( RuleBaseConfiguration config ) {
         return new PropagationQueueingNodeMemory();
     }
 
@@ -267,16 +237,14 @@
      *
      * @author etirelli
      */
-    public static class PropagationQueueingNodeMemory
-        implements
-        Externalizable {
+    public static class PropagationQueueingNodeMemory implements Externalizable {
 
-        private static final long             serialVersionUID = 7372028632974484023L;
+        private static final long serialVersionUID = 7372028632974484023L;
 
         private ConcurrentLinkedQueue<Action> queue;
 
         // "singleton" action - there is one of this for each node in each working memory
-        private AtomicBoolean                 isQueued;
+        private AtomicBoolean isQueued;
 
         public PropagationQueueingNodeMemory() {
             super();
@@ -284,13 +252,12 @@
             this.isQueued = new AtomicBoolean( false );
         }
 
-        public void readExternal(ObjectInput in) throws IOException,
-                                                ClassNotFoundException {
+        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
             queue = (ConcurrentLinkedQueue<Action>) in.readObject();
             isQueued = (AtomicBoolean) in.readObject();
         }
 
-        public void writeExternal(ObjectOutput out) throws IOException {
+        public void writeExternal( ObjectOutput out ) throws IOException {
             out.writeObject( queue );
             out.writeObject( isQueued );
         }
@@ -299,7 +266,7 @@
             return this.queue.isEmpty();
         }
 
-        public void addAction(Action action) {
+        public void addAction( Action action ) {
             this.queue.add( action );
         }
 
@@ -316,9 +283,7 @@
         }
     }
 
-    private static abstract class Action
-        implements
-        Externalizable {
+    private static abstract class Action implements Externalizable {
 
         protected InternalFactHandle handle;
         protected PropagationContext context;
@@ -327,42 +292,34 @@
 
         }
 
-        public Action(InternalFactHandle handle,
-                      PropagationContext context) {
+        public Action( InternalFactHandle handle, PropagationContext context ) {
             super();
             this.handle = handle;
             this.context = context;
         }
 
-        public void readExternal(ObjectInput in) throws IOException,
-                                                ClassNotFoundException {
+        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
             handle = (InternalFactHandle) in.readObject();
             context = (PropagationContext) in.readObject();
         }
 
-        public void writeExternal(ObjectOutput out) throws IOException {
+        public void writeExternal( ObjectOutput out ) throws IOException {
             out.writeObject( handle );
             out.writeObject( context );
         }
 
-        public abstract void execute(final ObjectSinkPropagator sink,
-                                     final InternalWorkingMemory workingMemory);
+        public abstract void execute( final ObjectSinkPropagator sink, final InternalWorkingMemory workingMemory );
     }
 
     private static class AssertAction extends Action {
         private static final long serialVersionUID = -8478488926430845209L;
 
-        public AssertAction(final InternalFactHandle handle,
-                            final PropagationContext context) {
-            super( handle,
-                   context );
+        public AssertAction( final InternalFactHandle handle, final PropagationContext context ) {
+            super( handle, context );
         }
 
-        public void execute(final ObjectSinkPropagator sink,
-                            final InternalWorkingMemory workingMemory) {
-            sink.propagateAssertObject( this.handle,
-                                        this.context,
-                                        workingMemory );
+        public void execute( final ObjectSinkPropagator sink, final InternalWorkingMemory workingMemory ) {
+            sink.propagateAssertObject( this.handle, this.context, workingMemory );
         }
     }
 
@@ -373,26 +330,21 @@
 
         }
 
-        public RetractAction(final InternalFactHandle handle,
-                             final PropagationContext context) {
-            super( handle,
-                   context );
+        public RetractAction( final InternalFactHandle handle, final PropagationContext context ) {
+            super( handle, context );
         }
 
-        public void execute(final ObjectSinkPropagator sink,
-                            final InternalWorkingMemory workingMemory) {
-            
-            for ( RightTuple rightTuple = this.handle.getRightTuple(); rightTuple != null; rightTuple = (RightTuple) rightTuple.getHandleNext() ) {
-                rightTuple.getRightTupleSink().retractRightTuple( rightTuple,
-                                                                  context,
-                                                                  workingMemory );
+        public void execute( final ObjectSinkPropagator sink, final InternalWorkingMemory workingMemory ) {
+
+            for( RightTuple rightTuple = this.handle
+                    .getRightTuple(); rightTuple != null; rightTuple = (RightTuple) rightTuple.getHandleNext() ) {
+                rightTuple.getRightTupleSink().retractRightTuple( rightTuple, context, workingMemory );
             }
             this.handle.setRightTuple( null );
 
-            for ( LeftTuple leftTuple = this.handle.getLeftTuple(); leftTuple != null; leftTuple = (LeftTuple) leftTuple.getLeftParentNext() ) {
-                leftTuple.getLeftTupleSink().retractLeftTuple( leftTuple,
-                                                      context,
-                                                      workingMemory );
+            for( LeftTuple leftTuple = this.handle.getLeftTuple(); leftTuple != null; leftTuple = (LeftTuple) leftTuple
+                    .getLeftParentNext() ) {
+                leftTuple.getLeftTupleSink().retractLeftTuple( leftTuple, context, workingMemory );
             }
             this.handle.setLeftTuple( null );
         }
@@ -403,13 +355,10 @@
      * this node propagation can be triggered at a safe point
      *
      * @author etirelli
-     *
      */
-    public static class PropagateAction
-        implements
-        WorkingMemoryAction {
+    public static class PropagateAction implements WorkingMemoryAction {
 
-        private static final long      serialVersionUID = 6765029029501617115L;
+        private static final long serialVersionUID = 6765029029501617115L;
 
         private PropagationQueuingNode node;
 
@@ -417,28 +366,27 @@
 
         }
 
-        public PropagateAction(PropagationQueuingNode node) {
+        public PropagateAction( PropagationQueuingNode node ) {
             this.node = node;
         }
-        
-        public PropagateAction(MarshallerReaderContext context)  throws IOException {
-            this.node = ( PropagationQueuingNode ) context.sinks.get( context.readInt() );
-        }          
-        
-        public void write(MarshallerWriteContext context) throws IOException {
+
+        public PropagateAction( MarshallerReaderContext context ) throws IOException {
+            this.node = (PropagationQueuingNode) context.sinks.get( context.readInt() );
+        }
+
+        public void write( MarshallerWriteContext context ) throws IOException {
             context.write( node.getId() );
         }
 
-        public void readExternal(ObjectInput in) throws IOException,
-                                                ClassNotFoundException {
+        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
             node = (PropagationQueuingNode) in.readObject();
         }
 
-        public void writeExternal(ObjectOutput out) throws IOException {
+        public void writeExternal( ObjectOutput out ) throws IOException {
             out.writeObject( node );
         }
 
-        public void execute(InternalWorkingMemory workingMemory) {
+        public void execute( InternalWorkingMemory workingMemory ) {
             this.node.propagateActions( workingMemory );
         }
     }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/QueryTerminalNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/QueryTerminalNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/QueryTerminalNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -22,10 +22,8 @@
 import java.io.ObjectInput;
 
 import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.reteoo.builder.BuildContext;
+import org.drools.common.*;
 import org.drools.rule.GroupElement;
 import org.drools.rule.Rule;
 import org.drools.spi.PropagationContext;
@@ -70,18 +68,22 @@
     }
 
     /**
-     * Construct.
+     * Constructor
      *
-     * @param inputSource
-     *            The parent tuple source.
-     * @param rule
-     *            The rule.
+     * @param id node ID
+     * @param source the tuple source for this node
+     * @param rule the rule this node belongs to
+     * @param subrule the subrule this node belongs to
+     * @param context the current build context
      */
     public QueryTerminalNode(final int id,
                              final LeftTupleSource source,
                              final Rule rule,
-                             final GroupElement subrule) {
-        super( id );
+                             final GroupElement subrule,
+                             final BuildContext context ) {
+        super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled() );
         this.rule = rule;
         this.subrule = subrule;
         this.tupleSource = source;

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Rete.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Rete.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Rete.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -25,11 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalRuleBase;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.InternalWorkingMemoryEntryPoint;
+import org.drools.common.*;
 import org.drools.rule.EntryPoint;
 import org.drools.spi.ObjectType;
 import org.drools.spi.PropagationContext;
@@ -79,7 +75,7 @@
     // ------------------------------------------------------------
 
     public Rete(InternalRuleBase ruleBase) {
-        super( 0 );
+        super( 0, RuleBasePartitionId.MAIN_PARTITION, ruleBase != null ? ruleBase.getConfiguration().isPartitionsEnabled() : false );
         this.entryPoints = new HashMap<EntryPoint, EntryPointNode>();
         this.ruleBase = ruleBase;
     }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -29,10 +29,7 @@
 import org.drools.SessionConfiguration;
 import org.drools.StatefulSession;
 import org.drools.StatelessSession;
-import org.drools.common.AbstractRuleBase;
-import org.drools.common.DefaultFactHandle;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
+import org.drools.common.*;
 import org.drools.concurrent.CommandExecutor;
 import org.drools.concurrent.ExecutorService;
 import org.drools.event.RuleBaseEventListener;
@@ -138,6 +135,8 @@
 
         // always add the default entry point
         EntryPointNode epn = new EntryPointNode( this.reteooBuilder.getIdGenerator().getNextId(),
+                                                 RuleBasePartitionId.MAIN_PARTITION,
+                                                 this.config.isPartitionsEnabled(),
                                                  this.rete,
                                                  EntryPoint.DEFAULT );
         epn.attach();

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightInputAdapterNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightInputAdapterNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightInputAdapterNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -17,11 +17,7 @@
  */
 
 import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
 import org.drools.reteoo.builder.BuildContext;
 import org.drools.rule.EntryPoint;
 import org.drools.spi.PropagationContext;
@@ -72,7 +68,9 @@
     public RightInputAdapterNode(final int id,
                                  final LeftTupleSource source,
                                  final BuildContext context) {
-        super( id );
+        super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled() );
         this.tupleSource = source;
         this.tupleMemoryEnabled = context.isTupleMemoryEnabled();
     }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RuleTerminalNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RuleTerminalNode.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RuleTerminalNode.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -98,12 +98,14 @@
                             final LeftTupleSource source,
                             final Rule rule,
                             final GroupElement subrule,
-                            final BuildContext buildContext) {
-        super( id );
+                            final BuildContext context) {
+        super( id,
+               context.getPartitionId(),
+               context.getRuleBase().getConfiguration().isPartitionsEnabled() );
         this.rule = rule;
         this.tupleSource = source;
         this.subrule = subrule;
-        this.tupleMemoryEnabled = buildContext.isTerminalNodeMemoryEnabled();
+        this.tupleMemoryEnabled = context.isTerminalNodeMemoryEnabled();
     }
 
     // ------------------------------------------------------------

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleObjectSinkAdapter.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleObjectSinkAdapter.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -3,6 +3,7 @@
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalWorkingMemory;
 import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 import java.io.Externalizable;
@@ -10,29 +11,29 @@
 import java.io.IOException;
 import java.io.ObjectInput;
 
-public class SingleObjectSinkAdapter
-    implements
-    ObjectSinkPropagator,
-    Externalizable {
+public class SingleObjectSinkAdapter extends AbstractObjectSinkAdapter {
 
     private static final long serialVersionUID = 873985743021L;
 
-    private ObjectSink        sink;
+    protected ObjectSink        sink;
 
     public SingleObjectSinkAdapter() {
-
+        super( null );
     }
 
-    public SingleObjectSinkAdapter(final ObjectSink sink) {
+    public SingleObjectSinkAdapter(final RuleBasePartitionId partitionId, final ObjectSink sink) {
+        super( partitionId );
         this.sink = sink;
     }
 
     public void readExternal(ObjectInput in) throws IOException,
                                             ClassNotFoundException {
+        super.readExternal( in );
         sink = (ObjectSink) in.readObject();
     }
 
     public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal( out );
         out.writeObject( sink );
     }
 
@@ -42,7 +43,6 @@
         this.sink.assertObject( factHandle,
                                 context,
                                 workingMemory );
-
     }
 
     public BaseNode getMatchingNode(BaseNode candidate) {

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Sink.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Sink.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Sink.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -1,10 +1,11 @@
 package org.drools.reteoo;
 
+import org.drools.common.NetworkNode;
+
 /**
  * A simple markup interfaces for Sink types
  * 
  * @author etirelli
  */
-public interface Sink {   
-    public int getId();
+public interface Sink extends NetworkNode {   
 }
\ No newline at end of file

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -112,6 +112,8 @@
         this.currentEntryPoint = EntryPoint.DEFAULT;
         
         this.nodes = new LinkedList<BaseNode>();
+
+        this.partitionId = RuleBasePartitionId.MAIN_PARTITION;
     }
 
     /**

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/FromBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/FromBuilder.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/FromBuilder.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -49,7 +49,8 @@
                                                                               context.getTupleSource(),
                                                                               (AlphaNodeFieldConstraint[]) context.getAlphaConstraints().toArray( new AlphaNodeFieldConstraint[context.getAlphaConstraints().size()] ),
                                                                               betaConstraints,
-                                                                              context.isTupleMemoryEnabled() ) ) );
+                                                                              context.isTupleMemoryEnabled(),
+                                                                              context ) ) );
         context.setAlphaConstraints( null );
         context.setBetaconstraints( null );
     }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -146,7 +146,8 @@
             terminal = new QueryTerminalNode( context.getNextId(),
                                               context.getTupleSource(),
                                               rule,
-                                              subrule );
+                                              subrule,
+                                              context );
         }
         if ( context.getWorkingMemories().length == 0 ) {
             ((BaseNode) terminal).attach();

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/BaseNodeTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/BaseNodeTest.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/BaseNodeTest.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -20,6 +20,7 @@
 
 import org.drools.common.BaseNode;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 public class BaseNodeTest extends TestCase {
@@ -44,7 +45,7 @@
         }
 
         public MockBaseNode(final int id) {
-            super( id );
+            super( id, RuleBasePartitionId.MAIN_PARTITION, false );
         }
 
         public void ruleAttached() {

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/CompositeObjectSinkAdapterTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/CompositeObjectSinkAdapterTest.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/CompositeObjectSinkAdapterTest.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -19,6 +19,7 @@
 import org.drools.common.EmptyBetaConstraints;
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.reteoo.builder.BuildContext;
 import org.drools.rule.Behavior;
 import org.drools.rule.LiteralConstraint;
@@ -550,6 +551,8 @@
                      final LeftTupleSource leftInput,
                      final ObjectSource rightInput) {
             super( id,
+                   RuleBasePartitionId.MAIN_PARTITION,
+                   false,
                    leftInput,
                    rightInput,
                    EmptyBetaConstraints.getInstance(),

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/FromNodeTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/FromNodeTest.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/FromNodeTest.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -26,6 +26,7 @@
 import org.drools.common.PropagationContextImpl;
 import org.drools.common.SingleBetaConstraints;
 import org.drools.reteoo.FromNode.FromMemory;
+import org.drools.reteoo.builder.BuildContext;
 import org.drools.rule.Declaration;
 import org.drools.rule.LiteralConstraint;
 import org.drools.rule.Pattern;
@@ -42,10 +43,15 @@
     EqualityEvaluatorsDefinition equals = new EqualityEvaluatorsDefinition();
 
     ClassFieldAccessorStore store = new ClassFieldAccessorStore();
+    private ReteooRuleBase ruleBase;
+    private BuildContext buildContext;
 
     protected void setUp() throws Exception {
         store.setClassFieldAccessorCache( new ClassFieldAccessorCache( Thread.currentThread().getContextClassLoader() ) );
         store.setEagerWire( true );
+
+        ruleBase = (ReteooRuleBase) RuleBaseFactory.newRuleBase();
+        buildContext = new BuildContext( ruleBase, new ReteooBuilder.IdGenerator() );
     }
 
     public void testAlphaNode() {
@@ -54,8 +60,8 @@
                                                                        null,
                                                                        null,
                                                                        null );
-        final ReteooWorkingMemory workingMemory = new ReteooWorkingMemory( 1,
-                                                                           (ReteooRuleBase) RuleBaseFactory.newRuleBase() );
+        final ReteooWorkingMemory workingMemory = new ReteooWorkingMemory( 1, ruleBase );
+
         final ClassFieldReader extractor = store.getReader( Cheese.class,
                                                                   "type",
                                                                   getClass().getClassLoader() );
@@ -79,7 +85,8 @@
                                             null,
                                             new AlphaNodeFieldConstraint[]{constraint},
                                             null,
-                                            true );
+                                            true,
+                                            buildContext );
         final MockLeftTupleSink sink = new MockLeftTupleSink( 5 );
         from.addTupleSink( sink );
 
@@ -191,7 +198,8 @@
                                             null,
                                             new AlphaNodeFieldConstraint[0],
                                             betaConstraints,
-                                            true );
+                                            true,
+                                            buildContext );
         final MockLeftTupleSink sink = new MockLeftTupleSink( 5 );
         from.addTupleSink( sink );
 
@@ -286,7 +294,8 @@
                                             null,
                                             new AlphaNodeFieldConstraint[]{constraint},
                                             null,
-                                            true );
+                                            true,
+                                            buildContext );
         final MockLeftTupleSink sink = new MockLeftTupleSink( 5 );
         from.addTupleSink( sink );
 

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockLeftTupleSink.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockLeftTupleSink.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockLeftTupleSink.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -26,6 +26,7 @@
 import org.drools.common.BaseNode;
 import org.drools.common.InternalWorkingMemory;
 import org.drools.common.NodeMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 public class MockLeftTupleSink extends LeftTupleSource
@@ -43,11 +44,11 @@
     private LeftTupleSinkNode     nextTupleSinkNode;
 
     public MockLeftTupleSink() {
-        super( 0 );
+        super( 0, RuleBasePartitionId.MAIN_PARTITION, false );
     }
 
     public MockLeftTupleSink(final int id) {
-        super( id );
+        super( id, RuleBasePartitionId.MAIN_PARTITION, false );
     }
 
     public void assertLeftTuple(final LeftTuple tuple,

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSink.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSink.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSink.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -18,9 +18,13 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
 
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 public class MockObjectSink
@@ -104,4 +108,16 @@
         // TODO Auto-generated method stub
         return 0;
     }
+
+    public RuleBasePartitionId getPartitionId() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void writeExternal( ObjectOutput out ) throws IOException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
 }
\ No newline at end of file

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSource.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSource.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -19,6 +19,7 @@
 import org.drools.common.BaseNode;
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 import java.io.IOException;
@@ -44,7 +45,7 @@
     }
 
     public MockObjectSource(final int id) {
-        super( id );
+        super( id, RuleBasePartitionId.MAIN_PARTITION, false);
         this.facts = new ArrayList();
     }
 

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockRightTupleSink.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockRightTupleSink.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockRightTupleSink.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -2,8 +2,12 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
 
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 public class MockRightTupleSink
@@ -25,6 +29,17 @@
 
     public int getId() {
         return 0;
-    }    
+    }
 
+    public RuleBasePartitionId getPartitionId() {
+        return null;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void writeExternal( ObjectOutput out ) throws IOException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
 }

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockTupleSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockTupleSource.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockTupleSource.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -18,6 +18,7 @@
 
 import org.drools.common.BaseNode;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 public class MockTupleSource extends LeftTupleSource {
@@ -32,7 +33,7 @@
     private int               updated;
 
     public MockTupleSource(final int id) {
-        super( id );
+        super( id, RuleBasePartitionId.MAIN_PARTITION, false );
     }
 
     public void attach() {

Added: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import java.util.concurrent.TimeUnit;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+import junit.framework.TestCase;
+
+import org.drools.RuleBase;
+import org.drools.RuleBaseFactory;
+import org.drools.common.InternalWorkingMemory;
+
+/**
+ * Test case for PartitionTaskManager
+ *
+ * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class PartitionTaskManagerTest extends TestCase {
+    private MockAction action;
+    private PartitionTaskManager manager;
+
+    @Override
+    public void setUp() {
+        RuleBase rulebase = RuleBaseFactory.newRuleBase();
+        InternalWorkingMemory workingMemory = (InternalWorkingMemory) rulebase.newStatefulSession();
+        action = new MockAction();
+        manager = new PartitionTaskManager( workingMemory );
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+
+    }
+
+    public void testStartStopService() throws InterruptedException {
+        assertFalse( manager.isRunning() );
+        manager.startService();
+        Thread.sleep( 1000 );
+        assertTrue( manager.isRunning() );
+        manager.stopService();
+        Thread.sleep( 1000 );
+        assertFalse( manager.isRunning() );
+    }
+
+    public void testNodeCallbacks() throws InterruptedException {
+        // should be possible to enqueue before starting the service,
+        // even if that should never happen
+        manager.enqueue( action );
+        manager.startService();
+        manager.enqueue( action );
+        // give the engine some time
+        Thread.sleep( 1000 ); 
+        assertTrue( manager.stopService( 10, TimeUnit.SECONDS ) );
+        assertEquals( 2, action.getCallbackCounter() );
+        // should be possible to enqueue after the stop,
+        // but callback must not be executed
+        manager.enqueue( action );
+        manager.enqueue( action );
+        manager.enqueue( action );
+        // making sure the service is not processing the nodes
+        Thread.sleep( 1000 );
+        assertEquals( 2, action.getCallbackCounter() );
+        // restarting service
+        manager.startService();
+        // making sure the service had time to process the nodes
+        Thread.sleep( 1000 );
+        assertTrue( manager.stopService( 10, TimeUnit.SECONDS ) );
+        assertEquals( 5, action.getCallbackCounter() );
+    }
+
+    public static class MockAction implements PartitionTaskManager.Action {
+        private volatile long callbackCounter = 0;
+
+        public synchronized long getCallbackCounter() {
+            return this.callbackCounter;
+        }
+
+        public void execute( InternalWorkingMemory workingMemory ) {
+            synchronized( this ) {
+                callbackCounter++;
+            }
+        }
+
+        public void writeExternal( ObjectOutput out ) throws IOException {
+            //To change body of implemented methods use File | Settings | File Templates.
+        }
+
+        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+            //To change body of implemented methods use File | Settings | File Templates.
+        }
+    }
+}

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/QueryTerminalNodeTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/QueryTerminalNodeTest.java	2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/QueryTerminalNodeTest.java	2008-08-24 16:00:53 UTC (rev 21843)
@@ -136,7 +136,8 @@
         final QueryTerminalNode queryNode = new QueryTerminalNode( this.buildContext.getNextId(),
                                                                    joinNode,
                                                                    query,
-                                                                   query.getLhs() );
+                                                                   query.getLhs(),
+                                                                   buildContext );
 
         queryNode.attach();
 




More information about the jboss-svn-commits mailing list