[jboss-svn-commits] JBL Code SVN: r21843 - in labs/jbossrules/trunk: drools-compiler/src/test/java/org/drools/testframework and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sun Aug 24 12:01:02 EDT 2008
Author: tirelli
Date: 2008-08-24 12:00:53 -0400 (Sun, 24 Aug 2008)
New Revision: 21843
Added:
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractObjectSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeObjectSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java
Removed:
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
Modified:
labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockLeftTupleSink.java
labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockRightTupleSink.java
labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/testframework/MockWorkingMemory.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/BaseNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/ConcurrentNodeMemories.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalRuleBase.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalWorkingMemory.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/NetworkNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AlphaNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CollectNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeObjectSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EntryPointNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EvalConditionNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ExistsNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/FromNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/JoinNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftInputAdapterNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/NotNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSinkPropagator.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSource.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectTypeNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PropagationQueuingNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/QueryTerminalNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Rete.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightInputAdapterNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RuleTerminalNode.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleObjectSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Sink.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/FromBuilder.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/BaseNodeTest.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/CompositeObjectSinkAdapterTest.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/FromNodeTest.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockLeftTupleSink.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSink.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSource.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockRightTupleSink.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockTupleSource.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/QueryTerminalNodeTest.java
Log:
Developing support for multi-thread concurrent partition propagation
Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockLeftTupleSink.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockLeftTupleSink.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockLeftTupleSink.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -26,6 +26,7 @@
import org.drools.common.BaseNode;
import org.drools.common.InternalWorkingMemory;
import org.drools.common.NodeMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
public class MockLeftTupleSink extends LeftTupleSource
@@ -43,11 +44,11 @@
private LeftTupleSinkNode nextTupleSinkNode;
public MockLeftTupleSink() {
- super( 0 );
+ super( 0, RuleBasePartitionId.MAIN_PARTITION, false );
}
public MockLeftTupleSink(final int id) {
- super( id );
+ super( id, RuleBasePartitionId.MAIN_PARTITION, false );
}
public void assertLeftTuple(final LeftTuple tuple,
Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockRightTupleSink.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockRightTupleSink.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/reteoo/MockRightTupleSink.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -2,8 +2,12 @@
import java.util.ArrayList;
import java.util.List;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
public class MockRightTupleSink
@@ -25,6 +29,17 @@
public int getId() {
return 0;
- }
+ }
+ public RuleBasePartitionId getPartitionId() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
}
Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/testframework/MockWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/testframework/MockWorkingMemory.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/testframework/MockWorkingMemory.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -18,14 +18,7 @@
import org.drools.ObjectFilter;
import org.drools.QueryResults;
import org.drools.RuleBase;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalRuleBase;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.ObjectStore;
-import org.drools.common.ObjectTypeConfigurationRegistry;
-import org.drools.common.TruthMaintenanceSystem;
-import org.drools.common.WorkingMemoryAction;
+import org.drools.common.*;
import org.drools.concurrent.ExecutorService;
import org.drools.event.AgendaEventListener;
import org.drools.event.AgendaEventSupport;
@@ -41,6 +34,7 @@
import org.drools.process.instance.timer.TimerManager;
import org.drools.reteoo.LIANodePropagation;
import org.drools.reteoo.ObjectTypeConf;
+import org.drools.reteoo.PartitionTaskManager;
import org.drools.rule.EntryPoint;
import org.drools.rule.Rule;
import org.drools.rule.TimeMachine;
@@ -56,7 +50,7 @@
import org.drools.util.ObjectHashMap;
public class MockWorkingMemory implements InternalWorkingMemory {
-
+
List<Object> facts = new ArrayList<Object>();
AgendaEventListener agendaEventListener;
TimeMachine timeMachine = new TimeMachine();
@@ -536,6 +530,10 @@
return null;
}
+ public PartitionTaskManager getPartitionManager( RuleBasePartitionId partitionId ) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public SessionClock getSessionClock() {
// TODO Auto-generated method stub
return null;
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -815,6 +815,10 @@
return p;
}
+ public List<RuleBasePartitionId> getPartitionIds() {
+ return this.partitionIDs;
+ }
+
public void addEventListener(final RuleBaseEventListener listener) {
// since the event support is thread-safe, no need for locks... right?
this.eventSupport.addEventListener( listener );
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -68,12 +68,7 @@
import org.drools.process.instance.WorkItemManager;
import org.drools.process.instance.context.variable.VariableScopeInstance;
import org.drools.process.instance.timer.TimerManager;
-import org.drools.reteoo.EntryPointNode;
-import org.drools.reteoo.InitialFactHandle;
-import org.drools.reteoo.InitialFactHandleDummyObject;
-import org.drools.reteoo.LIANodePropagation;
-import org.drools.reteoo.LeftTuple;
-import org.drools.reteoo.ObjectTypeConf;
+import org.drools.reteoo.*;
import org.drools.rule.Declaration;
import org.drools.rule.EntryPoint;
import org.drools.rule.Rule;
@@ -187,6 +182,8 @@
protected SessionConfiguration config;
+ protected Map<RuleBasePartitionId, PartitionTaskManager> partitionManagers;
+
// ------------------------------------------------------------
// Constructors
// ------------------------------------------------------------
@@ -278,6 +275,8 @@
this );
this.entryPoint = EntryPoint.DEFAULT;
+
+ initPartitionManagers();
initTransient();
}
@@ -291,6 +290,21 @@
initTransient();
}
+ /**
+ * Creates the actual partition managers and their tasks for multi-thread processing
+ */
+ private void initPartitionManagers() {
+ if( this.ruleBase.getConfiguration().isPartitionsEnabled() ) {
+
+ // the Map MUST be thread safe
+ this.partitionManagers = new ConcurrentHashMap<RuleBasePartitionId, PartitionTaskManager>();
+
+ for( RuleBasePartitionId partitionId : this.ruleBase.getPartitionIds() ) {
+ this.partitionManagers.put( partitionId, new PartitionTaskManager( this ) );
+ }
+ }
+ }
+
private void initTransient() {
this.entryPointNode = this.ruleBase.getRete().getEntryPointNode( this.entryPoint );
this.typeConfReg = new ObjectTypeConfigurationRegistry( this.ruleBase );
@@ -1671,6 +1685,10 @@
return (SessionClock) this.getTimerManager().getTimerService();
}
+ public PartitionTaskManager getPartitionManager( final RuleBasePartitionId partitionId ) {
+ return partitionManagers.get( partitionId );
+ }
+
// public static class FactHandleInvalidation implements WorkingMemoryAction {
// private final InternalFactHandle handle;
//
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/BaseNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/BaseNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/BaseNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -36,6 +36,7 @@
protected int id;
protected RuleBasePartitionId partitionId;
+ protected boolean partitionsEnabled;
public BaseNode() {
@@ -47,17 +48,23 @@
* @param id
* The unique id
*/
- public BaseNode(final int id) {
+ public BaseNode(final int id, final RuleBasePartitionId partitionId, final boolean partitionsEnabled ) {
super();
this.id = id;
+ this.partitionId = partitionId;
+ this.partitionsEnabled = partitionsEnabled;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
id = in.readInt();
+ partitionId = (RuleBasePartitionId) in.readObject();
+ partitionsEnabled = in.readBoolean();
}
public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(id);
+ out.writeObject( partitionId );
+ out.writeBoolean( partitionsEnabled );
}
/* (non-Javadoc)
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/ConcurrentNodeMemories.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/ConcurrentNodeMemories.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/ConcurrentNodeMemories.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -60,6 +60,12 @@
/**
* @inheritDoc
+ *
+ * The implementation tries to delay locking as much as possible, by running
+ * some potentially unsafe operations outside the critical section. In case it
+ * fails the checks, it will move into the critical section and re-check everything
+ * before actually making any change to the data structures.
+ *
* @see org.drools.common.NodeMemories#getNodeMemory(org.drools.common.NodeMemory)
*/
public Object getNodeMemory( NodeMemory node ) {
@@ -75,6 +81,14 @@
return memory;
}
+
+ /**
+ * Checks whether a memory already exists for the given node and,
+ * if it does not, creates it.
+ *
+ * @param node
+ * @return
+ */
private Object createNodeMemory( NodeMemory node ) {
try {
this.lock.lock();
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalRuleBase.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalRuleBase.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.util.Map;
+import java.util.List;
import org.drools.FactException;
import org.drools.FactHandle;
@@ -128,4 +129,10 @@
* @return
*/
public RuleBasePartitionId createNewPartitionId();
+
+ /**
+ * Returns the list of partition IDs for this rulebase
+ * @return
+ */
+ List<RuleBasePartitionId> getPartitionIds();
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalWorkingMemory.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalWorkingMemory.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -12,6 +12,7 @@
import org.drools.process.instance.ProcessInstance;
import org.drools.process.instance.ProcessInstanceManager;
import org.drools.reteoo.LIANodePropagation;
+import org.drools.reteoo.PartitionTaskManager;
import org.drools.rule.Rule;
import org.drools.rule.TimeMachine;
import org.drools.spi.Activation;
@@ -99,4 +100,14 @@
public InternalFactHandle getInitialFactHandle();
public TimerService getTimerService();
+
+ /**
+ * Returns the PartitionTaskManager for the given partition ID
+ * in case the rulebase has partitions enabled
+ *
+ * @param partitionId the ID of the partition for which the task manager is assigned
+ *
+ * @return the PartitionTaskManager
+ */
+ public PartitionTaskManager getPartitionManager( RuleBasePartitionId partitionId );
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/NetworkNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/NetworkNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/NetworkNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -36,4 +36,12 @@
* unique int value
*/
public int getId();
+
+ /**
+ * Returns the ID of the partition to which this node belongs
+ *
+ * @return
+ */
+ public RuleBasePartitionId getPartitionId();
+
}
\ No newline at end of file
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -30,7 +30,7 @@
private final String id;
- public RuleBasePartitionId(String id) {
+ public RuleBasePartitionId( final String id ) {
this.id = id;
}
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractObjectSinkAdapter.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AbstractObjectSinkAdapter.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+
+import java.io.Externalizable;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+/**
+ * An abstract super class for ObjectSinks
+ *
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public abstract class AbstractObjectSinkAdapter implements ObjectSinkPropagator, Externalizable {
+ protected RuleBasePartitionId partitionId;
+
+ protected AbstractObjectSinkAdapter( RuleBasePartitionId partitionId ) {
+ this.partitionId = partitionId;
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ out.writeObject( this.partitionId );
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ this.partitionId = (RuleBasePartitionId) in.readObject();
+ }
+
+ /**
+ * Returns the partition to which this propagator belongs
+ *
+ * @return the ID of the partition
+ */
+ public RuleBasePartitionId getPartitionId() {
+ return this.partitionId;
+ }
+
+ /**
+ * Sets the partition to which this propagator belongs
+ *
+ * @param partitionId
+ */
+ public void setPartitionId( RuleBasePartitionId partitionId ) {
+ this.partitionId = partitionId;
+ }
+}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -70,6 +70,8 @@
final boolean unwrapRightObject,
final BuildContext context) {
super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
leftInput,
rightInput,
sourceBinder,
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AlphaNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AlphaNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AlphaNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -22,11 +22,7 @@
import org.drools.FactException;
import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
import org.drools.reteoo.builder.BuildContext;
import org.drools.rule.ContextEntry;
import org.drools.spi.AlphaNodeFieldConstraint;
@@ -76,13 +72,14 @@
* @param id Node's ID
* @param constraint Node's constraints
* @param objectSource Node's object source
- * @param hasMemory true if node shall be configured with local memory. False otherwise.
*/
public AlphaNode(final int id,
final AlphaNodeFieldConstraint constraint,
final ObjectSource objectSource,
final BuildContext context) {
super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
objectSource,
context.getRuleBase().getConfiguration().getAlphaNodeHashingThreshold() );
this.constraint = constraint;
@@ -316,5 +313,19 @@
public int getId() {
return 0;
}
+
+ public RuleBasePartitionId getPartitionId() {
+ return this.sink.getPartitionId();
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ // this is a short-lived adapter class, so no need for serialization
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ // this is a short-lived adapter class, so no need for serialization
+ }
+
+
}
}
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeObjectSinkAdapter.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncCompositeObjectSinkAdapter.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+/**
+ * This is an asynchronous implementation of the CompositeObjectSinkAdapter that
+ * is used to propagate facts between nodes in the same partition or in different
+ * partitions when partitions are enabled in the rulebase.
+ *
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class AsyncCompositeObjectSinkAdapter extends CompositeObjectSinkAdapter {
+
+ public AsyncCompositeObjectSinkAdapter( RuleBasePartitionId partitionId, int alphaNodeHashingThreshold ) {
+ super(partitionId, alphaNodeHashingThreshold );
+ }
+
+ protected void doPropagateAssertObject( InternalFactHandle factHandle, PropagationContext context,
+ InternalWorkingMemory workingMemory, ObjectSink sink ) {
+ // composite propagators need to check each node to decide if the propagation
+ // must be asynchronous or may eventually be synchronous
+ if( this.partitionId.equals( sink.getPartitionId() )) {
+ // same partition, so synchronous propagation is fine
+ sink.assertObject( factHandle, context, workingMemory );
+ } else {
+ // different partition, so use asynchronous propagation
+ PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+ manager.enqueue( new PartitionTaskManager.FactAssertAction(factHandle, context, sink ) );
+ }
+ }
+}
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import org.drools.common.RuleBasePartitionId;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+/**
+ * This is an asynchronous implementation of the SingleObjectSinkAdapter
+ * that is used to propagate facts between different partitions in the
+ * rulebase
+ *
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class AsyncSingleObjectSinkAdapter extends SingleObjectSinkAdapter {
+ public AsyncSingleObjectSinkAdapter( RuleBasePartitionId partitionId, ObjectSink objectSink ) {
+ super( partitionId, objectSink );
+ }
+
+ public void propagateAssertObject( InternalFactHandle factHandle, PropagationContext context, InternalWorkingMemory workingMemory ) {
+ PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+ manager.enqueue( new PartitionTaskManager.FactAssertAction(factHandle, context, this.sink ) );
+ }
+}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -23,11 +23,7 @@
import java.util.List;
import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.BetaConstraints;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
import org.drools.reteoo.AccumulateNode.AccumulateMemory;
import org.drools.reteoo.CollectNode.CollectMemory;
import org.drools.rule.Behavior;
@@ -96,11 +92,15 @@
* The right input <code>ObjectSource</code>.
*/
BetaNode(final int id,
+ final RuleBasePartitionId partitionId,
+ final boolean partitionsEnabled,
final LeftTupleSource leftInput,
final ObjectSource rightInput,
final BetaConstraints constraints,
final Behavior[] behaviors) {
- super( id );
+ super( id,
+ partitionId,
+ partitionsEnabled );
this.leftInput = leftInput;
this.rightInput = rightInput;
this.constraints = constraints;
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CollectNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CollectNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CollectNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -86,6 +86,8 @@
final boolean unwrapRight,
final BuildContext context) {
super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
leftInput,
rightInput,
sourceBinder,
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeObjectSinkAdapter.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/CompositeObjectSinkAdapter.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -12,6 +12,7 @@
import org.drools.common.BaseNode;
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.rule.LiteralConstraint;
import org.drools.spi.AlphaNodeFieldConstraint;
import org.drools.spi.Evaluator;
@@ -25,9 +26,7 @@
import org.drools.util.ObjectHashMap;
import org.drools.util.ObjectHashMap.ObjectEntry;
-public class CompositeObjectSinkAdapter
- implements
- ObjectSinkPropagator {
+public class CompositeObjectSinkAdapter extends AbstractObjectSinkAdapter {
// /** You can override this property via a system property (eg -Ddrools.hashThreshold=4) */
// public static final String HASH_THRESHOLD_SYSTEM_PROPERTY = "drools.hashThreshold";
@@ -47,15 +46,17 @@
private int alphaNodeHashingThreshold;
public CompositeObjectSinkAdapter() {
- this( 3 );
+ this( null, 3 );
}
- public CompositeObjectSinkAdapter(final int alphaNodeHashingThreshold) {
+ public CompositeObjectSinkAdapter(final RuleBasePartitionId partitionId, final int alphaNodeHashingThreshold) {
+ super( partitionId );
this.alphaNodeHashingThreshold = alphaNodeHashingThreshold;
}
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ super.readExternal( in );
otherSinks = (ObjectSinkNodeList) in.readObject();
hashableSinks = (ObjectSinkNodeList) in.readObject();
hashedFieldIndexes = (LinkedList) in.readObject();
@@ -64,13 +65,14 @@
}
public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal( out );
out.writeObject( otherSinks );
out.writeObject( hashableSinks );
out.writeObject( hashedFieldIndexes );
out.writeObject( hashedSinkMap );
out.writeInt( alphaNodeHashingThreshold );
}
-
+
public ObjectSinkNodeList getOthers() {
return this.otherSinks;
}
@@ -329,9 +331,7 @@
final ObjectSink sink = (ObjectSink) this.hashedSinkMap.get( hashKey );
if ( sink != null ) {
// The sink exists so propagate
- sink.assertObject( factHandle,
- context,
- workingMemory );
+ doPropagateAssertObject( factHandle, context, workingMemory, sink );
}
}
}
@@ -339,23 +339,35 @@
// propagate unhashed
if ( this.hashableSinks != null ) {
for ( ObjectSinkNode sink = this.hashableSinks.getFirst(); sink != null; sink = sink.getNextObjectSinkNode() ) {
- sink.assertObject( factHandle,
- context,
- workingMemory );
+ doPropagateAssertObject( factHandle, context, workingMemory, sink );
}
}
if ( this.otherSinks != null ) {
// propagate others
for ( ObjectSinkNode sink = this.otherSinks.getFirst(); sink != null; sink = sink.getNextObjectSinkNode() ) {
- sink.assertObject( factHandle,
- context,
- workingMemory );
+ doPropagateAssertObject( factHandle, context, workingMemory, sink );
}
}
}
+ /**
+ * Hook method that delivers a single assertion to one sink; subclasses may override it.
+ * This implementation simply calls sink.assertObject(). Please keep it protected unless you know what you are doing.
+ *
+ * @param factHandle the fact handle passed on to the sink
+ * @param context the propagation context passed on to the sink
+ * @param workingMemory the working memory passed on to the sink
+ * @param sink the sink whose assertObject() method is invoked
+ */
+ protected void doPropagateAssertObject( InternalFactHandle factHandle, PropagationContext context,
+ InternalWorkingMemory workingMemory, ObjectSink sink ) {
+ sink.assertObject( factHandle,
+ context,
+ workingMemory );
+ }
+
public BaseNode getMatchingNode(BaseNode candidate) {
if ( this.otherSinks != null ) {
for ( ObjectSinkNode sink = this.otherSinks.getFirst(); sink != null; sink = sink.getNextObjectSinkNode() ) {
Deleted: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -1,64 +0,0 @@
-package org.drools.reteoo;
-
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.BaseNode;
-import org.drools.spi.PropagationContext;
-
-import java.io.ObjectOutput;
-import java.io.IOException;
-import java.io.ObjectInput;
-
-public class EmptyObjectSinkAdapter
- implements
- ObjectSinkPropagator {
-
- private static final long serialVersionUID = -631743913176779720L;
-
- private static final EmptyObjectSinkAdapter instance = new EmptyObjectSinkAdapter();
-
- private static final ObjectSink[] SINK_LIST = new ObjectSink[0];
-
- public static EmptyObjectSinkAdapter getInstance() {
- return instance;
- }
-
- public EmptyObjectSinkAdapter() {
- }
-
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- }
-
- public void propagateAssertObject(final InternalFactHandle factHandle,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
-
- }
-
- public void propagateRetractObject(final InternalFactHandle handle,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory,
- final boolean useHash) {
- }
-
- public BaseNode getMatchingNode(BaseNode candidate) {
- return null;
- }
-
- public ObjectSink[] getSinks() {
- return SINK_LIST;
- }
-
- public int size() {
- return 0;
- }
-
- public boolean equals(Object obj) {
- return obj instanceof EmptyObjectSinkAdapter;
- }
-
-}
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EmptyObjectSinkAdapter.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,74 @@
+package org.drools.reteoo;
+
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
+import org.drools.spi.PropagationContext;
+
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+public class EmptyObjectSinkAdapter extends AbstractObjectSinkAdapter {
+
+ private static final long serialVersionUID = -631743913176779720L;
+
+ private static final EmptyObjectSinkAdapter INSTANCE = new EmptyObjectSinkAdapter();
+
+ private static final ObjectSink[] SINK_LIST = new ObjectSink[0];
+
+ public static EmptyObjectSinkAdapter getInstance() {
+ return INSTANCE;
+ }
+
+ public EmptyObjectSinkAdapter() {
+ super( null );
+ }
+
+ public EmptyObjectSinkAdapter( RuleBasePartitionId partitionId ) {
+ super( partitionId );
+ }
+
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ super.readExternal( in );
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal( out );
+ }
+
+ public RuleBasePartitionId getPartitionId() {
+ return null; // NOTE(review): always null, even when a partitionId was given to the constructor - confirm this is intended
+ }
+
+ public void propagateAssertObject(final InternalFactHandle factHandle,
+ final PropagationContext context,
+ final InternalWorkingMemory workingMemory) {
+
+ }
+
+ public void propagateRetractObject(final InternalFactHandle handle,
+ final PropagationContext context,
+ final InternalWorkingMemory workingMemory,
+ final boolean useHash) {
+ }
+
+ public BaseNode getMatchingNode(BaseNode candidate) {
+ return null;
+ }
+
+ public ObjectSink[] getSinks() {
+ return SINK_LIST;
+ }
+
+ public int size() {
+ return 0;
+ }
+
+ public boolean equals(Object obj) {
+ return obj instanceof EmptyObjectSinkAdapter;
+ }
+
+}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EntryPointNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EntryPointNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EntryPointNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -27,12 +27,7 @@
import org.drools.WorkingMemoryEntryPoint;
import org.drools.base.ShadowProxy;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.InternalWorkingMemoryEntryPoint;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
import org.drools.reteoo.builder.BuildContext;
import org.drools.rule.EntryPoint;
import org.drools.spi.ObjectType;
@@ -87,14 +82,20 @@
final ObjectSource objectSource,
final BuildContext context) {
this( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
objectSource,
context.getCurrentEntryPoint() ); // irrelevant for this node, since it overrides sink management
}
public EntryPointNode(final int id,
+ final RuleBasePartitionId partitionId,
+ final boolean partitionsEnabled,
final ObjectSource objectSource,
final EntryPoint entryPoint) {
super( id,
+ partitionId,
+ partitionsEnabled,
objectSource,
999 ); // irrelevant for this node, since it overrides sink management
this.entryPoint = entryPoint;
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EvalConditionNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EvalConditionNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/EvalConditionNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -91,7 +91,9 @@
final LeftTupleSource tupleSource,
final EvalCondition eval,
final BuildContext context) {
- super( id );
+ super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled() );
this.condition = eval;
this.tupleSource = tupleSource;
this.tupleMemoryEnabled = context.isTupleMemoryEnabled();
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ExistsNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ExistsNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ExistsNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -71,6 +71,8 @@
final Behavior[] behaviors,
final BuildContext context) {
super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
leftInput,
rightInput,
joinNodeBinder,
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/FromNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/FromNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/FromNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -6,6 +6,7 @@
import java.io.Serializable;
import org.drools.RuleBaseConfiguration;
+import org.drools.reteoo.builder.BuildContext;
import org.drools.common.BaseNode;
import org.drools.common.BetaConstraints;
import org.drools.common.EmptyBetaConstraints;
@@ -48,8 +49,11 @@
final LeftTupleSource tupleSource,
final AlphaNodeFieldConstraint[] constraints,
final BetaConstraints binder,
- final boolean tupleMemoryEnabled) {
- super( id );
+ final boolean tupleMemoryEnabled,
+ final BuildContext context ) {
+ super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled() );
this.dataProvider = dataProvider;
this.tupleSource = tupleSource;
this.alphaConstraints = constraints;
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/JoinNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/JoinNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/JoinNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -70,6 +70,8 @@
final Behavior[] behaviors,
final BuildContext context) {
super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
leftInput,
rightInput,
binder,
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftInputAdapterNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftInputAdapterNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftInputAdapterNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -17,11 +17,7 @@
*/
import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
import org.drools.reteoo.builder.BuildContext;
import org.drools.spi.PropagationContext;
import org.drools.util.FactEntry;
@@ -70,14 +66,13 @@
* The unique id of this node in the current Rete network
* @param source
* The parent node, where Facts are propagated from
- * @param binder
- * An optional binder to filter out propagations. This binder will exist when
- * a predicate is used in the first pattern, for instance
*/
public LeftInputAdapterNode(final int id,
final ObjectSource source,
final BuildContext context) {
- super( id );
+ super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled() );
this.objectSource = source;
this.leftTupleMemoryEnabled = context.isTupleMemoryEnabled();
}
@@ -273,6 +268,20 @@
public int getId() {
return 0;
}
+
+ public RuleBasePartitionId getPartitionId() {
+ return sink.getPartitionId();
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ // intentionally empty: this is a short-lived adapter class used only during an update
+ // operation, so it needs no serialization code
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ // intentionally empty: this is a short-lived adapter class used only during an update
+ // operation, so it needs no serialization code
+ }
}
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/LeftTupleSource.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -24,6 +24,7 @@
import org.drools.common.BaseNode;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
/**
@@ -61,8 +62,8 @@
*
* @param id
*/
- LeftTupleSource(final int id) {
- super( id );
+ LeftTupleSource(final int id, final RuleBasePartitionId partitionId, final boolean partitionsEnabled ) {
+ super( id, partitionId, partitionsEnabled );
this.sink = EmptyLeftTupleSinkAdapter.getInstance();
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/NotNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/NotNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/NotNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -69,6 +69,8 @@
final Behavior[] behaviors,
final BuildContext context) {
super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
leftInput,
rightInput,
joinNodeBinder,
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSinkPropagator.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSinkPropagator.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSinkPropagator.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -5,11 +5,15 @@
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
public interface ObjectSinkPropagator
extends
Externalizable {
+
+ public RuleBasePartitionId getPartitionId();
+
public void propagateAssertObject(InternalFactHandle factHandle,
PropagationContext context,
InternalWorkingMemory workingMemory);
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSource.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectSource.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -16,7 +16,6 @@
* limitations under the License.
*/
-import java.io.Serializable;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
@@ -25,6 +24,7 @@
import org.drools.common.BaseNode;
import org.drools.common.DefaultFactHandle;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
/**
@@ -66,8 +66,12 @@
*
* @param id
*/
- ObjectSource(final int id) {
+ ObjectSource(final int id,
+ final RuleBasePartitionId partitionId,
+ final boolean partitionsEnabled) {
this( id,
+ partitionId,
+ partitionsEnabled,
null,
3 );
}
@@ -78,9 +82,11 @@
* @param id
*/
ObjectSource(final int id,
+ final RuleBasePartitionId partitionId,
+ final boolean partitionsEnabled,
final ObjectSource objectSource,
final int alphaNodeHashingThreshold) {
- super( id );
+ super( id, partitionId, partitionsEnabled );
this.source = objectSource;
this.alphaNodeHashingThreshold = alphaNodeHashingThreshold;
this.sink = EmptyObjectSinkAdapter.getInstance();
@@ -115,9 +121,25 @@
*/
protected void addObjectSink(final ObjectSink objectSink) {
if ( this.sink instanceof EmptyObjectSinkAdapter ) {
- this.sink = new SingleObjectSinkAdapter( objectSink );
+ if( this.partitionsEnabled && ! this.getPartitionId().equals( objectSink.getPartitionId() ) ) {
+ // if partitions are enabled and the next node belongs to a different partition,
+ // we need to use the asynchronous propagator
+ this.sink = new AsyncSingleObjectSinkAdapter( this.getPartitionId(), objectSink );
+ } else {
+ // otherwise, we use the lighter synchronous propagator
+ this.sink = new SingleObjectSinkAdapter( this.getPartitionId(), objectSink );
+ }
} else if ( this.sink instanceof SingleObjectSinkAdapter ) {
- final CompositeObjectSinkAdapter sinkAdapter = new CompositeObjectSinkAdapter( this.alphaNodeHashingThreshold );
+ final CompositeObjectSinkAdapter sinkAdapter;
+ if( this.partitionsEnabled ) {
+ // a composite propagator may propagate to both nodes in the same partition
+ // as well as in a different partition, so, if partitions are enabled, we
+ // must use the asynchronous version
+ sinkAdapter = new AsyncCompositeObjectSinkAdapter( this.getPartitionId(), this.alphaNodeHashingThreshold );
+ } else {
+ // if partitions are disabled, then it is safe to use the lighter synchronous propagator
+ sinkAdapter = new CompositeObjectSinkAdapter( this.getPartitionId(), this.alphaNodeHashingThreshold );
+ }
sinkAdapter.addObjectSink( this.sink.getSinks()[0] );
sinkAdapter.addObjectSink( objectSink );
this.sink = sinkAdapter;
@@ -143,7 +165,14 @@
final CompositeObjectSinkAdapter sinkAdapter = (CompositeObjectSinkAdapter) this.sink;
sinkAdapter.removeObjectSink( objectSink );
if ( sinkAdapter.size() == 1 ) {
- this.sink = new SingleObjectSinkAdapter( sinkAdapter.getSinks()[0] );
+ if( this.partitionsEnabled && ! this.getPartitionId().equals( sinkAdapter.getSinks()[0].getPartitionId() ) ) {
+ // if partitions are enabled and the next node belongs to a different partition,
+ // we need to use the asynchronous propagator
+ this.sink = new AsyncSingleObjectSinkAdapter( this.getPartitionId(), sinkAdapter.getSinks()[0] );
+ } else {
+ // otherwise, we use the lighter synchronous propagator
+ this.sink = new SingleObjectSinkAdapter( this.getPartitionId(), sinkAdapter.getSinks()[0] );
+ }
}
}
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectTypeNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectTypeNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ObjectTypeNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -16,21 +16,14 @@
* limitations under the License.
*/
-import java.io.Serializable;
import java.io.Externalizable;
-import java.io.ObjectOutput;
import java.io.IOException;
import java.io.ObjectInput;
+import java.io.ObjectOutput;
import org.drools.RuleBaseConfiguration;
import org.drools.base.ClassObjectType;
-import org.drools.common.AbstractRuleBase;
-import org.drools.common.BaseNode;
-import org.drools.common.DroolsObjectInputStream;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
import org.drools.reteoo.builder.BuildContext;
import org.drools.rule.Declaration;
import org.drools.rule.EntryPoint;
@@ -38,8 +31,6 @@
import org.drools.spi.Constraint;
import org.drools.spi.ObjectType;
import org.drools.spi.PropagationContext;
-import org.drools.util.FactEntry;
-import org.drools.util.RightTupleList;
import org.drools.util.Iterator;
import org.drools.util.ObjectHashSet;
import org.drools.util.ObjectHashSet.ObjectEntry;
@@ -104,6 +95,8 @@
final ObjectType objectType,
final BuildContext context) {
super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
source,
context.getRuleBase().getConfiguration().getAlphaNodeHashingThreshold() );
this.objectType = objectType;
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PartitionTaskManager.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,331 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.drools.reteoo;
+
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.spi.PropagationContext;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A class to control the worker thread for each rulebase partition.
+ * It contains an internal Single Thread Pool that ensures thread
+ * respawn and a task that ensures no more than a single thread is
+ * executing it concurrently.
+ *
+ * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class PartitionTaskManager {
+
+ private ExecutorService pool = null;
+ private PartitionTask task = null;
+
+ public PartitionTaskManager( final InternalWorkingMemory workingMemory ) {
+ this.task = new PartitionTask( workingMemory );
+ }
+
+ /**
+ * Starts the service if it is not already running: creates a new single thread executor and submits the task to it.
+ */
+ public synchronized void startService() {
+ if( !isRunning() ) {
+ // I'm not sure we should create and destroy the pool on every service start/stop,
+ // but for now, let's do that. Later we can reevaluate whether that is needed.
+ this.pool = Executors.newSingleThreadExecutor();
+ this.pool.execute( this.task );
+ }
+ }
+
+ /**
+ * Nicely requests the service to stop. This method will not wait for the service to finish.
+ * @return always true: the result flag is never changed in this method
+ */
+ public synchronized boolean stopService() {
+ boolean result = true;
+ if( isRunning() ) {
+ this.task.shutdown();
+ // I'm not sure we should create and destroy the pool on every service start/stop,
+ // but for now, let's do that. Later we can reevaluate whether that is needed.
+ this.pool.shutdown();
+ this.pool = null;
+ }
+ return result;
+ }
+
+ /**
+ * Nicely requests the service to stop and waits up to the given timeout for it to finish; the interrupt status is restored if the wait is interrupted.
+ * @param timeout the maximum time to wait for the pool to terminate
+ * @param unit the time unit of the timeout argument
+ * @return true in case the service finished or was not running, false on timeout or interruption
+ */
+ public synchronized boolean stopService( final long timeout, final TimeUnit unit ) {
+ boolean result = true;
+ if( isRunning() ) {
+ this.task.shutdown();
+ // for now the pool is created and destroyed on every service start/stop; to be reevaluated later
+ this.pool.shutdown();
+ try {
+ result = this.pool.awaitTermination( timeout, unit );
+ } catch( InterruptedException e ) {
+ Thread.currentThread().interrupt(); // restore the interrupt status so callers can still observe it
+ result = false;
+ }
+ this.pool = null;
+ }
+ return result;
+ }
+
+ /**
+ * Nicely requests the service to stop and waits until the service finishes or the calling
+ * thread is interrupted. On interruption the interrupt status of the calling thread is restored.
+ *
+ * @return true in case the service finished or was not running, false if the wait was interrupted
+ */
+ public synchronized boolean stopServiceAndWait() {
+ boolean result = true;
+ if( isRunning() ) {
+ this.task.shutdown();
+ // I'm not sure we should create and destroy the pool on every service start/stop,
+ // but for now, let's do that. Later we can reevaluate whether that is needed.
+ this.pool.shutdown();
+ try {
+ while( !this.pool.awaitTermination( 10, TimeUnit.SECONDS ) ) {
+ // nothing to do: keep waiting, in 10 second slices, until the pool terminates
+ }
+ result = this.pool.isTerminated();
+ } catch( InterruptedException e ) {
+ Thread.currentThread().interrupt(); // restore the interrupt status so callers can still observe it
+ result = false;
+ }
+ this.pool = null;
+ }
+ return result;
+ }
+
+ /**
+ * Checks if the task is running.
+ *
+ * @return true if the task is running. false otherwise.
+ */
+ public synchronized boolean isRunning() {
+ return pool != null && !pool.isTerminated();
+ }
+
+ /**
+ * Adds the given action to the processing queue
+ *
+ * @param action the action to be processed
+ * @return true if the action was successfully added to the processing queue. false otherwise.
+ */
+ public boolean enqueue( final Action action ) {
+ return this.task.enqueue( action );
+ }
+
+ /**
+ * A worker task that keeps processing the actions queue.
+ * The task uses a blocking queue and keeps processing
+ * actions while there are actions in the queue and it is not
+ * shut down. If the queue is emptied, the task will wait
+ * until a new action is added.
+ */
+ public static class PartitionTask implements Runnable {
+
+ // the queue with the actions that need to be processed
+ private BlockingQueue<Action> queue;
+
+ // the working memory reference
+ private InternalWorkingMemory workingMemory;
+
+ // a flag to nicely shutdown the thread
+ private volatile AtomicBoolean shutdown;
+
+ // the actual thread that is running
+ private Thread runner;
+
+
+ /**
+ * Constructor
+ *
+ * @param workingMemory the working memory reference that is used for node processing
+ */
+ public PartitionTask( final InternalWorkingMemory workingMemory ) {
+ this.queue = new LinkedBlockingQueue<Action>();
+ this.shutdown = new AtomicBoolean( false );
+ this.workingMemory = workingMemory;
+ this.runner = null;
+ }
+
+ /**
+ * Takes actions from the queue and executes them until the shutdown flag is set.
+ *
+ * @see Runnable
+ */
+ public void run() {
+ // this task can not be shared among multiple threads
+ if( checkAndSetRunning() ) {
+ return;
+ }
+
+ while( !shutdown.get() ) {
+ try {
+ // stop right away if this thread was interrupted; queue.take() below is the blocking call
+ if( Thread.currentThread().isInterrupted() ) {
+ cancel();
+ break;
+ }
+ Action action = queue.take();
+ action.execute( workingMemory );
+
+ } catch( InterruptedException e ) {
+ cancel();
+ }
+ }
+ }
+
+ /**
+ * Requests this task to shutdown
+ */
+ public void shutdown() {
+ synchronized( this ) {
+ if( this.runner != null ) {
+ this.runner.interrupt();
+ }
+ }
+ this.cancel();
+ }
+
+ /**
+ * Returns true if this task is currently executing
+ *
+ * @return true if the task is currently executing
+ */
+ public boolean isRunning() {
+ synchronized( this ) {
+ return !shutdown.get() && this.runner != null;
+ }
+ }
+
+ /**
+ * Offers the given action to the processing queue, returning true if the action
+ * was accepted by the queue or false otherwise.
+ *
+ * @param action the action to add to the processing queue
+ * @return true if the action was successfully added to the queue. false otherwise.
+ */
+ public boolean enqueue( final Action action ) {
+ return this.queue.offer( action );
+ }
+
+ /**
+ * Cancels current execution: sets the shutdown flag and clears the cached runner thread reference
+ */
+ private void cancel() {
+ // raise the shutdown flag, which is the condition tested by the processing loop in run()
+ shutdown.set( true );
+ // clear the cached reference to the runner thread
+ synchronized( this ) {
+ this.runner = null;
+ }
+ }
+
+ /**
+ * Claims this task for the current thread: if no runner is cached and the current thread is
+ * not interrupted, caches the current thread reference and resets the shutdown flag.
+ *
+ * @return false if the current thread became the runner. true if a runner is already cached or the current thread is interrupted.
+ */
+ private boolean checkAndSetRunning() {
+ synchronized( this ) {
+ if( this.runner == null && !Thread.currentThread().isInterrupted() ) {
+ // if it is not running yet, cache the thread reference
+ this.runner = Thread.currentThread();
+ this.shutdown.set( false );
+ } else {
+ // there can be only one thread executing each instance of PartitionTask
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+
+ /**
+ * An interface for all actions to be executed by the PartitionTask
+ */
+ public static interface Action extends Externalizable {
+ public abstract void execute( final InternalWorkingMemory workingMemory );
+ }
+
+ /**
+ * An abstract super class for all handle-related actions
+ */
+ public static abstract class FactAction implements Action, Externalizable {
+
+ protected InternalFactHandle handle;
+ protected PropagationContext context;
+ protected ObjectSink sink;
+
+ public FactAction() {
+ }
+
+ public FactAction( final InternalFactHandle handle, final PropagationContext context,
+ final ObjectSink sink ) {
+ super();
+ this.handle = handle;
+ this.context = context;
+ this.sink = sink;
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ handle = (InternalFactHandle) in.readObject();
+ context = (PropagationContext) in.readObject();
+ sink = (ObjectSink) in.readObject();
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ out.writeObject( handle );
+ out.writeObject( context );
+ out.writeObject( sink );
+ }
+
+ public abstract void execute( final InternalWorkingMemory workingMemory );
+ }
+
+ static class FactAssertAction extends FactAction {
+ private static final long serialVersionUID = -8478488926430845209L;
+
+ FactAssertAction() {
+ }
+
+ public FactAssertAction( final InternalFactHandle handle, final PropagationContext context,
+ final ObjectSink sink ) {
+ super( handle, context, sink );
+ }
+
+ public void execute( final InternalWorkingMemory workingMemory ) {
+ sink.assertObject( this.handle, this.context, workingMemory );
+ }
+ }
+
+}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PropagationQueuingNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PropagationQueuingNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/PropagationQueuingNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -15,46 +15,38 @@
*/
package org.drools.reteoo;
-import java.io.ObjectOutput;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.Externalizable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.drools.RuleBaseConfiguration;
import org.drools.RuntimeDroolsException;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.WorkingMemoryAction;
+import org.drools.common.*;
import org.drools.marshalling.MarshallerReaderContext;
import org.drools.marshalling.MarshallerWriteContext;
import org.drools.reteoo.builder.BuildContext;
import org.drools.spi.PropagationContext;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* A node that will add the propagation to the working memory actions queue,
* in order to allow multiple threads to concurrently assert objects to multiple
* entry points.
*
* @author etirelli
- *
*/
-public class PropagationQueuingNode extends ObjectSource
- implements
- ObjectSinkNode,
- NodeMemory {
+public class PropagationQueuingNode extends ObjectSource implements ObjectSinkNode, NodeMemory {
- private static final long serialVersionUID = -615639068150958767L;
+ private static final long serialVersionUID = -615639068150958767L;
// should we make this one configurable?
- private static final int PROPAGATION_SLICE_LIMIT = 1000;
+ private static final int PROPAGATION_SLICE_LIMIT = 1000;
private ObjectSinkNode previousObjectSinkNode;
private ObjectSinkNode nextObjectSinkNode;
- private PropagateAction action;
+ private PropagateAction action;
public PropagationQueuingNode() {
}
@@ -64,29 +56,29 @@
* propagations until the engine reaches a safe propagation point,
* when all the queued facts are propagated.
*
- * @param id Node's ID
- * @param constraint Node's constraints
+ * @param id Node's ID
* @param objectSource Node's object source
- * @param hasMemory true if node shall be configured with local memory. False otherwise.
+ * @param context
*/
- public PropagationQueuingNode(final int id,
- final ObjectSource objectSource,
- final BuildContext context) {
+ public PropagationQueuingNode( final int id,
+ final ObjectSource objectSource,
+ final BuildContext context ) {
super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled(),
objectSource,
context.getRuleBase().getConfiguration().getAlphaNodeHashingThreshold() );
this.action = new PropagateAction( this );
}
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
super.readExternal( in );
previousObjectSinkNode = (ObjectSinkNode) in.readObject();
nextObjectSinkNode = (ObjectSinkNode) in.readObject();
action = (PropagateAction) in.readObject();
}
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal( ObjectOutput out ) throws IOException {
super.writeExternal( out );
out.writeObject( previousObjectSinkNode );
out.writeObject( nextObjectSinkNode );
@@ -97,21 +89,20 @@
* @see org.drools.reteoo.ObjectSource#updateSink(org.drools.reteoo.ObjectSink, org.drools.spi.PropagationContext, org.drools.common.InternalWorkingMemory)
*/
@Override
- public void updateSink(ObjectSink sink,
- PropagationContext context,
- InternalWorkingMemory workingMemory) {
+ public void updateSink( ObjectSink sink, PropagationContext context, InternalWorkingMemory workingMemory ) {
- final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory.getNodeMemory( this );
+ final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory
+ .getNodeMemory( this );
// this is just sanity code. We may remove it in the future, but keeping it for now.
- if ( !memory.isEmpty() ) {
- throw new RuntimeDroolsException( "Error updating sink. Not safe to update sink as the PropagatingQueueingNode memory is not for node: " + this.toString() );
+ if( !memory.isEmpty() ) {
+ throw new RuntimeDroolsException(
+ "Error updating sink. Not safe to update sink as the PropagatingQueueingNode memory is not for node: " + this
+ .toString() );
}
// as this node is simply a queue, ask object source to update the child sink directly
- this.source.updateSink( sink,
- context,
- workingMemory );
+ this.source.updateSink( sink, context, workingMemory );
}
/**
@@ -126,31 +117,23 @@
* @see org.drools.common.BaseNode#attach(org.drools.common.InternalWorkingMemory[])
*/
@Override
- public void attach(InternalWorkingMemory[] workingMemories) {
+ public void attach( InternalWorkingMemory[] workingMemories ) {
attach();
// this node does not require update, so nothing else to do.
}
- /**
- * @see org.drools.common.BaseNode#remove(ReteooBuilder, org.drools.common.BaseNode, org.drools.common.InternalWorkingMemory[])
- */
@Override
- protected void doRemove(final RuleRemovalContext context,
- final ReteooBuilder builder,
- final BaseNode node,
- final InternalWorkingMemory[] workingMemories) {
- if ( !node.isInUse() ) {
+ protected void doRemove( final RuleRemovalContext context, final ReteooBuilder builder, final BaseNode node,
+ final InternalWorkingMemory[] workingMemories ) {
+ if( !node.isInUse() ) {
removeObjectSink( (ObjectSink) node );
}
- if ( !this.isInUse() ) {
- for ( int i = 0, length = workingMemories.length; i < length; i++ ) {
+ if( !this.isInUse() ) {
+ for( int i = 0, length = workingMemories.length; i < length; i++ ) {
workingMemories[i].clearNodeMemory( this );
}
}
- this.source.remove( context,
- builder,
- this,
- workingMemories );
+ this.source.remove( context, builder, this, workingMemories );
}
/**
@@ -170,95 +153,82 @@
/**
* @see org.drools.reteoo.ObjectSinkNode#setNextObjectSinkNode(org.drools.reteoo.ObjectSinkNode)
*/
- public void setNextObjectSinkNode(ObjectSinkNode next) {
+ public void setNextObjectSinkNode( ObjectSinkNode next ) {
this.nextObjectSinkNode = next;
}
/**
* @see org.drools.reteoo.ObjectSinkNode#setPreviousObjectSinkNode(org.drools.reteoo.ObjectSinkNode)
*/
- public void setPreviousObjectSinkNode(ObjectSinkNode previous) {
+ public void setPreviousObjectSinkNode( ObjectSinkNode previous ) {
this.previousObjectSinkNode = previous;
}
/**
* @see org.drools.reteoo.ObjectSink#assertObject(InternalFactHandle, org.drools.spi.PropagationContext, org.drools.common.InternalWorkingMemory)
*/
- public void assertObject(InternalFactHandle factHandle,
- PropagationContext context,
- InternalWorkingMemory workingMemory) {
- final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory.getNodeMemory( this );
- memory.addAction( new AssertAction( factHandle,
- context ) );
+ public void assertObject( InternalFactHandle factHandle, PropagationContext context,
+ InternalWorkingMemory workingMemory ) {
+ final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory
+ .getNodeMemory( this );
+ memory.addAction( new AssertAction( factHandle, context ) );
// if not queued yet, we need to queue it up
- if ( memory.isQueued().compareAndSet( false,
- true ) ) {
+ if( memory.isQueued().compareAndSet( false, true ) ) {
workingMemory.queueWorkingMemoryAction( this.action );
}
}
- /**
- * @see org.drools.reteoo.ObjectSink#isObjectMemoryEnabled()
- */
public boolean isObjectMemoryEnabled() {
return true;
}
- /**
- * @see org.drools.reteoo.ObjectSink#retractObject(org.drools.common.InternalFactHandle, org.drools.spi.PropagationContext, org.drools.common.InternalWorkingMemory)
- */
- public void retractObject(InternalFactHandle handle,
- PropagationContext context,
- InternalWorkingMemory workingMemory) {
- final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory.getNodeMemory( this );
- memory.addAction( new RetractAction( handle,
- context ) );
+ public void retractObject( InternalFactHandle handle, PropagationContext context,
+ InternalWorkingMemory workingMemory ) {
+ final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory
+ .getNodeMemory( this );
+ memory.addAction( new RetractAction( handle, context ) );
// if not queued yet, we need to queue it up
- if ( memory.isQueued().compareAndSet( false,
- true ) ) {
+ if( memory.isQueued().compareAndSet( false, true ) ) {
workingMemory.queueWorkingMemoryAction( this.action );
}
}
/**
* Propagate all queued actions (asserts and retracts).
- *
+ * <p/>
* This method implementation is based on optimistic behavior to avoid the
* use of locks. There may eventually be a minimum wasted effort, but overall
* it will be better than paying for the lock's cost.
*
* @param workingMemory
*/
- public void propagateActions(InternalWorkingMemory workingMemory) {
- final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory.getNodeMemory( this );
+ public void propagateActions( InternalWorkingMemory workingMemory ) {
+ final PropagationQueueingNodeMemory memory = (PropagationQueueingNodeMemory) workingMemory
+ .getNodeMemory( this );
// first we clear up the action queued flag
memory.isQueued().set( false );
// we limit the propagation to avoid a hang when this queue is never empty
Action next = memory.getNext();
- for ( int counter = 0; next != null && counter < PROPAGATION_SLICE_LIMIT; next = memory.getNext(), counter++ ) {
- next.execute( this.sink,
- workingMemory );
+ for( int counter = 0; next != null && counter < PROPAGATION_SLICE_LIMIT; next = memory.getNext(), counter++ ) {
+ next.execute( this.sink, workingMemory );
}
- if ( memory.hasNext() ) {
+ if( memory.hasNext() ) {
// add action to the queue again.
memory.isQueued().set( true );
workingMemory.queueWorkingMemoryAction( this.action );
}
}
- /**
- * @see org.drools.reteoo.ObjectSink#setObjectMemoryEnabled(boolean)
- */
- public void setObjectMemoryEnabled(boolean objectMemoryOn) {
+ public void setObjectMemoryEnabled( boolean objectMemoryOn ) {
throw new UnsupportedOperationException( "PropagationQueueingNode must have its node memory enabled." );
}
- public Object createMemory(RuleBaseConfiguration config) {
+ public Object createMemory( RuleBaseConfiguration config ) {
return new PropagationQueueingNodeMemory();
}
@@ -267,16 +237,14 @@
*
* @author etirelli
*/
- public static class PropagationQueueingNodeMemory
- implements
- Externalizable {
+ public static class PropagationQueueingNodeMemory implements Externalizable {
- private static final long serialVersionUID = 7372028632974484023L;
+ private static final long serialVersionUID = 7372028632974484023L;
private ConcurrentLinkedQueue<Action> queue;
// "singleton" action - there is one of these for each node in each working memory
- private AtomicBoolean isQueued;
+ private AtomicBoolean isQueued;
public PropagationQueueingNodeMemory() {
super();
@@ -284,13 +252,12 @@
this.isQueued = new AtomicBoolean( false );
}
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
queue = (ConcurrentLinkedQueue<Action>) in.readObject();
isQueued = (AtomicBoolean) in.readObject();
}
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal( ObjectOutput out ) throws IOException {
out.writeObject( queue );
out.writeObject( isQueued );
}
@@ -299,7 +266,7 @@
return this.queue.isEmpty();
}
- public void addAction(Action action) {
+ public void addAction( Action action ) {
this.queue.add( action );
}
@@ -316,9 +283,7 @@
}
}
- private static abstract class Action
- implements
- Externalizable {
+ private static abstract class Action implements Externalizable {
protected InternalFactHandle handle;
protected PropagationContext context;
@@ -327,42 +292,34 @@
}
- public Action(InternalFactHandle handle,
- PropagationContext context) {
+ public Action( InternalFactHandle handle, PropagationContext context ) {
super();
this.handle = handle;
this.context = context;
}
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
handle = (InternalFactHandle) in.readObject();
context = (PropagationContext) in.readObject();
}
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal( ObjectOutput out ) throws IOException {
out.writeObject( handle );
out.writeObject( context );
}
- public abstract void execute(final ObjectSinkPropagator sink,
- final InternalWorkingMemory workingMemory);
+ public abstract void execute( final ObjectSinkPropagator sink, final InternalWorkingMemory workingMemory );
}
private static class AssertAction extends Action {
private static final long serialVersionUID = -8478488926430845209L;
- public AssertAction(final InternalFactHandle handle,
- final PropagationContext context) {
- super( handle,
- context );
+ public AssertAction( final InternalFactHandle handle, final PropagationContext context ) {
+ super( handle, context );
}
- public void execute(final ObjectSinkPropagator sink,
- final InternalWorkingMemory workingMemory) {
- sink.propagateAssertObject( this.handle,
- this.context,
- workingMemory );
+ public void execute( final ObjectSinkPropagator sink, final InternalWorkingMemory workingMemory ) {
+ sink.propagateAssertObject( this.handle, this.context, workingMemory );
}
}
@@ -373,26 +330,21 @@
}
- public RetractAction(final InternalFactHandle handle,
- final PropagationContext context) {
- super( handle,
- context );
+ public RetractAction( final InternalFactHandle handle, final PropagationContext context ) {
+ super( handle, context );
}
- public void execute(final ObjectSinkPropagator sink,
- final InternalWorkingMemory workingMemory) {
-
- for ( RightTuple rightTuple = this.handle.getRightTuple(); rightTuple != null; rightTuple = (RightTuple) rightTuple.getHandleNext() ) {
- rightTuple.getRightTupleSink().retractRightTuple( rightTuple,
- context,
- workingMemory );
+ public void execute( final ObjectSinkPropagator sink, final InternalWorkingMemory workingMemory ) {
+
+ for( RightTuple rightTuple = this.handle
+ .getRightTuple(); rightTuple != null; rightTuple = (RightTuple) rightTuple.getHandleNext() ) {
+ rightTuple.getRightTupleSink().retractRightTuple( rightTuple, context, workingMemory );
}
this.handle.setRightTuple( null );
- for ( LeftTuple leftTuple = this.handle.getLeftTuple(); leftTuple != null; leftTuple = (LeftTuple) leftTuple.getLeftParentNext() ) {
- leftTuple.getLeftTupleSink().retractLeftTuple( leftTuple,
- context,
- workingMemory );
+ for( LeftTuple leftTuple = this.handle.getLeftTuple(); leftTuple != null; leftTuple = (LeftTuple) leftTuple
+ .getLeftParentNext() ) {
+ leftTuple.getLeftTupleSink().retractLeftTuple( leftTuple, context, workingMemory );
}
this.handle.setLeftTuple( null );
}
@@ -403,13 +355,10 @@
* this node propagation can be triggered at a safe point
*
* @author etirelli
- *
*/
- public static class PropagateAction
- implements
- WorkingMemoryAction {
+ public static class PropagateAction implements WorkingMemoryAction {
- private static final long serialVersionUID = 6765029029501617115L;
+ private static final long serialVersionUID = 6765029029501617115L;
private PropagationQueuingNode node;
@@ -417,28 +366,27 @@
}
- public PropagateAction(PropagationQueuingNode node) {
+ public PropagateAction( PropagationQueuingNode node ) {
this.node = node;
}
-
- public PropagateAction(MarshallerReaderContext context) throws IOException {
- this.node = ( PropagationQueuingNode ) context.sinks.get( context.readInt() );
- }
-
- public void write(MarshallerWriteContext context) throws IOException {
+
+ public PropagateAction( MarshallerReaderContext context ) throws IOException {
+ this.node = (PropagationQueuingNode) context.sinks.get( context.readInt() );
+ }
+
+ public void write( MarshallerWriteContext context ) throws IOException {
context.write( node.getId() );
}
- public void readExternal(ObjectInput in) throws IOException,
- ClassNotFoundException {
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
node = (PropagationQueuingNode) in.readObject();
}
- public void writeExternal(ObjectOutput out) throws IOException {
+ public void writeExternal( ObjectOutput out ) throws IOException {
out.writeObject( node );
}
- public void execute(InternalWorkingMemory workingMemory) {
+ public void execute( InternalWorkingMemory workingMemory ) {
this.node.propagateActions( workingMemory );
}
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/QueryTerminalNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/QueryTerminalNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/QueryTerminalNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -22,10 +22,8 @@
import java.io.ObjectInput;
import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.reteoo.builder.BuildContext;
+import org.drools.common.*;
import org.drools.rule.GroupElement;
import org.drools.rule.Rule;
import org.drools.spi.PropagationContext;
@@ -70,18 +68,22 @@
}
/**
- * Construct.
+ * Constructor
*
- * @param inputSource
- * The parent tuple source.
- * @param rule
- * The rule.
+ * @param id node ID
+ * @param source the tuple source for this node
+ * @param rule the rule this node belongs to
+ * @param subrule the subrule this node belongs to
+ * @param context the current build context
*/
public QueryTerminalNode(final int id,
final LeftTupleSource source,
final Rule rule,
- final GroupElement subrule) {
- super( id );
+ final GroupElement subrule,
+ final BuildContext context ) {
+ super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled() );
this.rule = rule;
this.subrule = subrule;
this.tupleSource = source;
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Rete.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Rete.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Rete.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -25,11 +25,7 @@
import java.util.List;
import java.util.Map;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalRuleBase;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.InternalWorkingMemoryEntryPoint;
+import org.drools.common.*;
import org.drools.rule.EntryPoint;
import org.drools.spi.ObjectType;
import org.drools.spi.PropagationContext;
@@ -79,7 +75,7 @@
// ------------------------------------------------------------
public Rete(InternalRuleBase ruleBase) {
- super( 0 );
+ super( 0, RuleBasePartitionId.MAIN_PARTITION, ruleBase != null ? ruleBase.getConfiguration().isPartitionsEnabled() : false );
this.entryPoints = new HashMap<EntryPoint, EntryPointNode>();
this.ruleBase = ruleBase;
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -29,10 +29,7 @@
import org.drools.SessionConfiguration;
import org.drools.StatefulSession;
import org.drools.StatelessSession;
-import org.drools.common.AbstractRuleBase;
-import org.drools.common.DefaultFactHandle;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
+import org.drools.common.*;
import org.drools.concurrent.CommandExecutor;
import org.drools.concurrent.ExecutorService;
import org.drools.event.RuleBaseEventListener;
@@ -138,6 +135,8 @@
// always add the default entry point
EntryPointNode epn = new EntryPointNode( this.reteooBuilder.getIdGenerator().getNextId(),
+ RuleBasePartitionId.MAIN_PARTITION,
+ this.config.isPartitionsEnabled(),
this.rete,
EntryPoint.DEFAULT );
epn.attach();
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightInputAdapterNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightInputAdapterNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightInputAdapterNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -17,11 +17,7 @@
*/
import org.drools.RuleBaseConfiguration;
-import org.drools.common.BaseNode;
-import org.drools.common.InternalFactHandle;
-import org.drools.common.InternalWorkingMemory;
-import org.drools.common.NodeMemory;
-import org.drools.common.PropagationContextImpl;
+import org.drools.common.*;
import org.drools.reteoo.builder.BuildContext;
import org.drools.rule.EntryPoint;
import org.drools.spi.PropagationContext;
@@ -72,7 +68,9 @@
public RightInputAdapterNode(final int id,
final LeftTupleSource source,
final BuildContext context) {
- super( id );
+ super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled() );
this.tupleSource = source;
this.tupleMemoryEnabled = context.isTupleMemoryEnabled();
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RuleTerminalNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RuleTerminalNode.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RuleTerminalNode.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -98,12 +98,14 @@
final LeftTupleSource source,
final Rule rule,
final GroupElement subrule,
- final BuildContext buildContext) {
- super( id );
+ final BuildContext context) {
+ super( id,
+ context.getPartitionId(),
+ context.getRuleBase().getConfiguration().isPartitionsEnabled() );
this.rule = rule;
this.tupleSource = source;
this.subrule = subrule;
- this.tupleMemoryEnabled = buildContext.isTerminalNodeMemoryEnabled();
+ this.tupleMemoryEnabled = context.isTerminalNodeMemoryEnabled();
}
// ------------------------------------------------------------
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleObjectSinkAdapter.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/SingleObjectSinkAdapter.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -3,6 +3,7 @@
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
import org.drools.common.BaseNode;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
import java.io.Externalizable;
@@ -10,29 +11,29 @@
import java.io.IOException;
import java.io.ObjectInput;
-public class SingleObjectSinkAdapter
- implements
- ObjectSinkPropagator,
- Externalizable {
+public class SingleObjectSinkAdapter extends AbstractObjectSinkAdapter {
private static final long serialVersionUID = 873985743021L;
- private ObjectSink sink;
+ protected ObjectSink sink;
public SingleObjectSinkAdapter() {
-
+ super( null );
}
- public SingleObjectSinkAdapter(final ObjectSink sink) {
+ public SingleObjectSinkAdapter(final RuleBasePartitionId partitionId, final ObjectSink sink) {
+ super( partitionId );
this.sink = sink;
}
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ super.readExternal( in );
sink = (ObjectSink) in.readObject();
}
public void writeExternal(ObjectOutput out) throws IOException {
+ super.writeExternal( out );
out.writeObject( sink );
}
@@ -42,7 +43,6 @@
this.sink.assertObject( factHandle,
context,
workingMemory );
-
}
public BaseNode getMatchingNode(BaseNode candidate) {
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Sink.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Sink.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/Sink.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -1,10 +1,11 @@
package org.drools.reteoo;
+import org.drools.common.NetworkNode;
+
/**
* A simple marker interface for Sink types
*
* @author etirelli
*/
-public interface Sink {
- public int getId();
+public interface Sink extends NetworkNode {
}
\ No newline at end of file
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -112,6 +112,8 @@
this.currentEntryPoint = EntryPoint.DEFAULT;
this.nodes = new LinkedList<BaseNode>();
+
+ this.partitionId = RuleBasePartitionId.MAIN_PARTITION;
}
/**
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/FromBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/FromBuilder.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/FromBuilder.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -49,7 +49,8 @@
context.getTupleSource(),
(AlphaNodeFieldConstraint[]) context.getAlphaConstraints().toArray( new AlphaNodeFieldConstraint[context.getAlphaConstraints().size()] ),
betaConstraints,
- context.isTupleMemoryEnabled() ) ) );
+ context.isTupleMemoryEnabled(),
+ context ) ) );
context.setAlphaConstraints( null );
context.setBetaconstraints( null );
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -146,7 +146,8 @@
terminal = new QueryTerminalNode( context.getNextId(),
context.getTupleSource(),
rule,
- subrule );
+ subrule,
+ context );
}
if ( context.getWorkingMemories().length == 0 ) {
((BaseNode) terminal).attach();
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/BaseNodeTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/BaseNodeTest.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/BaseNodeTest.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -20,6 +20,7 @@
import org.drools.common.BaseNode;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
public class BaseNodeTest extends TestCase {
@@ -44,7 +45,7 @@
}
public MockBaseNode(final int id) {
- super( id );
+ super( id, RuleBasePartitionId.MAIN_PARTITION, false );
}
public void ruleAttached() {
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/CompositeObjectSinkAdapterTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/CompositeObjectSinkAdapterTest.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/CompositeObjectSinkAdapterTest.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -19,6 +19,7 @@
import org.drools.common.EmptyBetaConstraints;
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.reteoo.builder.BuildContext;
import org.drools.rule.Behavior;
import org.drools.rule.LiteralConstraint;
@@ -550,6 +551,8 @@
final LeftTupleSource leftInput,
final ObjectSource rightInput) {
super( id,
+ RuleBasePartitionId.MAIN_PARTITION,
+ false,
leftInput,
rightInput,
EmptyBetaConstraints.getInstance(),
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/FromNodeTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/FromNodeTest.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/FromNodeTest.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -26,6 +26,7 @@
import org.drools.common.PropagationContextImpl;
import org.drools.common.SingleBetaConstraints;
import org.drools.reteoo.FromNode.FromMemory;
+import org.drools.reteoo.builder.BuildContext;
import org.drools.rule.Declaration;
import org.drools.rule.LiteralConstraint;
import org.drools.rule.Pattern;
@@ -42,10 +43,15 @@
EqualityEvaluatorsDefinition equals = new EqualityEvaluatorsDefinition();
ClassFieldAccessorStore store = new ClassFieldAccessorStore();
+ private ReteooRuleBase ruleBase;
+ private BuildContext buildContext;
protected void setUp() throws Exception {
store.setClassFieldAccessorCache( new ClassFieldAccessorCache( Thread.currentThread().getContextClassLoader() ) );
store.setEagerWire( true );
+
+ ruleBase = (ReteooRuleBase) RuleBaseFactory.newRuleBase();
+ buildContext = new BuildContext( ruleBase, new ReteooBuilder.IdGenerator() );
}
public void testAlphaNode() {
@@ -54,8 +60,8 @@
null,
null,
null );
- final ReteooWorkingMemory workingMemory = new ReteooWorkingMemory( 1,
- (ReteooRuleBase) RuleBaseFactory.newRuleBase() );
+ final ReteooWorkingMemory workingMemory = new ReteooWorkingMemory( 1, ruleBase );
+
final ClassFieldReader extractor = store.getReader( Cheese.class,
"type",
getClass().getClassLoader() );
@@ -79,7 +85,8 @@
null,
new AlphaNodeFieldConstraint[]{constraint},
null,
- true );
+ true,
+ buildContext );
final MockLeftTupleSink sink = new MockLeftTupleSink( 5 );
from.addTupleSink( sink );
@@ -191,7 +198,8 @@
null,
new AlphaNodeFieldConstraint[0],
betaConstraints,
- true );
+ true,
+ buildContext );
final MockLeftTupleSink sink = new MockLeftTupleSink( 5 );
from.addTupleSink( sink );
@@ -286,7 +294,8 @@
null,
new AlphaNodeFieldConstraint[]{constraint},
null,
- true );
+ true,
+ buildContext );
final MockLeftTupleSink sink = new MockLeftTupleSink( 5 );
from.addTupleSink( sink );
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockLeftTupleSink.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockLeftTupleSink.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockLeftTupleSink.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -26,6 +26,7 @@
import org.drools.common.BaseNode;
import org.drools.common.InternalWorkingMemory;
import org.drools.common.NodeMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
public class MockLeftTupleSink extends LeftTupleSource
@@ -43,11 +44,11 @@
private LeftTupleSinkNode nextTupleSinkNode;
public MockLeftTupleSink() {
- super( 0 );
+ super( 0, RuleBasePartitionId.MAIN_PARTITION, false );
}
public MockLeftTupleSink(final int id) {
- super( id );
+ super( id, RuleBasePartitionId.MAIN_PARTITION, false );
}
public void assertLeftTuple(final LeftTuple tuple,
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSink.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSink.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSink.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -18,9 +18,13 @@
import java.util.ArrayList;
import java.util.List;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
public class MockObjectSink
@@ -104,4 +108,16 @@
// TODO Auto-generated method stub
return 0;
}
+
+ public RuleBasePartitionId getPartitionId() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
}
\ No newline at end of file
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSource.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockObjectSource.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -19,6 +19,7 @@
import org.drools.common.BaseNode;
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
import java.io.IOException;
@@ -44,7 +45,7 @@
}
public MockObjectSource(final int id) {
- super( id );
+ super( id, RuleBasePartitionId.MAIN_PARTITION, false);
this.facts = new ArrayList();
}
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockRightTupleSink.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockRightTupleSink.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockRightTupleSink.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -2,8 +2,12 @@
import java.util.ArrayList;
import java.util.List;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
public class MockRightTupleSink
@@ -25,6 +29,17 @@
public int getId() {
return 0;
- }
+ }
+ public RuleBasePartitionId getPartitionId() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
}
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockTupleSource.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockTupleSource.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/MockTupleSource.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -18,6 +18,7 @@
import org.drools.common.BaseNode;
import org.drools.common.InternalWorkingMemory;
+import org.drools.common.RuleBasePartitionId;
import org.drools.spi.PropagationContext;
public class MockTupleSource extends LeftTupleSource {
@@ -32,7 +33,7 @@
private int updated;
public MockTupleSource(final int id) {
- super( id );
+ super( id, RuleBasePartitionId.MAIN_PARTITION, false );
}
public void attach() {
Added: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/PartitionTaskManagerTest.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.reteoo;
+
+import java.util.concurrent.TimeUnit;
+import java.io.ObjectOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+
+import junit.framework.TestCase;
+
+import org.drools.RuleBase;
+import org.drools.RuleBaseFactory;
+import org.drools.common.InternalWorkingMemory;
+
+/**
+ * Test case for PartitionTaskManager
+ *
+ * @author <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class PartitionTaskManagerTest extends TestCase {
+ private MockAction action;
+ private PartitionTaskManager manager;
+
+ @Override
+ public void setUp() {
+ RuleBase rulebase = RuleBaseFactory.newRuleBase();
+ InternalWorkingMemory workingMemory = (InternalWorkingMemory) rulebase.newStatefulSession();
+ action = new MockAction();
+ manager = new PartitionTaskManager( workingMemory );
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+
+ }
+
+ public void testStartStopService() throws InterruptedException {
+ assertFalse( manager.isRunning() );
+ manager.startService();
+ Thread.sleep( 1000 );
+ assertTrue( manager.isRunning() );
+ manager.stopService();
+ Thread.sleep( 1000 );
+ assertFalse( manager.isRunning() );
+ }
+
+ public void testNodeCallbacks() throws InterruptedException {
+ // should be possible to enqueue before starting the service,
+ // even if that should never happen
+ manager.enqueue( action );
+ manager.startService();
+ manager.enqueue( action );
+ // give the engine some time
+ Thread.sleep( 1000 );
+ assertTrue( manager.stopService( 10, TimeUnit.SECONDS ) );
+ assertEquals( 2, action.getCallbackCounter() );
+ // should be possible to enqueue after the stop,
+ // but callback must not be executed
+ manager.enqueue( action );
+ manager.enqueue( action );
+ manager.enqueue( action );
+ // making sure the service is not processing the nodes
+ Thread.sleep( 1000 );
+ assertEquals( 2, action.getCallbackCounter() );
+ // restarting service
+ manager.startService();
+ // making sure the service had time to process the nodes
+ Thread.sleep( 1000 );
+ assertTrue( manager.stopService( 10, TimeUnit.SECONDS ) );
+ assertEquals( 5, action.getCallbackCounter() );
+ }
+
+ public static class MockAction implements PartitionTaskManager.Action {
+ private volatile long callbackCounter = 0;
+
+ public synchronized long getCallbackCounter() {
+ return this.callbackCounter;
+ }
+
+ public void execute( InternalWorkingMemory workingMemory ) {
+ synchronized( this ) {
+ callbackCounter++;
+ }
+ }
+
+ public void writeExternal( ObjectOutput out ) throws IOException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void readExternal( ObjectInput in ) throws IOException, ClassNotFoundException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ }
+}
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/QueryTerminalNodeTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/QueryTerminalNodeTest.java 2008-08-24 15:44:02 UTC (rev 21842)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/QueryTerminalNodeTest.java 2008-08-24 16:00:53 UTC (rev 21843)
@@ -136,7 +136,8 @@
final QueryTerminalNode queryNode = new QueryTerminalNode( this.buildContext.getNextId(),
joinNode,
query,
- query.getLhs() );
+ query.getLhs(),
+ buildContext );
queryNode.attach();
More information about the jboss-svn-commits
mailing list