[jboss-svn-commits] JBL Code SVN: r21857 - labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sun Aug 24 16:28:28 EDT 2008


Author: tirelli
Date: 2008-08-24 16:28:26 -0400 (Sun, 24 Aug 2008)
New Revision: 21857

Added:
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractLeftTupleSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeLeftTupleSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleLeftTupleSinkAdapter.java
Modified:
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeLeftTupleSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyLeftTupleSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleLeftTupleSinkAdapter.java
Log:
Adding the LeftTuple adapters for asynchronous propagation

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractLeftTupleSinkAdapter.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractLeftTupleSinkAdapter.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+/**
+ * Abstract base class for the LeftTupleSinkAdapters; it holds the id of the partition the propagator belongs to.
+ * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public abstract class AbstractLeftTupleSinkAdapter 
+    implements
+    LeftTupleSinkPropagator {
+
+    protected RuleBasePartitionId partitionId; // written and restored by writeExternal/readExternal below
+
+    protected AbstractLeftTupleSinkAdapter( RuleBasePartitionId partitionId ) {
+        this.partitionId = partitionId;
+    }
+
+    public void writeExternal( ObjectOutput out ) throws IOException { // writes only the partition id
+        out.writeObject( this.partitionId );
+    }
+
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException { // mirror of writeExternal
+        this.partitionId = (RuleBasePartitionId) in.readObject();
+    }
+
+    /**
+     * Returns the partition to which this propagator belongs.
+     *
+     * @return the ID of the partition
+     */
+    public RuleBasePartitionId getPartitionId() {
+        return this.partitionId;
+    }
+
+    /**
+     * Sets the partition to which this propagator belongs.
+     *
+     * @param partitionId the ID of the partition; stored as given, without validation
+     */
+    public void setPartitionId( RuleBasePartitionId partitionId ) {
+        this.partitionId = partitionId;
+    }
+
+}

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeLeftTupleSinkAdapter.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeLeftTupleSinkAdapter.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+/** Composite propagator that calls sinks of its own partition directly and enqueues an action for all other sinks.
+ * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class AsyncCompositeLeftTupleSinkAdapter extends CompositeLeftTupleSinkAdapter {
+
+    public AsyncCompositeLeftTupleSinkAdapter() { // no-arg constructor; implicitly invokes the superclass no-arg constructor
+    }
+
+    public AsyncCompositeLeftTupleSinkAdapter( RuleBasePartitionId partitionId ) {
+        super( partitionId );
+    }
+
+    protected void doPropagateAssertLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+                                               LeftTupleSinkNode sink, LeftTuple leftTuple ) {
+        // a composite propagator has to check every sink to decide whether the
+        // propagation must be asynchronous or can be done synchronously
+        if( this.partitionId.equals( sink.getPartitionId() )) {
+            // same partition, so synchronous propagation is fine
+            sink.assertLeftTuple( leftTuple, context, workingMemory );
+        } else {
+            // different partition: enqueue an action. NOTE(review): manager is looked up by this.partitionId, not sink.getPartitionId() - confirm
+            PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+            manager.enqueue( new PartitionTaskManager.LeftTupleAssertAction( leftTuple, context, sink ) );
+        }
+    }
+
+    protected void doPropagateRetractLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+                                                LeftTuple leftTuple, LeftTupleSink sink ) {
+        // a composite propagator has to check every sink to decide whether the
+        // propagation must be asynchronous or can be done synchronously
+        if( this.partitionId.equals( sink.getPartitionId() )) {
+            // same partition, so synchronous propagation is fine
+            sink.retractLeftTuple( leftTuple, context, workingMemory );
+        } else {
+            // different partition: enqueue an action. NOTE(review): manager is looked up by this.partitionId, not sink.getPartitionId() - confirm
+            PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+            manager.enqueue( new PartitionTaskManager.LeftTupleRetractAction( leftTuple, context, sink ) );
+        }
+    }
+}

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleLeftTupleSinkAdapter.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleLeftTupleSinkAdapter.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+/** Single-sink propagator that never calls its sink directly: every assert/retract is enqueued as an action.
+ * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class AsyncSingleLeftTupleSinkAdapter extends SingleLeftTupleSinkAdapter {
+
+    public AsyncSingleLeftTupleSinkAdapter() { // no-arg constructor; implicitly invokes the superclass no-arg constructor
+    }
+
+    public AsyncSingleLeftTupleSinkAdapter( RuleBasePartitionId partitionId, LeftTupleSink tupleSink ) {
+        super( partitionId, tupleSink );
+    }
+
+
+    protected void doPropagateAssertLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+                                               LeftTuple leftTuple ) {
+        PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId ); // NOTE(review): this adapter's partition, not the sink's - confirm
+        manager.enqueue( new PartitionTaskManager.LeftTupleAssertAction( leftTuple, context, this.sink ) );
+    }
+
+    protected void doPropagateRetractLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+                                                LeftTuple leftTuple, LeftTupleSink tupleSink ) {
+        PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId ); // NOTE(review): this adapter's partition, not the sink's - confirm
+        manager.enqueue( new PartitionTaskManager.LeftTupleRetractAction( leftTuple, context, tupleSink ) );
+    }
+}

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeLeftTupleSinkAdapter.java	2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeLeftTupleSinkAdapter.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -1,98 +1,83 @@
 package org.drools.reteoo;
 
+import org.drools.common.BaseNode;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
+import org.drools.spi.PropagationContext;
+
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.BaseNode;
-import org.drools.spi.PropagationContext;
-
-public class CompositeLeftTupleSinkAdapter
-    implements
-    LeftTupleSinkPropagator {
+public class CompositeLeftTupleSinkAdapter extends AbstractLeftTupleSinkAdapter {
     private LeftTupleSinkNodeList sinks;
 
     public CompositeLeftTupleSinkAdapter() {
+        this( RuleBasePartitionId.MAIN_PARTITION ); // delegate so the sinks list is created; super(...) alone leaves it null
+    }
+
+    public CompositeLeftTupleSinkAdapter( final RuleBasePartitionId partitionId ) {
+        super( partitionId );
         this.sinks = new LeftTupleSinkNodeList();
     }
 
-    public void addTupleSink(final LeftTupleSink sink) {
+    public void addTupleSink( final LeftTupleSink sink ) {
         this.sinks.add( (LeftTupleSinkNode) sink );
     }
 
-    public void removeTupleSink(final LeftTupleSink sink) {
+    public void removeTupleSink( final LeftTupleSink sink ) {
         this.sinks.remove( (LeftTupleSinkNode) sink );
     }
 
-    public void propagateAssertLeftTuple(final LeftTuple leftTuple,
-                                         final RightTuple rightTuple,
-                                         final PropagationContext context,
-                                         final InternalWorkingMemory workingMemory,
-                                         final boolean leftTupleMemoryEnabled) {
+    public void propagateAssertLeftTuple( final LeftTuple leftTuple, final RightTuple rightTuple,
+                                          final PropagationContext context, final InternalWorkingMemory workingMemory,
+                                          final boolean leftTupleMemoryEnabled ) {
 
-        for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
-            sink.assertLeftTuple( new LeftTuple( leftTuple,
-                                                 rightTuple,
-                                                 sink,
-                                                 leftTupleMemoryEnabled ),
-                                  context,
-                                  workingMemory );
+        for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+            LeftTuple newLeftTuple = new LeftTuple( leftTuple, rightTuple, sink, leftTupleMemoryEnabled );
+            doPropagateAssertLeftTuple( context, workingMemory, sink, newLeftTuple );
         }
     }
 
-    public void propagateAssertLeftTuple(final LeftTuple tuple,
-                                         final PropagationContext context,
-                                         final InternalWorkingMemory workingMemory,
-                                         final boolean leftTupleMemoryEnabled) {
-        for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
-            sink.assertLeftTuple( new LeftTuple( tuple,
-                                                 sink,
-                                                 leftTupleMemoryEnabled ),
-                                  context,
-                                  workingMemory );
+    public void propagateAssertLeftTuple( final LeftTuple tuple, final PropagationContext context,
+                                          final InternalWorkingMemory workingMemory,
+                                          final boolean leftTupleMemoryEnabled ) {
+        for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+            doPropagateAssertLeftTuple( context, workingMemory, sink,
+                                        new LeftTuple( tuple, sink, leftTupleMemoryEnabled ) );
         }
     }
 
-    public void createAndPropagateAssertLeftTuple(final InternalFactHandle factHandle,
-                                                  final PropagationContext context,
-                                                  final InternalWorkingMemory workingMemory,
-                                                  final boolean leftTupleMemoryEnabled) {
-        for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
-            sink.assertLeftTuple( new LeftTuple( factHandle,
-                                                 sink,
-                                                 leftTupleMemoryEnabled),
-                                  context,
-                                  workingMemory );
+    public void createAndPropagateAssertLeftTuple( final InternalFactHandle factHandle,
+                                                   final PropagationContext context,
+                                                   final InternalWorkingMemory workingMemory,
+                                                   final boolean leftTupleMemoryEnabled ) {
+        for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+            doPropagateAssertLeftTuple( context, workingMemory, sink,
+                                        new LeftTuple( factHandle, sink, leftTupleMemoryEnabled ) );
         }
     }
 
 
-    public void propagateRetractLeftTuple(final LeftTuple leftTuple,
-                                          final PropagationContext context,
-                                          final InternalWorkingMemory workingMemory) {
+    public void propagateRetractLeftTuple( final LeftTuple leftTuple, final PropagationContext context,
+                                           final InternalWorkingMemory workingMemory ) {
         LeftTuple child = leftTuple.getBetaChildren();
-        while ( child != null ) {
+        while( child != null ) {
             LeftTuple temp = child.getLeftParentNext();
-            child.getLeftTupleSink().retractLeftTuple( child,
-                                              context,
-                                              workingMemory );
+            doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
             child.unlinkFromRightParent();
             child.unlinkFromLeftParent();
             child = temp;
         }
     }
 
-    public void propagateRetractLeftTupleDestroyRightTuple(final LeftTuple leftTuple,
-                                                           final PropagationContext context,
-                                                           final InternalWorkingMemory workingMemory) {
+    public void propagateRetractLeftTupleDestroyRightTuple( final LeftTuple leftTuple, final PropagationContext context,
+                                                            final InternalWorkingMemory workingMemory ) {
         LeftTuple child = leftTuple.getBetaChildren();
-        while ( child != null ) {
+        while( child != null ) {
             LeftTuple temp = child.getLeftParentNext();
-            child.getLeftTupleSink().retractLeftTuple( child,
-                                              context,
-                                              workingMemory );
+            doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
             workingMemory.getFactHandleFactory().destroyFactHandle( child.getRightParent().getFactHandle() );
             child.unlinkFromRightParent();
             child.unlinkFromLeftParent();
@@ -100,25 +85,22 @@
         }
     }
 
-    public void propagateRetractRightTuple(final RightTuple rightTuple,
-                                           final PropagationContext context,
-                                           final InternalWorkingMemory workingMemory) {
+    public void propagateRetractRightTuple( final RightTuple rightTuple, final PropagationContext context,
+                                            final InternalWorkingMemory workingMemory ) {
         LeftTuple child = rightTuple.getBetaChildren();
-        while ( child != null ) {
+        while( child != null ) {
             LeftTuple temp = child.getRightParentNext();
-            child.getLeftTupleSink().retractLeftTuple( child,
-                                              context,
-                                              workingMemory );
+            doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
             child.unlinkFromLeftParent();
             child.unlinkFromRightParent();
             child = temp;
         }
     }
 
-    public BaseNode getMatchingNode(BaseNode candidate) {
-        for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
-            if (candidate.equals(sink)) {
-                return (BaseNode)sink;
+    public BaseNode getMatchingNode( BaseNode candidate ) {
+        for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+            if( candidate.equals( sink ) ) {
+                return (BaseNode) sink;
             }
         }
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -128,7 +110,7 @@
         final LeftTupleSink[] sinkArray = new LeftTupleSink[this.sinks.size()];
 
         int i = 0;
-        for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+        for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
             sinkArray[i++] = sink;
         }
 
@@ -139,12 +121,43 @@
         return this.sinks.size();
     }
 
-    public void readExternal(ObjectInput in) throws IOException,
-                                            ClassNotFoundException {
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+        super.readExternal( in );
         this.sinks = (LeftTupleSinkNodeList) in.readObject();
     }
 
-    public void writeExternal(ObjectOutput out) throws IOException {
+    public void writeExternal( ObjectOutput out ) throws IOException {
+        super.writeExternal( out );
         out.writeObject( this.sinks );
     }
+
+    /**
+     * Hook method that may be overridden by subclasses; this default implementation
+     * asserts the tuple directly on the sink. Please keep it protected.
+     *
+     * @param context the propagation context handed on to the sink
+     * @param workingMemory the working memory handed on to the sink
+     * @param sink the sink on which the tuple is asserted
+     * @param leftTuple the tuple to assert
+     */
+    protected void doPropagateAssertLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+                                               LeftTupleSinkNode sink, LeftTuple leftTuple ) {
+        sink.assertLeftTuple( leftTuple, context, workingMemory );
+    }
+
+    /**
+     * Hook method that may be overridden by subclasses; this default implementation
+     * retracts the tuple directly from the sink. Please keep it protected.
+     *
+     * @param context the propagation context handed on to the sink
+     * @param workingMemory the working memory handed on to the sink
+     * @param leftTuple the tuple to retract
+     * @param sink the sink from which the tuple is retracted
+     */
+    protected void doPropagateRetractLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+                                                LeftTuple leftTuple, LeftTupleSink sink ) {
+        sink.retractLeftTuple( leftTuple, context, workingMemory );
+    }
+
+
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyLeftTupleSinkAdapter.java	2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyLeftTupleSinkAdapter.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -3,15 +3,14 @@
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalWorkingMemory;
 import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
 import java.io.ObjectOutput;
 import java.io.IOException;
 import java.io.ObjectInput;
 
