[jboss-svn-commits] JBL Code SVN: r21857 - labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sun Aug 24 16:28:28 EDT 2008
Author: tirelli
Date: 2008-08-24 16:28:26 -0400 (Sun, 24 Aug 2008)
New Revision: 21857
Added:
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractLeftTupleSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeLeftTupleSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleLeftTupleSinkAdapter.java
Modified:
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeLeftTupleSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyLeftTupleSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleLeftTupleSinkAdapter.java
Log:
Adding the LeftTuple adapters for asynchronous propagation
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractLeftTupleSinkAdapter.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractLeftTupleSinkAdapter.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+/**
+ * An abstract super class for the LeftTupleSinkAdapters
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public abstract class AbstractLeftTupleSinkAdapter
+ implements
+ LeftTupleSinkPropagator {
+
+ protected RuleBasePartitionId partitionId;
+
+ protected AbstractLeftTupleSinkAdapter( RuleBasePartitionId partitionId ) {
+ this.partitionId = partitionId;
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ out.writeObject( this.partitionId );
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ this.partitionId = (RuleBasePartitionId) in.readObject();
+ }
+
+ /**
+ * Returns the partition to which this propagator belongs to
+ *
+ * @return the ID of the partition
+ */
+ public RuleBasePartitionId getPartitionId() {
+ return this.partitionId;
+ }
+
+ /**
+ * Sets the partition to which this propagator belongs to
+ *
+ * @param partitionId
+ */
+ public void setPartitionId( RuleBasePartitionId partitionId ) {
+ this.partitionId = partitionId;
+ }
+
+}
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeLeftTupleSinkAdapter.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeLeftTupleSinkAdapter.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+/**
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class AsyncCompositeLeftTupleSinkAdapter extends CompositeLeftTupleSinkAdapter {
+
+ public AsyncCompositeLeftTupleSinkAdapter() {
+ }
+
+ public AsyncCompositeLeftTupleSinkAdapter( RuleBasePartitionId partitionId ) {
+ super( partitionId );
+ }
+
+ protected void doPropagateAssertLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+ LeftTupleSinkNode sink, LeftTuple leftTuple ) {
+ // composite propagators need to check each node to decide if the propagation
+ // must be asynchronous or may eventually be synchronous
+ if( this.partitionId.equals( sink.getPartitionId() )) {
+ // same partition, so synchronous propagation is fine
+ sink.assertLeftTuple( leftTuple, context, workingMemory );
+ } else {
+ // different partition, so use asynchronous propagation
+ PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+ manager.enqueue( new PartitionTaskManager.LeftTupleAssertAction( leftTuple, context, sink ) );
+ }
+ }
+
+ protected void doPropagateRetractLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+ LeftTuple leftTuple, LeftTupleSink sink ) {
+ // composite propagators need to check each node to decide if the propagation
+ // must be asynchronous or may eventually be synchronous
+ if( this.partitionId.equals( sink.getPartitionId() )) {
+ // same partition, so synchronous propagation is fine
+ sink.retractLeftTuple( leftTuple, context, workingMemory );
+ } else {
+ // different partition, so use asynchronous propagation
+ PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+ manager.enqueue( new PartitionTaskManager.LeftTupleRetractAction( leftTuple, context, sink ) );
+ }
+ }
+}
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleLeftTupleSinkAdapter.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleLeftTupleSinkAdapter.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+/**
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class AsyncSingleLeftTupleSinkAdapter extends SingleLeftTupleSinkAdapter {
+
+ public AsyncSingleLeftTupleSinkAdapter() {
+ }
+
+ public AsyncSingleLeftTupleSinkAdapter( RuleBasePartitionId partitionId, LeftTupleSink tupleSink ) {
+ super( partitionId, tupleSink );
+ }
+
+
+ protected void doPropagateAssertLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+ LeftTuple leftTuple ) {
+ PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+ manager.enqueue( new PartitionTaskManager.LeftTupleAssertAction( leftTuple, context, this.sink ) );
+ }
+
+ protected void doPropagateRetractLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+ LeftTuple leftTuple, LeftTupleSink tupleSink ) {
+ PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+ manager.enqueue( new PartitionTaskManager.LeftTupleRetractAction( leftTuple, context, tupleSink ) );
+ }
+}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeLeftTupleSinkAdapter.java 2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeLeftTupleSinkAdapter.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -1,98 +1,83 @@
package org.drools.reteoo;
+import org.drools.common.BaseNode;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
+import org.drools.spi.PropagationContext;
+
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.BaseNode;
-import org.drools.spi.PropagationContext;
-
-public class CompositeLeftTupleSinkAdapter
- implements
- LeftTupleSinkPropagator {
+public class CompositeLeftTupleSinkAdapter extends AbstractLeftTupleSinkAdapter {
private LeftTupleSinkNodeList sinks;
public CompositeLeftTupleSinkAdapter() {
+ super( RuleBasePartitionId.MAIN_PARTITION );
+ }
+
+ public CompositeLeftTupleSinkAdapter( final RuleBasePartitionId partitionId ) {
+ super( partitionId );
this.sinks = new LeftTupleSinkNodeList();
}
- public void addTupleSink(final LeftTupleSink sink) {
+ public void addTupleSink( final LeftTupleSink sink ) {
this.sinks.add( (LeftTupleSinkNode) sink );
}
- public void removeTupleSink(final LeftTupleSink sink) {
+ public void removeTupleSink( final LeftTupleSink sink ) {
this.sinks.remove( (LeftTupleSinkNode) sink );
}
- public void propagateAssertLeftTuple(final LeftTuple leftTuple,
- final RightTuple rightTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory,
- final boolean leftTupleMemoryEnabled) {
+ public void propagateAssertLeftTuple( final LeftTuple leftTuple, final RightTuple rightTuple,
+ final PropagationContext context, final InternalWorkingMemory workingMemory,
+ final boolean leftTupleMemoryEnabled ) {
- for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
- sink.assertLeftTuple( new LeftTuple( leftTuple,
- rightTuple,
- sink,
- leftTupleMemoryEnabled ),
- context,
- workingMemory );
+ for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+ LeftTuple newLeftTuple = new LeftTuple( leftTuple, rightTuple, sink, leftTupleMemoryEnabled );
+ doPropagateAssertLeftTuple( context, workingMemory, sink, newLeftTuple );
}
}
- public void propagateAssertLeftTuple(final LeftTuple tuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory,
- final boolean leftTupleMemoryEnabled) {
- for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
- sink.assertLeftTuple( new LeftTuple( tuple,
- sink,
- leftTupleMemoryEnabled ),
- context,
- workingMemory );
+ public void propagateAssertLeftTuple( final LeftTuple tuple, final PropagationContext context,
+ final InternalWorkingMemory workingMemory,
+ final boolean leftTupleMemoryEnabled ) {
+ for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+ doPropagateAssertLeftTuple( context, workingMemory, sink,
+ new LeftTuple( tuple, sink, leftTupleMemoryEnabled ) );
}
}
- public void createAndPropagateAssertLeftTuple(final InternalFactHandle factHandle,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory,
- final boolean leftTupleMemoryEnabled) {
- for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
- sink.assertLeftTuple( new LeftTuple( factHandle,
- sink,
- leftTupleMemoryEnabled),
- context,
- workingMemory );
+ public void createAndPropagateAssertLeftTuple( final InternalFactHandle factHandle,
+ final PropagationContext context,
+ final InternalWorkingMemory workingMemory,
+ final boolean leftTupleMemoryEnabled ) {
+ for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+ doPropagateAssertLeftTuple( context, workingMemory, sink,
+ new LeftTuple( factHandle, sink, leftTupleMemoryEnabled ) );
}
}
- public void propagateRetractLeftTuple(final LeftTuple leftTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
+ public void propagateRetractLeftTuple( final LeftTuple leftTuple, final PropagationContext context,
+ final InternalWorkingMemory workingMemory ) {
LeftTuple child = leftTuple.getBetaChildren();
- while ( child != null ) {
+ while( child != null ) {
LeftTuple temp = child.getLeftParentNext();
- child.getLeftTupleSink().retractLeftTuple( child,
- context,
- workingMemory );
+ doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
child.unlinkFromRightParent();
child.unlinkFromLeftParent();
child = temp;
}
}
- public void propagateRetractLeftTupleDestroyRightTuple(final LeftTuple leftTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
+ public void propagateRetractLeftTupleDestroyRightTuple( final LeftTuple leftTuple, final PropagationContext context,
+ final InternalWorkingMemory workingMemory ) {
LeftTuple child = leftTuple.getBetaChildren();
- while ( child != null ) {
+ while( child != null ) {
LeftTuple temp = child.getLeftParentNext();
- child.getLeftTupleSink().retractLeftTuple( child,
- context,
- workingMemory );
+ doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
workingMemory.getFactHandleFactory().destroyFactHandle( child.getRightParent().getFactHandle() );
child.unlinkFromRightParent();
child.unlinkFromLeftParent();
@@ -100,25 +85,22 @@
}
}
- public void propagateRetractRightTuple(final RightTuple rightTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
+ public void propagateRetractRightTuple( final RightTuple rightTuple, final PropagationContext context,
+ final InternalWorkingMemory workingMemory ) {
LeftTuple child = rightTuple.getBetaChildren();
- while ( child != null ) {
+ while( child != null ) {
LeftTuple temp = child.getRightParentNext();
- child.getLeftTupleSink().retractLeftTuple( child,
- context,
- workingMemory );
+ doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
child.unlinkFromLeftParent();
child.unlinkFromRightParent();
child = temp;
}
}
- public BaseNode getMatchingNode(BaseNode candidate) {
- for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
- if (candidate.equals(sink)) {
- return (BaseNode)sink;
+ public BaseNode getMatchingNode( BaseNode candidate ) {
+ for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+ if( candidate.equals( sink ) ) {
+ return (BaseNode) sink;
}
}
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -128,7 +110,7 @@
final LeftTupleSink[] sinkArray = new LeftTupleSink[this.sinks.size()];
int i = 0;
- for ( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
+ for( LeftTupleSinkNode sink = this.sinks.getFirst(); sink != null; sink = sink.getNextLeftTupleSinkNode() ) {
sinkArray[i++] = sink;
}
@@ -139,12 +121,43 @@
return this.sinks.size();
}
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ super.readExternal( in );
this.sinks = (LeftTupleSinkNodeList) in.readObject();
}
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ super.writeExternal( out );
out.writeObject( this.sinks );
}
+
+ /**
+ * This is a hook method that may be overriden by subclasses. Please keep it
+ * protected.
+ *
+ * @param context
+ * @param workingMemory
+ * @param sink
+ * @param leftTuple
+ */
+ protected void doPropagateAssertLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+ LeftTupleSinkNode sink, LeftTuple leftTuple ) {
+ sink.assertLeftTuple( leftTuple, context, workingMemory );
+ }
+
+ /**
+ * This is a hook method that may be overriden by subclasses. Please keep it
+ * protected.
+ *
+ * @param context
+ * @param workingMemory
+ * @param leftTuple
+ * @param sink
+ */
+ protected void doPropagateRetractLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+ LeftTuple leftTuple, LeftTupleSink sink ) {
+ sink.retractLeftTuple( leftTuple, context, workingMemory );
+ }
+
+
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyLeftTupleSinkAdapter.java 2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyLeftTupleSinkAdapter.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -3,15 +3,14 @@
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
import java.io.ObjectOutput;
import java.io.IOException;
import java.io.ObjectInput;
-public class EmptyLeftTupleSinkAdapter
- implements
- LeftTupleSinkPropagator {
+public class EmptyLeftTupleSinkAdapter extends AbstractLeftTupleSinkAdapter {
private static final EmptyLeftTupleSinkAdapter instance = new EmptyLeftTupleSinkAdapter();
@@ -20,6 +19,7 @@
}
public EmptyLeftTupleSinkAdapter() {
+ super( RuleBasePartitionId.MAIN_PARTITION );
// constructor needed for serialisation
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java 2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -23,7 +23,7 @@
}
public EmptyObjectSinkAdapter() {
- super( null );
+ super( RuleBasePartitionId.MAIN_PARTITION );
}
public EmptyObjectSinkAdapter( RuleBasePartitionId partitionId ) {
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java 2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -91,9 +91,25 @@
*/
protected void addTupleSink(final LeftTupleSink tupleSink) {
if ( this.sink instanceof EmptyLeftTupleSinkAdapter ) {
- this.sink = new SingleLeftTupleSinkAdapter( tupleSink );
+ if( this.partitionsEnabled && ! this.partitionId.equals( tupleSink.getPartitionId() ) ) {
+ // if partitions are enabled and the next node belongs to a different partition,
+ // we need to use the asynchronous propagator
+ this.sink = new AsyncSingleLeftTupleSinkAdapter( this.getPartitionId(), tupleSink );
+ } else {
+ // otherwise, we use the lighter synchronous propagator
+ this.sink = new SingleLeftTupleSinkAdapter( this.getPartitionId(), tupleSink );
+ }
} else if ( this.sink instanceof SingleLeftTupleSinkAdapter ) {
- final CompositeLeftTupleSinkAdapter sinkAdapter = new CompositeLeftTupleSinkAdapter();
+ final CompositeLeftTupleSinkAdapter sinkAdapter;
+ if( this.partitionsEnabled ) {
+ // a composite propagator may propagate to both nodes in the same partition
+ // as well as in a different partition, so, if partitions are enabled, we
+ // must use the asynchronous version
+ sinkAdapter = new AsyncCompositeLeftTupleSinkAdapter( this.getPartitionId() );
+ } else {
+ // if partitions are disabled, then it is safe to use the lighter synchronous propagator
+ sinkAdapter = new CompositeLeftTupleSinkAdapter( this.getPartitionId() );
+ }
sinkAdapter.addTupleSink( this.sink.getSinks()[0] );
sinkAdapter.addTupleSink( tupleSink );
this.sink = sinkAdapter;
@@ -119,7 +135,14 @@
final CompositeLeftTupleSinkAdapter sinkAdapter = (CompositeLeftTupleSinkAdapter) this.sink;
sinkAdapter.removeTupleSink( tupleSink );
if ( sinkAdapter.size() == 1 ) {
- this.sink = new SingleLeftTupleSinkAdapter( sinkAdapter.getSinks()[0] );
+ if( this.partitionsEnabled && ! this.partitionId.equals( tupleSink.getPartitionId() ) ) {
+ // if partitions are enabled and the next node belongs to a different partition,
+ // we need to use the asynchronous propagator
+ this.sink = new AsyncSingleLeftTupleSinkAdapter( this.getPartitionId(), sinkAdapter.getSinks()[0] );
+ } else {
+ // otherwise, we use the lighter synchronous propagator
+ this.sink = new SingleLeftTupleSinkAdapter( this.getPartitionId(), sinkAdapter.getSinks()[0] );
+ }
}
}
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java 2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -312,7 +312,7 @@
public abstract void execute( final InternalWorkingMemory workingMemory );
}
- static class FactAssertAction extends FactAction {
+ public static class FactAssertAction extends FactAction {
private static final long serialVersionUID = -8478488926430845209L;
FactAssertAction() {
@@ -328,4 +328,67 @@
}
}
+ /**
+ * An abstract super class for all leftTuple-related actions
+ */
+ public static abstract class LeftTupleAction implements Action, Externalizable {
+
+ protected LeftTuple leftTuple;
+ protected PropagationContext context;
+ protected LeftTupleSink sink;
+
+ public LeftTupleAction() {
+ }
+
+ public LeftTupleAction( final LeftTuple leftTuple, final PropagationContext context,
+ final LeftTupleSink sink ) {
+ super();
+ this.leftTuple = leftTuple;
+ this.context = context;
+ this.sink = sink;
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ leftTuple = (LeftTuple) in.readObject();
+ context = (PropagationContext) in.readObject();
+ sink = (LeftTupleSink) in.readObject();
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ out.writeObject( leftTuple );
+ out.writeObject( context );
+ out.writeObject( sink );
+ }
+
+ public abstract void execute( final InternalWorkingMemory workingMemory );
+ }
+
+ public static class LeftTupleAssertAction extends LeftTupleAction {
+
+ public LeftTupleAssertAction() {
+ }
+
+ public LeftTupleAssertAction( LeftTuple leftTuple, PropagationContext context, LeftTupleSink sink ) {
+ super(leftTuple, context, sink );
+ }
+
+ public void execute( InternalWorkingMemory workingMemory ) {
+ this.sink.assertLeftTuple( leftTuple, context, workingMemory );
+ }
+ }
+
+
+ public static class LeftTupleRetractAction extends LeftTupleAction {
+
+ public LeftTupleRetractAction() {
+ }
+
+ public LeftTupleRetractAction( LeftTuple leftTuple, PropagationContext context, LeftTupleSink sink ) {
+ super(leftTuple, context, sink );
+ }
+
+ public void execute( InternalWorkingMemory workingMemory ) {
+ this.sink.assertLeftTuple( leftTuple, context, workingMemory );
+ }
+ }
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleLeftTupleSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleLeftTupleSinkAdapter.java 2008-08-24 20:25:48 UTC (rev 21856)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleLeftTupleSinkAdapter.java 2008-08-24 20:28:26 UTC (rev 21857)
@@ -1,75 +1,59 @@
package org.drools.reteoo;
+import org.drools.common.BaseNode;
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
-import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
+import java.io.IOException;
import java.io.ObjectInput;
-import java.io.IOException;
import java.io.ObjectOutput;
-public class SingleLeftTupleSinkAdapter
- implements
- LeftTupleSinkPropagator {
- private LeftTupleSink sink;
+public class SingleLeftTupleSinkAdapter extends AbstractLeftTupleSinkAdapter {
+ protected LeftTupleSink sink;
public SingleLeftTupleSinkAdapter() {
-
+ this( RuleBasePartitionId.MAIN_PARTITION, null );
}
- public SingleLeftTupleSinkAdapter(final LeftTupleSink sink) {
+ public SingleLeftTupleSinkAdapter( final RuleBasePartitionId partitionId, final LeftTupleSink sink ) {
+ super( partitionId );
this.sink = sink;
}
- public void propagateAssertLeftTuple(final LeftTuple leftTuple,
- final RightTuple rightTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory,
- boolean leftTupleMemoryEnabled) {
- this.sink.assertLeftTuple( new LeftTuple( leftTuple,
- rightTuple,
- this.sink,
- leftTupleMemoryEnabled ),
- context,
- workingMemory );
+ public void propagateAssertLeftTuple( final LeftTuple leftTuple, final RightTuple rightTuple,
+ final PropagationContext context, final InternalWorkingMemory workingMemory,
+ boolean leftTupleMemoryEnabled ) {
+ doPropagateAssertLeftTuple( context, workingMemory,
+ new LeftTuple( leftTuple, rightTuple, this.sink, leftTupleMemoryEnabled ) );
}
- public void propagateAssertLeftTuple(final LeftTuple tuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory,
- boolean leftTupleMemoryEnabled) {
- this.sink.assertLeftTuple( new LeftTuple( tuple,
- this.sink,
- leftTupleMemoryEnabled ),
- context,
- workingMemory );
+ public void propagateAssertLeftTuple( final LeftTuple tuple, final PropagationContext context,
+ final InternalWorkingMemory workingMemory, boolean leftTupleMemoryEnabled ) {
+ doPropagateAssertLeftTuple( context, workingMemory, new LeftTuple( tuple, this.sink, leftTupleMemoryEnabled ) );
}
- public void propagateRetractLeftTuple(final LeftTuple leftTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
+ public void propagateRetractLeftTuple( final LeftTuple leftTuple, final PropagationContext context,
+ final InternalWorkingMemory workingMemory ) {
LeftTuple child = leftTuple.getBetaChildren();
- while ( child != null ) {
+ // TODO: shouldn't there be a single child tuple? so no need for iteration, right??
+ while( child != null ) {
LeftTuple temp = child.getLeftParentNext();
- child.getLeftTupleSink().retractLeftTuple( child,
- context,
- workingMemory );
+ doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
child.unlinkFromRightParent();
child.unlinkFromLeftParent();
child = temp;
}
}
- public void propagateRetractLeftTupleDestroyRightTuple(final LeftTuple leftTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
+ public void propagateRetractLeftTupleDestroyRightTuple( final LeftTuple leftTuple, final PropagationContext context,
+ final InternalWorkingMemory workingMemory ) {
LeftTuple child = leftTuple.getBetaChildren();
- while ( child != null ) {
+ // TODO: shouldn't there be a single child tuple? so no need for iteration, right??
+ while( child != null ) {
LeftTuple temp = child.getLeftParentNext();
- child.getLeftTupleSink().retractLeftTuple( child,
- context,
- workingMemory );
+ doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
workingMemory.getFactHandleFactory().destroyFactHandle( child.getRightParent().getFactHandle() );
child.unlinkFromRightParent();
child.unlinkFromLeftParent();
@@ -77,35 +61,30 @@
}
}
- public void propagateRetractRightTuple(final RightTuple rightTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
+ public void propagateRetractRightTuple( final RightTuple rightTuple, final PropagationContext context,
+ final InternalWorkingMemory workingMemory ) {
LeftTuple child = rightTuple.getBetaChildren();
- while ( child != null ) {
+ // TODO: shouldn't there be a single child tuple? so no need for iteration, right??
+ while( child != null ) {
LeftTuple temp = child.getRightParentNext();
- child.getLeftTupleSink().retractLeftTuple( child,
- context,
- workingMemory );
+ doPropagateRetractLeftTuple( context, workingMemory, child, child.getLeftTupleSink() );
child.unlinkFromLeftParent();
child.unlinkFromRightParent();
child = temp;
}
}
- public void createAndPropagateAssertLeftTuple(final InternalFactHandle factHandle,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory,
- boolean leftTupleMemoryEnabled) {
- this.sink.assertLeftTuple( new LeftTuple( factHandle,
- this.sink,
- leftTupleMemoryEnabled ),
- context,
- workingMemory );
+ public void createAndPropagateAssertLeftTuple( final InternalFactHandle factHandle,
+ final PropagationContext context,
+ final InternalWorkingMemory workingMemory,
+ boolean leftTupleMemoryEnabled ) {
+ doPropagateAssertLeftTuple( context, workingMemory,
+ new LeftTuple( factHandle, this.sink, leftTupleMemoryEnabled ) );
}
- public BaseNode getMatchingNode(BaseNode candidate) {
- if (candidate.equals(sink)) {
- return (BaseNode)sink;
+ public BaseNode getMatchingNode( BaseNode candidate ) {
+ if( candidate.equals( sink ) ) {
+ return (BaseNode) sink;
}
return null;
}
@@ -118,12 +97,41 @@
return (this.sink != null) ? 1 : 0;
}
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- this.sink = ( LeftTupleSink) in.readObject();
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ super.readExternal( in );
+ this.sink = (LeftTupleSink) in.readObject();
}
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ super.writeExternal( out );
out.writeObject( this.sink );
}
+
+ /**
+ * This is a hook method that may be overriden by subclasses. Please keep it
+ * package protected.
+ *
+ * @param context
+ * @param workingMemory
+ * @param newLeftTuple
+ */
+ protected void doPropagateAssertLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+ LeftTuple newLeftTuple ) {
+ this.sink.assertLeftTuple( newLeftTuple, context, workingMemory );
+ }
+
+ /**
+ * This is a hook method that may be overriden by subclasses. Please keep it
+ * package protected.
+ *
+ * @param context
+ * @param workingMemory
+ * @param child
+ * @param tupleSink
+ */
+ protected void doPropagateRetractLeftTuple( PropagationContext context, InternalWorkingMemory workingMemory,
+ LeftTuple child, LeftTupleSink tupleSink ) {
+ tupleSink.retractLeftTuple( child, context, workingMemory );
+ }
+
}
More information about the jboss-svn-commits
mailing list