[jboss-svn-commits] JBL Code SVN: r22247 - in labs/jbossrules/trunk: drools-compiler/src/test/resources/org/drools/integrationtests and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Aug 29 19:48:59 EDT 2008


Author: tirelli
Date: 2008-08-29 19:48:59 -0400 (Fri, 29 Aug 2008)
New Revision: 22247

Added:
   labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/RulebasePartitioningTest.java
   labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_rulebasePartitions1.drl
Modified:
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildUtils.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java
Log:
Multi-threads for partitions are working

Added: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/RulebasePartitioningTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/RulebasePartitioningTest.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/RulebasePartitioningTest.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.integrationtests;
+
+import junit.framework.TestCase;
+import org.drools.compiler.PackageBuilder;
+import org.drools.rule.*;
+import org.drools.*;
+
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
+ */
+public class RulebasePartitioningTest extends TestCase {
+
+    protected RuleBase getRuleBase() throws Exception {
+
+        return RuleBaseFactory.newRuleBase( RuleBase.RETEOO,
+                                            null );
+    }
+
+    protected RuleBase getRuleBase(final RuleBaseConfiguration config) throws Exception {
+
+        return RuleBaseFactory.newRuleBase( RuleBase.RETEOO,
+                                            config );
+    }
+
+    public void testRulebasePartitions1() throws Exception {
+        final PackageBuilder builder = new PackageBuilder();
+        builder.addPackageFromDrl( new InputStreamReader( getClass().getResourceAsStream( "test_rulebasePartitions1.drl" ) ) );
+        final org.drools.rule.Package pkg = builder.getPackage();
+
+        RuleBaseConfiguration config = new RuleBaseConfiguration();
+        config.setPartitionsEnabled( true );
+
+        RuleBase ruleBase = getRuleBase( config );
+        ruleBase.addPackage( pkg );
+        ruleBase = SerializationHelper.serializeObject( ruleBase );
+        StatefulSession session = ruleBase.newStatefulSession();
+        List result = new ArrayList();
+        session.setGlobal( "results",
+                           result );
+
+        Cheese c1 = new Cheese( "stilton" );
+        Cheese c2 = new Cheese( "brie" );
+        Cheese c3 = new Cheese( "cheddar" );
+        Cheese c4 = new Cheese( "stilton" );
+        Person p1 = new Person( "bob" );
+        Person p2 = new Person( "mark" );
+        Person p3 = new Person( "michael" );
+        Person p4 = new Person( "bob" );
+        session.insert( c1 );
+        session.insert( c2 );
+        session.insert( c3 );
+        session.insert( c4 );
+        session.insert( p1 );
+        session.insert( p2 );
+        session.insert( p3 );
+        session.insert( p4 );
+
+        session = SerializationHelper.getSerialisedStatefulSession( session,
+                                                                    ruleBase );
+        result = (List) session.getGlobal( "results" );
+
+        session.fireAllRules();
+        assertEquals( 3,
+                      result.size() );
+        assertEquals( p4,
+                      result.get( 0 ) );
+        assertEquals( p1,
+                      result.get( 1 ) );
+        assertEquals( c3,
+                      result.get( 2 ) );
+
+        session.dispose();
+
+    }
+
+}

Added: labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_rulebasePartitions1.drl
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_rulebasePartitions1.drl	                        (rev 0)
+++ labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_rulebasePartitions1.drl	2008-08-29 23:48:59 UTC (rev 22247)
@@ -0,0 +1,18 @@
+package org.drools;
+
+global java.util.List results;
+
+rule "rule 1"
+    salience 10
+when
+    $p : Person( name == "bob" )
+then
+    results.add( $p );
+end
+
+rule "rule 2"
+when
+    $c : Cheese( type == "cheddar" )
+then
+    results.add( $c );
+end
\ No newline at end of file

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractRuleBase.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -200,6 +200,7 @@
         droolsStream.writeObject( this.agendaGroupRuleTotals );
         droolsStream.writeUTF( this.factHandleFactory.getClass().getName() );
         droolsStream.writeObject( this.globals );
+        droolsStream.writeObject( this.partitionIDs );
 
         this.eventSupport.removeEventListener( RuleBaseEventListener.class );
         droolsStream.writeObject( this.eventSupport );