-public class EmptyLeftTupleSinkAdapter
-    implements
-    LeftTupleSinkPropagator {
+public class EmptyLeftTupleSinkAdapter extends AbstractLeftTupleSinkAdapter {
 
     private static final EmptyLeftTupleSinkAdapter instance = new EmptyLeftTupleSinkAdapter();
 
@@ -20,6 +19,7 @@
     }
 
     public EmptyLeftTupleSinkAdapter() {
+        super( RuleBasePartitionId.MAIN_PARTITION );
         // constructor needed for serialisation
     }
 

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java	2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -23,7 +23,7 @@
     }
 
     public EmptyObjectSinkAdapter() {
-        super( null );
+        super( RuleBasePartitionId.MAIN_PARTITION );
     }
 
     public EmptyObjectSinkAdapter( RuleBasePartitionId partitionId ) {

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java	2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -91,9 +91,25 @@
      */
     protected void addTupleSink(final LeftTupleSink tupleSink) {
         if ( this.sink instanceof EmptyLeftTupleSinkAdapter ) {
-            this.sink = new SingleLeftTupleSinkAdapter( tupleSink );
+            if( this.partitionsEnabled && ! this.partitionId.equals( tupleSink.getPartitionId() ) ) {
+                // if partitions are enabled and the next node belongs to a different partition,
+                // we need to use the asynchronous propagator
+                this.sink = new AsyncSingleLeftTupleSinkAdapter( this.getPartitionId(), tupleSink );
+            } else {
+                // otherwise, we use the lighter synchronous propagator
+                this.sink = new SingleLeftTupleSinkAdapter( this.getPartitionId(), tupleSink );
+            }
         } else if ( this.sink instanceof SingleLeftTupleSinkAdapter ) {
-            final CompositeLeftTupleSinkAdapter sinkAdapter = new CompositeLeftTupleSinkAdapter();
+            final CompositeLeftTupleSinkAdapter sinkAdapter;
+            if( this.partitionsEnabled ) {
+                // a composite propagator may propagate to both nodes in the same partition
+                // as well as in a different partition, so, if partitions are enabled, we
+                // must use the asynchronous version
+                sinkAdapter = new AsyncCompositeLeftTupleSinkAdapter( this.getPartitionId() );
+            } else {
+                // if partitions are disabled, then it is safe to use the lighter synchronous propagator
+                sinkAdapter = new CompositeLeftTupleSinkAdapter( this.getPartitionId() );
+            }
             sinkAdapter.addTupleSink( this.sink.getSinks()[0] );
             sinkAdapter.addTupleSink( tupleSink );
             this.sink = sinkAdapter;
@@ -119,7 +135,14 @@
             final CompositeLeftTupleSinkAdapter sinkAdapter = (CompositeLeftTupleSinkAdapter) this.sink;
             sinkAdapter.removeTupleSink( tupleSink );
             if ( sinkAdapter.size() == 1 ) {
-                this.sink = new SingleLeftTupleSinkAdapter( sinkAdapter.getSinks()[0] );
+                if( this.partitionsEnabled && ! this.partitionId.equals( tupleSink.getPartitionId() ) ) {
+                    // if partitions are enabled and the next node belongs to a different partition,
+                    // we need to use the asynchronous propagator
+                    this.sink = new AsyncSingleLeftTupleSinkAdapter( this.getPartitionId(), sinkAdapter.getSinks()[0] );
+                } else {
+                    // otherwise, we use the lighter synchronous propagator
+                    this.sink = new SingleLeftTupleSinkAdapter( this.getPartitionId(), sinkAdapter.getSinks()[0] );
+                }
             }
         }
     }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java	2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -312,7 +312,7 @@
         public abstract void execute( final InternalWorkingMemory workingMemory );
     }
 
