[jboss-svn-commits] JBL Code SVN: r22247 - in labs/jbossrules/trunk: drools-compiler/src/test/resources/org/drools/integrationtests and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Aug 29 19:48:59 EDT 2008
Author: tirelli
Date: 2008-08-29 19:48:59 -0400 (Fri, 29 Aug 2008)
New Revision: 22247
Added:
labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/RulebasePartitioningTest.java
labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_rulebasePartitions1.drl
Modified:
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildUtils.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java
Log:
Multi-threads for partitions are working
Added: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/RulebasePartitioningTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/RulebasePartitioningTest.java (rev 0)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/RulebasePartitioningTest.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.integrationtests;
+
+import junit.framework.TestCase;
+import org.drools.compiler.PackageBuilder;
+import org.drools.rule.*;
+import org.drools.*;
+
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class RulebasePartitioningTest extends TestCase {
+
+ protected RuleBase getRuleBase() throws Exception {
+
+ return RuleBaseFactory.newRuleBase( RuleBase.RETEOO,
+ null );
+ }
+
+ protected RuleBase getRuleBase(final RuleBaseConfiguration config) throws Exception {
+
+ return RuleBaseFactory.newRuleBase( RuleBase.RETEOO,
+ config );
+ }
+
+ public void testRulebasePartitions1() throws Exception {
+ final PackageBuilder builder = new PackageBuilder();
+ builder.addPackageFromDrl( new InputStreamReader( getClass().getResourceAsStream( "test_rulebasePartitions1.drl" ) ) );
+ final org.drools.rule.Package pkg = builder.getPackage();
+
+ RuleBaseConfiguration config = new RuleBaseConfiguration();
+ config.setPartitionsEnabled( true );
+
+ RuleBase ruleBase = getRuleBase( config );
+ ruleBase.addPackage( pkg );
+ ruleBase = SerializationHelper.serializeObject( ruleBase );
+ StatefulSession session = ruleBase.newStatefulSession();
+ List result = new ArrayList();
+ session.setGlobal( "results",
+ result );
+
+ Cheese c1 = new Cheese( "stilton" );
+ Cheese c2 = new Cheese( "brie" );
+ Cheese c3 = new Cheese( "cheddar" );
+ Cheese c4 = new Cheese( "stilton" );
+ Person p1 = new Person( "bob" );
+ Person p2 = new Person( "mark" );
+ Person p3 = new Person( "michael" );
+ Person p4 = new Person( "bob" );
+ session.insert( c1 );
+ session.insert( c2 );
+ session.insert( c3 );
+ session.insert( c4 );
+ session.insert( p1 );
+ session.insert( p2 );
+ session.insert( p3 );
+ session.insert( p4 );
+
+ session = SerializationHelper.getSerialisedStatefulSession( session,
+ ruleBase );
+ result = (List) session.getGlobal( "results" );
+
+ session.fireAllRules();
+ assertEquals( 3,
+ result.size() );
+ assertEquals( p4,
+ result.get( 0 ) );
+ assertEquals( p1,
+ result.get( 1 ) );
+ assertEquals( c3,
+ result.get( 2 ) );
+
+ session.dispose();
+
+ }
+
+}
Added: labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_rulebasePartitions1.drl
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_rulebasePartitions1.drl (rev 0)
+++ labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_rulebasePartitions1.drl 2008-08-29 23:48:59 UTC (rev 22247)
@@ -0,0 +1,18 @@
+package org.drools;
+
+global java.util.List results;
+
+rule "rule 1"
+ salience 10
+when
+ $p : Person( name == "bob" )
+then
+ results.add( $p );
+end
+
+rule "rule 2"
+when
+ $c : Cheese( type == "cheddar" )
+then
+ results.add( $c );
+end
\ No newline at end of file
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -200,6 +200,7 @@
droolsStream.writeObject( this.agendaGroupRuleTotals );
droolsStream.writeUTF( this.factHandleFactory.getClass().getName() );
droolsStream.writeObject( this.globals );
+ droolsStream.writeObject( this.partitionIDs );
this.eventSupport.removeEventListener( RuleBaseEventListener.class );
droolsStream.writeObject( this.eventSupport );
@@ -270,7 +271,8 @@
this.populateTypeDeclarationMaps();
- this.globals = (Map) droolsStream.readObject();
+ this.globals = (Map) droolsStream.readObject();
+ this.partitionIDs = (List<RuleBasePartitionId>) droolsStream.readObject();
this.eventSupport = (RuleBaseEventSupport) droolsStream.readObject();
this.eventSupport.setRuleBase( this );
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -310,6 +310,28 @@
}
}
+
+ /**
+ * This method is called to start the multiple partition threads when running in multi-thread
+ * mode
+ */
+ public void startPartitionManagers() {
+ if( this.ruleBase.getConfiguration().isPartitionsEnabled() ) {
+ for( PartitionTaskManager task : this.partitionManagers.values() ) {
+ task.startService();
+ }
+ }
+ }
+
+ public void stopPartitionManagers() {
+ if(this.ruleBase.getConfiguration().isPartitionsEnabled()) {
+ for( PartitionTaskManager task : this.partitionManagers.values() ) {
+ // what to do here? should we simply wait for a timeout and give up?
+ task.stopServiceAndWait();
+ }
+ }
+ }
+
private void initTransient() {
this.entryPointNode = this.ruleBase.getRete().getEntryPointNode( this.entryPoint );
this.typeConfReg = new ObjectTypeConfigurationRegistry( this.ruleBase );
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -66,4 +66,8 @@
} else if ( !id.equals( other.id ) ) return false;
return true;
}
+
+ public String toString() {
+ return "Partition::"+this.id;
+ }
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -28,12 +28,17 @@
* @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
*/
public class AsyncSingleObjectSinkAdapter extends SingleObjectSinkAdapter {
+
+ public AsyncSingleObjectSinkAdapter() {
+ super();
+ }
+
public AsyncSingleObjectSinkAdapter( RuleBasePartitionId partitionId, ObjectSink objectSink ) {
super( partitionId, objectSink );
}
public void propagateAssertObject( InternalFactHandle factHandle, PropagationContext context, InternalWorkingMemory workingMemory ) {
- PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+ PartitionTaskManager manager = workingMemory.getPartitionManager( this.sink.getPartitionId() );
manager.enqueue( new PartitionTaskManager.FactAssertAction(factHandle, context, this.sink ) );
}
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -246,6 +246,8 @@
}
}
+ session.startPartitionManagers();
+
session.queueWorkingMemoryAction( new WorkingMemoryReteAssertAction( session.getInitialFactHandle(),
false,
true,
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -117,6 +117,7 @@
for ( Iterator it = this.__ruleBaseEventListeners.iterator(); it.hasNext(); ) {
this.ruleBase.removeEventListener( (RuleBaseEventListener) it.next() );
}
+ this.stopPartitionManagers();
this.executor.shutDown();
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -113,7 +113,7 @@
this.nodes = new LinkedList<BaseNode>();
- this.partitionId = RuleBasePartitionId.MAIN_PARTITION;
+ this.partitionId = null;
}
/**
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildUtils.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildUtils.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildUtils.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -90,15 +90,15 @@
public BaseNode attachNode(final BuildContext context,
final BaseNode candidate) {
BaseNode node = null;
+ RuleBasePartitionId partition = null;
if( candidate instanceof EntryPointNode ) {
// entry point nodes are always shared
EntryPointNode epn = context.getRuleBase().getRete().getEntryPointNode( ((EntryPointNode)candidate).getEntryPoint() );
if( epn != null ) {
node = epn;
- } else {
- // all EntryPointNodes belong to the main partition
- candidate.setPartitionId( RuleBasePartitionId.MAIN_PARTITION );
}
+ // all EntryPointNodes belong to the main partition
+ partition = RuleBasePartitionId.MAIN_PARTITION;
} else if( candidate instanceof ObjectTypeNode ) {
// object type nodes are always shared
ObjectTypeNode otn = (ObjectTypeNode) candidate;
@@ -107,14 +107,10 @@
otn = map.get( otn.getObjectType() );
if ( otn != null ) {
node = otn;
- } else {
- // all OTNs belong to the main partition
- candidate.setPartitionId( RuleBasePartitionId.MAIN_PARTITION );
}
- } else {
- // all OTNs belong to the main partition
- candidate.setPartitionId( RuleBasePartitionId.MAIN_PARTITION );
}
+ // all EntryPointNodes belong to the main partition
+ partition = RuleBasePartitionId.MAIN_PARTITION;
} else if( isSharingEnabledForNode( context, candidate ) ) {
if ( (context.getTupleSource() != null) && ( candidate instanceof LeftTupleSink ) ) {
node = context.getTupleSource().getSinkPropagator().getMatchingNode(candidate);
@@ -125,19 +121,8 @@
}
if( node != null ) {
// shared node found
-
// undo previous id assignment
context.releaseId( candidate.getId() );
-
- // if partitions are enabled
- if( context.getRuleBase().getConfiguration().isPartitionsEnabled() ) {
- // check what partition it belongs to
- if( context.getPartitionId() == null ) {
- context.setPartitionId( node.getPartitionId() );
- } else if( ! context.getPartitionId().equals( node.getPartitionId() ) ) {
- assert false : "Needs to implement support for partitions merge";
- }
- }
}
}
@@ -145,6 +130,19 @@
if ( node == null ) {
// only attach() if it is a new node
node = candidate;
+
+ // new node, so it must be labeled
+ if( partition == null ) {
+ // if it does not has a predefined label
+ if( context.getPartitionId() == null ) {
+ // if no label in current context, create one
+ context.setPartitionId( context.getRuleBase().createNewPartitionId() );
+ }
+ partition = context.getPartitionId();
+ }
+ // set node whit the actual partition label
+ node.setPartitionId( partition );
+
if ( context.getWorkingMemories().length == 0 ) {
node.attach();
} else {
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java 2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java 2008-08-29 23:48:59 UTC (rev 22247)
@@ -161,33 +161,33 @@
context.getNodes().add((BaseNode) terminal );
// assigns partition IDs to the new nodes
- assignPartitionId(context);
+ //assignPartitionId(context);
return terminal;
}
- /**
- * Assigns the current partition ID to the list of created nodes
- *
- * @param context
- */
- private void assignPartitionId(BuildContext context) {
- if( context.getRuleBase().getConfiguration().isPartitionsEnabled() ) {
- org.drools.common.RuleBasePartitionId partitionId = null;
- if( context.getPartitionId() != null ) {
- // it means it shares nodes with an existing partition, so
- // assign the first id to the newly added nodes
- partitionId = context.getPartitionId();
- } else {
- // nodes are independent of existing nodes, so create a new
- // partition ID for them
- partitionId = context.getRuleBase().createNewPartitionId();
- }
- for( BaseNode node : context.getNodes() ) {
- node.setPartitionId( partitionId );
- }
- }
- }
+// /**
+// * Assigns the current partition ID to the list of created nodes
+// *
+// * @param context
+// */
+// private void assignPartitionId(BuildContext context) {
+// if( context.getRuleBase().getConfiguration().isPartitionsEnabled() ) {
+// org.drools.common.RuleBasePartitionId partitionId = null;
+// if( context.getPartitionId() != null ) {
+// // it means it shares nodes with an existing partition, so
+// // assign the first id to the newly added nodes
+// partitionId = context.getPartitionId();
+// } else {
+// // nodes are independent of existing nodes, so create a new
+// // partition ID for them
+// partitionId = context.getRuleBase().createNewPartitionId();
+// }
+// for( BaseNode node : context.getNodes() ) {
+// node.setPartitionId( partitionId );
+// }
+// }
+// }
/**
* Adds a query pattern to the given subrule
More information about the jboss-svn-commits
mailing list