@@ -270,7 +271,8 @@
         
         this.populateTypeDeclarationMaps(); 
         
-        this.globals = (Map) droolsStream.readObject();                       
+        this.globals = (Map) droolsStream.readObject();
+        this.partitionIDs = (List<RuleBasePartitionId>) droolsStream.readObject();
         
         this.eventSupport = (RuleBaseEventSupport) droolsStream.readObject();
         this.eventSupport.setRuleBase( this );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -310,6 +310,28 @@
         }
     }
 
+
+    /**
+     *  This method is called to start the multiple partition threads when running in multi-thread
+     *  mode 
+     */
+    public void startPartitionManagers() {
+        if( this.ruleBase.getConfiguration().isPartitionsEnabled() ) {
+            for( PartitionTaskManager task : this.partitionManagers.values() ) {
+                task.startService();
+            }
+        }
+    }
+
+    public void stopPartitionManagers() {
+        if(this.ruleBase.getConfiguration().isPartitionsEnabled()) {
+            for( PartitionTaskManager task : this.partitionManagers.values() ) {
+                // what to do here? should we simply wait for a timeout and give up?
+                task.stopServiceAndWait();
+            }
+        }
+    }
+
     private void initTransient() {
         this.entryPointNode = this.ruleBase.getRete().getEntryPointNode( this.entryPoint );
         this.typeConfReg = new ObjectTypeConfigurationRegistry( this.ruleBase );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/RuleBasePartitionId.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -66,4 +66,8 @@
         } else if ( !id.equals( other.id ) ) return false;
         return true;
     }
+
+    public String toString() {
+        return "Partition::"+this.id;    
+    }
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AsyncSingleObjectSinkAdapter.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -28,12 +28,17 @@
  * @author: <a href="mailto:tirelli at post.com">Edson Tirelli</a>
  */
 public class AsyncSingleObjectSinkAdapter extends SingleObjectSinkAdapter {
+
+    public AsyncSingleObjectSinkAdapter() {
+        super();
+    }
+
     public AsyncSingleObjectSinkAdapter( RuleBasePartitionId partitionId, ObjectSink objectSink ) {
         super( partitionId, objectSink );
     }
 
     public void propagateAssertObject( InternalFactHandle factHandle, PropagationContext context, InternalWorkingMemory workingMemory ) {
-        PartitionTaskManager manager = workingMemory.getPartitionManager( this.partitionId );
+        PartitionTaskManager manager = workingMemory.getPartitionManager( this.sink.getPartitionId() );
         manager.enqueue( new PartitionTaskManager.FactAssertAction(factHandle, context, this.sink ) );
     }
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooRuleBase.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -246,6 +246,8 @@
                 }
             }
 