-    static class FactAssertAction extends FactAction {
+    public static class FactAssertAction extends FactAction {
         private static final long serialVersionUID = -8478488926430845209L;
 
         FactAssertAction() {
@@ -328,4 +328,67 @@
         }
     }
 
+    /**
+     * Abstract base class for all leftTuple-related actions; it stores the tuple, the context and the target sink.
+     */
+    public static abstract class LeftTupleAction implements Action, Externalizable {
+
+        protected LeftTuple          leftTuple;
+        protected PropagationContext context;
+        protected LeftTupleSink      sink;
+
+        public LeftTupleAction() { // no-arg constructor; leaves all three fields unset
+        }
+
+        public LeftTupleAction( final LeftTuple leftTuple, final PropagationContext context,
+                           final LeftTupleSink sink ) {
+            super();
+            this.leftTuple = leftTuple;
+            this.context = context;
+            this.sink = sink;
+        }
+
+        public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException { // same order as writeExternal
+            leftTuple = (LeftTuple) in.readObject();
+            context = (PropagationContext) in.readObject();
+            sink = (LeftTupleSink) in.readObject();
+        }
+
+        public void writeExternal( ObjectOutput out ) throws IOException { // order: tuple, context, sink
+            out.writeObject( leftTuple );
+            out.writeObject( context );
+            out.writeObject( sink );
+        }
+
+        public abstract void execute( final InternalWorkingMemory workingMemory ); // implemented by the assert/retract subclasses
+    }
+
+    public static class LeftTupleAssertAction extends LeftTupleAction { // action that asserts the stored tuple on the stored sink
+
+        public LeftTupleAssertAction() { // no-arg constructor; delegates to the superclass no-arg constructor
+        }
+        
+        public LeftTupleAssertAction( LeftTuple leftTuple, PropagationContext context, LeftTupleSink sink ) {
+            super(leftTuple, context, sink );
+        }
+
+        public void execute( InternalWorkingMemory workingMemory ) { // forwards the stored tuple and context to the sink
+            this.sink.assertLeftTuple( leftTuple, context, workingMemory );
+        }
+    }
+
+
+    public static class LeftTupleRetractAction extends LeftTupleAction { // action that retracts the stored tuple from the stored sink
+        public LeftTupleRetractAction() { // no-arg constructor; delegates to the superclass no-arg constructor
+        }
+
+        public LeftTupleRetractAction( LeftTuple leftTuple, PropagationContext context, LeftTupleSink sink ) {
+            super(leftTuple, context, sink );
+        }
+
+        public void execute( InternalWorkingMemory workingMemory ) {
+            // the async adapters enqueue this action in place of a direct sink.retractLeftTuple(...) call,
+            // so it has to retract; calling assertLeftTuple here would assert the tuple again instead
+            this.sink.retractLeftTuple( leftTuple, context, workingMemory );
+        }
+    }
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleLeftTupleSinkAdapter.java	2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleLeftTupleSinkAdapter.java	2008-08-24 20:28:26 UTC (rev 21857)
@@ -1,75 +1,59 @@
 package org.drools.reteoo;
 
+import org.drools.common.BaseNode;
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalWorkingMemory;
-import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
 import org.drools.spi.PropagationContext;
 
+import java.io.IOException;
 import java.io.ObjectInput;
-import java.io.IOException;
 import java.io.ObjectOutput;
 
-public class SingleLeftTupleSinkAdapter
-    implements
-    LeftTupleSinkPropagator {
-    private LeftTupleSink sink;
+public class SingleLeftTupleSinkAdapter extends AbstractLeftTupleSinkAdapter {
+    protected LeftTupleSink sink;
 
     public SingleLeftTupleSinkAdapter() {
-
+        this( RuleBasePartitionId.MAIN_PARTITION, null );
     }
 
-    public SingleLeftTupleSinkAdapter(final LeftTupleSink sink) {
+    public SingleLeftTupleSinkAdapter( final RuleBasePartitionId partitionId, final LeftTupleSink sink ) {
+        super( partitionId );
         this.sink = sink;
     }
 
-    public void propagateAssertLeftTuple(final LeftTuple leftTuple,
-                                         final RightTuple rightTuple,
-                                         final PropagationContext context,
-                                         final InternalWorkingMemory workingMemory,
-                                         boolean leftTupleMemoryEnabled) {
-        this.sink.assertLeftTuple( new LeftTuple( leftTuple,
-                                                  rightTuple,
-                                                  this.sink,
-                                                  leftTupleMemoryEnabled ),
-                                   context,
-                                   workingMemory );
+    public void propagateAssertLeftTuple( final LeftTuple leftTuple, final RightTuple rightTuple,
+                                          final PropagationContext context, final InternalWorkingMemory workingMemory,
+                                          boolean leftTupleMemoryEnabled ) {
+        doPropagateAssertLeftTuple( context, workingMemory,
+                                    new LeftTuple( leftTuple, rightTuple, this.sink, leftTupleMemoryEnabled ) );
     }
 
-    public void propagateAssertLeftTuple(final LeftTuple tuple,
-                                         final PropagationContext context,
-                                         final InternalWorkingMemory workingMemory,
-                                         boolean leftTupleMemoryEnabled) {
-        this.sink.assertLeftTuple( new LeftTuple( tuple,
-                                                  this.sink,
-                                                  leftTupleMemoryEnabled ),
-                                   context,
-                                   workingMemory );
+    public void propagateAssertLeftTuple( final LeftTuple tuple, final PropagationContext context,
+                                          final InternalWorkingMemory workingMemory, boolean leftTupleMemoryEnabled ) {
+        doPropagateAssertLeftTuple( context, workingMemory, new LeftTuple( tuple, this.sink, leftTupleMemoryEnabled ) );
     }
 
-    public void propagateRetractLeftTuple(final LeftTuple leftTuple,
-                                          final PropagationContext context,
-                                          final InternalWorkingMemory workingMemory) {
+    public void propagateRetractLeftTuple( final LeftTuple leftTuple, final PropagationContext context,
+                                           final InternalWorkingMemory workingMemory ) {
         LeftTuple child = leftTuple.getBetaChildren();
-        while ( child != null ) {
+        // TODO: shouldn't there be a single child tuple? so no need for iteration, right??
+        while( child != null ) {
             LeftTuple temp = child.getLeftParentNext();
-            child.getLeftTupleSink().retractLeftTuple( child,
-                                              context,
-                                              workingMemory );
+            doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
             child.unlinkFromRightParent();
             child.unlinkFromLeftParent();
             child = temp;
         }
     }
 
-    public void propagateRetractLeftTupleDestroyRightTuple(final LeftTuple leftTuple,
-                                          final PropagationContext context,
-                                          final InternalWorkingMemory workingMemory) {
+    public void propagateRetractLeftTupleDestroyRightTuple( final LeftTuple leftTuple, final PropagationContext context,
+                                                            final InternalWorkingMemory workingMemory ) {
         LeftTuple child = leftTuple.getBetaChildren();
-        while ( child != null ) {
+        // TODO: shouldn't there be a single child tuple? so no need for iteration, right??
+        while( child != null ) {
             LeftTuple temp = child.getLeftParentNext();
-            child.getLeftTupleSink().retractLeftTuple( child,
-                                              context,
-                                              workingMemory );
+            doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
             workingMemory.getFactHandleFactory().destroyFactHandle( child.getRightParent().getFactHandle() );
             child.unlinkFromRightParent();
             child.unlinkFromLeftParent();
@@ -77,35 +61,30 @@
         }
     }
 
-    public void propagateRetractRightTuple(final RightTuple rightTuple,
-                                           final PropagationContext context,
-                                           final InternalWorkingMemory workingMemory) {
+    public void propagateRetractRightTuple( final RightTuple rightTuple, final PropagationContext context,
+                                            final InternalWorkingMemory workingMemory ) {
         LeftTuple child = rightTuple.getBetaChildren();
-        while ( child != null ) {
+        // TODO: shouldn't there be a single child tuple? so no need for iteration, right??
+        while( child != null ) {
             LeftTuple temp = child.getRightParentNext();
-            child.getLeftTupleSink().retractLeftTuple( child,
-                                              context,
-                                              workingMemory );
+            doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
             child.unlinkFromLeftParent();
             child.unlinkFromRightParent();
             child = temp;
         }
     }
 
-    public void createAndPropagateAssertLeftTuple(final InternalFactHandle factHandle,
-                                                  final PropagationContext context,
-                                                  final InternalWorkingMemory workingMemory,
-                                                  boolean leftTupleMemoryEnabled) {
-        this.sink.assertLeftTuple( new LeftTuple( factHandle,
-                                                  this.sink,
-                                                  leftTupleMemoryEnabled ),
-                                   context,
-                                   workingMemory );
+    public void createAndPropagateAssertLeftTuple( final InternalFactHandle factHandle,
+                                                   final PropagationContext context,
+                                                   final InternalWorkingMemory workingMemory,
+                                                   boolean leftTupleMemoryEnabled ) {
+        doPropagateAssertLeftTuple( context, workingMemory,
+                                    new LeftTuple( factHandle, this.sink, leftTupleMemoryEnabled ) );
     }
 
-    public BaseNode getMatchingNode(BaseNode candidate) {
-        if (candidate.equals(sink)) {
-            return (BaseNode)sink;
+    public BaseNode getMatchingNode( BaseNode candidate ) {
+        if( candidate.equals( sink ) ) {
+            return (BaseNode) sink;
         }
         return null;
     }
@@ -118,12 +97,41 @@
         return (this.sink != null) ? 1 : 0;
     }
 
-    public void readExternal(ObjectInput in) throws IOException,
-                                            ClassNotFoundException {
-        this.sink = ( LeftTupleSink) in.readObject();
+    public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+        super.readExternal( in );
+        this.sink = (LeftTupleSink) in.readObject();
     }
 
-    public void writeExternal(ObjectOutput out) throws IOException {
+    public void writeExternal( ObjectOutput out ) throws IOException {
+        super.writeExternal( out );
         out.writeObject( this.sink );
     }
+
+    /**
+     * Hook method that subclasses may override. The default implementation asserts
+     * the tuple directly on this adapter's sink. Please keep it non-public (it is declared protected).
+     *
+     * @param context       propagation context passed through to the sink
+     * @param workingMemory working memory passed through to the sink
+     * @param newLeftTuple  the tuple to assert on the sink
+     */
+    protected void doPropagateAssertLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+                                               LeftTuple newLeftTuple ) {
+        this.sink.assertLeftTuple( newLeftTuple, context, workingMemory );
+    }
+
+    /**
+     * Hook method that subclasses may override. The default implementation retracts
+     * the child tuple directly on the given sink. Please keep it non-public (it is declared protected).
+     *
+     * @param context       propagation context passed through to the sink
+     * @param workingMemory working memory passed through to the sink
+     * @param child         the tuple to retract
+     * @param tupleSink     the sink whose retractLeftTuple is invoked
+     */
+    protected void doPropagateRetractLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+                                                LeftTuple child, LeftTupleSink tupleSink ) {
+        tupleSink.retractLeftTuple( child, context, workingMemory );
+    }
+
 }




More information about the jboss-svn-commits mailing list