+            session.startPartitionManagers();
+
             session.queueWorkingMemoryAction( new WorkingMemoryReteAssertAction( session.getInitialFactHandle(),
                                                                                  false,
                                                                                  true,

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -117,6 +117,7 @@
         for ( Iterator it = this.__ruleBaseEventListeners.iterator(); it.hasNext(); ) {
             this.ruleBase.removeEventListener( (RuleBaseEventListener) it.next() );
         }
+        this.stopPartitionManagers();
         this.executor.shutDown();
     }
 

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -113,7 +113,7 @@
         
         this.nodes = new LinkedList<BaseNode>();
 
-        this.partitionId = RuleBasePartitionId.MAIN_PARTITION;
+        this.partitionId = null;
     }
 
     /**

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildUtils.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildUtils.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildUtils.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -90,15 +90,15 @@
     public BaseNode attachNode(final BuildContext context,
                                final BaseNode candidate) {
         BaseNode node = null;
+        RuleBasePartitionId partition = null;
         if( candidate instanceof EntryPointNode ) {
             // entry point nodes are always shared
             EntryPointNode epn = context.getRuleBase().getRete().getEntryPointNode( ((EntryPointNode)candidate).getEntryPoint() );
             if( epn != null ) {
                 node = epn;
-            } else {
-                // all EntryPointNodes belong to the main partition
-                candidate.setPartitionId( RuleBasePartitionId.MAIN_PARTITION );
             }
+            // all EntryPointNodes belong to the main partition
+            partition = RuleBasePartitionId.MAIN_PARTITION;
         } else if( candidate instanceof ObjectTypeNode ) {
             // object type nodes are always shared
             ObjectTypeNode otn = (ObjectTypeNode) candidate;
@@ -107,14 +107,10 @@
                 otn = map.get( otn.getObjectType() );
                 if ( otn != null ) {
                     node = otn;
-                } else {
-                    // all OTNs belong to the main partition
-                    candidate.setPartitionId( RuleBasePartitionId.MAIN_PARTITION );
                 }
-            } else {
-                // all OTNs belong to the main partition
-                candidate.setPartitionId( RuleBasePartitionId.MAIN_PARTITION );
             }
+            // all OTNs belong to the main partition
+            partition = RuleBasePartitionId.MAIN_PARTITION;
         } else if( isSharingEnabledForNode( context, candidate ) ) {
             if ( (context.getTupleSource() != null) && ( candidate instanceof LeftTupleSink ) ) {
                 node    = context.getTupleSource().getSinkPropagator().getMatchingNode(candidate);
@@ -125,19 +121,8 @@
             }
             if( node != null ) {
                 // shared node found
-                
                 // undo previous id assignment
                 context.releaseId( candidate.getId() );
-
-                // if partitions are enabled
-                if( context.getRuleBase().getConfiguration().isPartitionsEnabled() ) {
-                    // check what partition it belongs to
-                    if( context.getPartitionId() == null ) {
-                        context.setPartitionId( node.getPartitionId() );
-                    } else if( ! context.getPartitionId().equals( node.getPartitionId() ) ) {
-                        assert false : "Needs to implement support for partitions merge";
-                    }
-                }
             }
         }
 
@@ -145,6 +130,19 @@
         if ( node == null ) {
             // only attach() if it is a new node
             node = candidate;
+
+            // new node, so it must be labeled
+            if( partition == null ) {
+                // if it does not have a predefined label
+                if( context.getPartitionId() == null ) {
+                    // if no label in current context, create one
+                    context.setPartitionId( context.getRuleBase().createNewPartitionId() );
+                }
+                partition = context.getPartitionId();
+            }
+            // set node with the actual partition label
+            node.setPartitionId( partition );
+
             if ( context.getWorkingMemories().length == 0 ) {
                 node.attach();
             } else {

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java	2008-08-29 18:15:09 UTC (rev 22246)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/ReteooRuleBuilder.java	2008-08-29 23:48:59 UTC (rev 22247)
@@ -161,33 +161,33 @@
         context.getNodes().add((BaseNode) terminal );
 
         // assigns partition IDs to the new nodes
-        assignPartitionId(context);
+        //assignPartitionId(context);
 
         return terminal;
     }
 
-    /**
-     * Assigns the current partition ID to the list of created nodes
-     * 
-     * @param context
-     */
-    private void assignPartitionId(BuildContext context) {
-        if( context.getRuleBase().getConfiguration().isPartitionsEnabled() ) {
-            org.drools.common.RuleBasePartitionId partitionId = null;
-            if( context.getPartitionId() != null ) {
-                // it means it shares nodes with an existing partition, so
-                // assign the first id to the newly added nodes
-                partitionId = context.getPartitionId();
-            } else {
-                // nodes are independent of existing nodes, so create a new
-                // partition ID for them
-                partitionId = context.getRuleBase().createNewPartitionId();
-            }
-            for( BaseNode node : context.getNodes() ) {
-                node.setPartitionId( partitionId );
-            }
-        }
-    }
+//    /**
+//     * Assigns the current partition ID to the list of created nodes
+//     *
+//     * @param context
+//     */
+//    private void assignPartitionId(BuildContext context) {
+//        if( context.getRuleBase().getConfiguration().isPartitionsEnabled() ) {
+//            org.drools.common.RuleBasePartitionId partitionId = null;
+//            if( context.getPartitionId() != null ) {
+//                // it means it shares nodes with an existing partition, so
+//                // assign the first id to the newly added nodes
+//                partitionId = context.getPartitionId();
+//            } else {
+//                // nodes are independent of existing nodes, so create a new
+//                // partition ID for them
+//                partitionId = context.getRuleBase().createNewPartitionId();
+//            }
+//            for( BaseNode node : context.getNodes() ) {
+//                node.setPartitionId( partitionId );
+//            }
+//        }
+//    }
 
     /**
      * Adds a query pattern to the given subrule




More information about the jboss-svn-commits mailing list