[jboss-svn-commits] JBL Code SVN: r22515 - in labs/jbossrules/trunk: drools-core/src/main/java/org/drools and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Sep 8 10:20:55 EDT 2008


Author: tirelli
Date: 2008-09-08 10:20:54 -0400 (Mon, 08 Sep 2008)
New Revision: 22515

Modified:
   labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/WorkingMemory.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/DefaultAgenda.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalAgenda.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/Scheduler.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooWorkingMemory.java
   labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/OtherwiseTest.java
   labs/jbossrules/trunk/drools-server/src/test/java/org/drools/server/TeamAllocationTest.java
Log:
Adding fireUntilHalt() feature

Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -190,7 +190,6 @@
 
     }
 
-    //FIXME current parser does not support declare statement
     public void testTimeRelationalOperators() throws Exception {
         // read in the source
         final Reader reader = new InputStreamReader( getClass().getResourceAsStream( "test_CEP_TimeRelationalOperators.drl" ) );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/WorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/WorkingMemory.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/WorkingMemory.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -187,26 +187,12 @@
     Iterator iterateFactHandles(ObjectFilter filter);
 
     /**
-     * Returns the AgendaGroup which has the current WorkingMemory focus. The AgendaGroup interface is subject to change.
-     * @return
-     *     the AgendaGroup
-     */
-    public AgendaGroup getFocus();
-
-    /**
      * Set the focus to the specified AgendaGroup
      * @param focus
      */
     void setFocus(String focus);
 
     /**
-     * Set the focus to the specified AgendaGroup
-     * @param focus
-     */
-    void setFocus(AgendaGroup focus);
-
-
-    /**
      * Retrieve the QueryResults of the specified query.
      *
      * @param query

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -31,6 +31,7 @@
 import java.util.Queue;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -80,7 +81,6 @@
 import org.drools.rule.TimeMachine;
 import org.drools.spi.Activation;
 import org.drools.spi.AgendaFilter;
-import org.drools.spi.AgendaGroup;
 import org.drools.spi.AsyncExceptionHandler;
 import org.drools.spi.FactHandleFactory;
 import org.drools.spi.GlobalResolver;
@@ -142,7 +142,7 @@
     protected TruthMaintenanceSystem                    tms;
 
     /** Rule-firing agenda. */
-    protected DefaultAgenda                             agenda;
+    protected InternalAgenda                            agenda;
 
     protected Queue<WorkingMemoryAction>                actionQueue;
 
@@ -164,10 +164,8 @@
     private List                                        liaPropagations;
 
     /** Flag to determine if a rule is currently being fired. */
-    protected volatile boolean                          firing;
+    protected volatile AtomicBoolean                    firing;
 
-    protected volatile boolean                          halt;
-    
     private ProcessInstanceManager                      processInstanceManager;
 
     private WorkItemManager                             workItemManager;
@@ -280,6 +278,7 @@
                               this );
 
         this.entryPoint = EntryPoint.DEFAULT;
+        this.firing = new AtomicBoolean( false );
 
         initPartitionManagers();
         initTransient();
@@ -512,7 +511,7 @@
     }
 
     public void halt() {
-        this.halt = true;
+        this.agenda.halt();
     }
     
 //    /**
@@ -560,99 +559,62 @@
         // the firing for any other assertObject(..) that get
         // nested inside, avoiding concurrent-modification
         // exceptions, depending on code paths of the actions.
-        this.halt = false;
-
         if ( isSequential() ) {
             for ( Iterator it = this.liaPropagations.iterator(); it.hasNext(); ) {
                 ((LIANodePropagation) it.next()).doPropagation( this );
             }
         }
 
+        // do we need to call this in advance?
         executeQueuedActions();
 
-        boolean noneFired = true;
-
-        if ( !this.firing ) {
-            try {
-                this.firing = true;
-
-                while ( continueFiring( fireLimit ) && this.agenda.fireNextItem( agendaFilter ) ) {
-                    fireLimit = updateFireLimit( fireLimit );
-                    noneFired = false;
-                    executeQueuedActions();
-                }
-            } finally {
-                this.firing = false;
-                // @todo (mproctor) disabling Otherwise management for now, not
-                // happy with the current implementation
-                // if ( noneFired ) {
-                // doOtherwise( agendaFilter,
-                // fireLimit );
-                // }
-
+        try {
+            if( this.firing.compareAndSet( false, true ) ) {
+                this.agenda.fireAllRules( agendaFilter, fireLimit );
             }
+        } finally {
+            this.firing.set( false );
         }
     }
 
-    private final boolean continueFiring(final int fireLimit) {
-        return (!halt) && (fireLimit != 0);
+    /**
+     * Keeps firing activations until a halt is called. If, at a given moment, there is
+     * no activation to fire, it will wait for an activation to be added to an active 
+     * agenda group or rule flow group.
+     * 
+     * @throws IllegalStateException if this method is called when running in sequential mode
+     */
+    public synchronized void fireUntilHalt() {
+        fireUntilHalt( null );
     }
-
-    private final int updateFireLimit(final int fireLimit) {
-        return fireLimit > 0 ? fireLimit - 1 : fireLimit;
-    }
-
+    
     /**
-     * This does the "otherwise" phase of processing. If no items are fired,
-     * then it will assert a temporary "Otherwise" fact and allow any rules to
-     * fire to handle "otherwise" cases.
+     * Keeps firing activations until a halt is called. If, at a given moment, there is
+     * no activation to fire, it will wait for an activation to be added to an active 
+     * agenda group or rule flow group.
+     * 
+     * @param agendaFilter filters the activations that may fire
+     * 
+     * @throws IllegalStateException if this method is called when running in sequential mode
      */
-    private void doOtherwise(final AgendaFilter agendaFilter,
-                             int fireLimit) {
-        final FactHandle handle = this.insert( new Otherwise() );
-        if ( !this.actionQueue.isEmpty() ) {
-            executeQueuedActions();
+    public synchronized void fireUntilHalt(final AgendaFilter agendaFilter) {
+        if( isSequential() ) {
+            throw new IllegalStateException("fireUntilHalt() can not be called in sequential mode.");
         }
-
-        while ( continueFiring( fireLimit ) && this.agenda.fireNextItem( agendaFilter ) ) {
-            fireLimit = updateFireLimit( fireLimit );
+        
+        executeQueuedActions();
+        try {
+            if( this.firing.compareAndSet( false, true ) ) {
+                this.agenda.fireUntilHalt( agendaFilter );
+            }
+        } finally {
+            this.firing.set( false );
         }
-
-        this.retract( handle );
     }
 
-    //
-    // MN: The following is the traditional fireAllRules (without otherwise).
-    // Purely kept here as this implementation of otherwise is still
-    // experimental.
-    //
-    // public synchronized void fireAllRules(final AgendaFilter agendaFilter)
-    // throws FactException {
-    // // If we're already firing a rule, then it'll pick up
-    // // the firing for any other assertObject(..) that get
-    // // nested inside, avoiding concurrent-modification
-    // // exceptions, depending on code paths of the actions.
-    //
-    // if ( !this.factQueue.isEmpty() ) {
-    // propagateQueuedActions();
-    // }
-    //
-    // if ( !this.firing ) {
-    // try {
-    // this.firing = true;
-    //
-    // while ( this.agenda.fireNextItem( agendaFilter ) ) {
-    // ;
-    // }
-    // } finally {
-    // this.firing = false;
-    // }
-    // }
-    // }
-
     /**
      * Returns the fact Object for the given <code>FactHandle</code>. It
-     * actually attemps to return the value from the handle, before retrieving
+     * actually attempts to return the value from the handle, before retrieving
      * it from objects map.
      * 
      * @see WorkingMemory
@@ -718,18 +680,10 @@
 
     public abstract QueryResults getQueryResults(String query);
 
-    public AgendaGroup getFocus() {
-        return this.agenda.getFocus();
-    }
-
     public void setFocus(final String focus) {
         this.agenda.setFocus( focus );
     }
 
-    public void setFocus(final AgendaGroup focus) {
-        this.agenda.setFocus( focus );
-    }
-
     public TruthMaintenanceSystem getTruthMaintenanceSystem() {
         return this.tms;
     }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/DefaultAgenda.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/DefaultAgenda.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/DefaultAgenda.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -27,6 +27,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.drools.WorkingMemory;
 import org.drools.base.DefaultKnowledgeHelper;
@@ -103,6 +104,8 @@
 
     private ConsequenceExceptionHandler      consequenceExceptionHandler;
 
+    protected volatile AtomicBoolean         halt = new AtomicBoolean(false);
+    
     // ------------------------------------------------------------
     // Constructors
     // ------------------------------------------------------------
@@ -268,7 +271,6 @@
         Scheduler.getInstance().scheduleAgendaItem( item,
                                                     this );
 
-
         if ( this.scheduledActivations == null ) {
             this.scheduledActivations = new org.drools.util.LinkedList();
         }
@@ -378,6 +380,11 @@
             rfg.addActivation( activation );
 
         }
+        
+        // making sure we re-evaluate agenda in case we are waiting for activations
+        synchronized( this.halt ) {
+            this.halt.notifyAll();
+        }
 
     }
 
@@ -456,14 +463,14 @@
      */
     public AgendaGroup getNextFocus() {
         InternalAgendaGroup agendaGroup = null;
-        // Iterate untill we find a populate AgendaModule or we reach the MAIN,
+        // Iterate until we find a populated AgendaModule or we reach the MAIN,
         // default, AgendaGroup
         while ( true ) {
             agendaGroup = (InternalAgendaGroup) this.focusStack.getLast();
 
             final boolean empty = agendaGroup.isEmpty();
 
-            // No populated queus found so pop the focusStack and repeat
+            // No populated queues found so pop the focusStack and repeat
             if ( empty && (this.focusStack.size() > 1) ) {
                 agendaGroup.setActive( false );
                 this.focusStack.removeLast();
@@ -645,18 +652,18 @@
     public org.drools.util.LinkedList getScheduledActivationsLinkedList() {
         return this.scheduledActivations;
     }
-    
+
     public void clear() {
         // reset focus stack
         this.focusStack.clear();
         this.focusStack.add( this.main );
-        
+
         // reset scheduled activations
         if ( this.scheduledActivations != null && !this.scheduledActivations.isEmpty() ) {
             for ( ScheduledAgendaItem item = (ScheduledAgendaItem) this.scheduledActivations.removeFirst(); item != null; item = (ScheduledAgendaItem) this.scheduledActivations.removeFirst() ) {
                 Scheduler.getInstance().removeAgendaItem( item );
             }
-        }            
+        }
 
         //reset all agenda groups
         for ( InternalAgendaGroup group : this.agendaGroups.values() ) {
@@ -672,7 +679,7 @@
         for ( ActivationGroup group : this.activationGroups.values() ) {
             group.clear();
         }
-    }    
+    }
 
     /** (non-Javadoc)
      * @see org.drools.common.AgendaI#clearAgenda()
@@ -934,10 +941,10 @@
      */
     public boolean isRuleActiveInRuleFlowGroup(String ruleflowGroupName,
                                                String ruleName) {
-        
+
         RuleFlowGroup systemRuleFlowGroup = this.getRuleFlowGroup( ruleflowGroupName );
 
-        for ( Iterator< RuleFlowGroupNode > activations = systemRuleFlowGroup.iterator(); activations.hasNext(); ) {
+        for ( Iterator<RuleFlowGroupNode> activations = systemRuleFlowGroup.iterator(); activations.hasNext(); ) {
             Activation activation = activations.next().getActivation();
             if ( ruleName.equals( activation.getRule().getName() ) ) {
                 return true;
@@ -947,7 +954,7 @@
     }
 
     public void addRuleFlowGroupListener(String ruleFlowGroup,
-                                         RuleFlowGroupListener listener ) {
+                                         RuleFlowGroupListener listener) {
         InternalRuleFlowGroup rfg = (InternalRuleFlowGroup) this.getRuleFlowGroup( ruleFlowGroup );
         rfg.addRuleFlowGroupListener( listener );
     }
@@ -962,4 +969,51 @@
         return this.getFocus().getName();
     }
 
+    public void fireUntilHalt() {
+        fireUntilHalt( null );
+    }
+
+    public void fireUntilHalt( final AgendaFilter agendaFilter ) {
+            this.halt.set( false );
+            while ( continueFiring( -1 ) ) {
+                boolean fired = fireNextItem( agendaFilter );
+                if( !fired ) {
+                    try {
+                        synchronized( this.halt ) {
+                            this.halt.wait();
+                        }
+                    } catch ( InterruptedException e ) {
+                        this.halt.set( true );
+                    }
+                } else {
+                    this.workingMemory.executeQueuedActions();
+                }
+            }
+    }
+    
+    public void fireAllRules(AgendaFilter agendaFilter,
+                             int fireLimit) {
+        this.halt.set( false );
+        while ( continueFiring( fireLimit ) && fireNextItem( agendaFilter ) ) {
+            fireLimit = updateFireLimit( fireLimit );
+            this.workingMemory.executeQueuedActions();
+        }
+    }
+
+    private final boolean continueFiring(final int fireLimit) {
+        return (!halt.get()) && (fireLimit != 0);
+    }
+
+    private final int updateFireLimit(final int fireLimit) {
+        return fireLimit > 0 ? fireLimit - 1 : fireLimit;
+    }
+
+    public void halt() {
+        this.halt.set( true );
+        synchronized( this.halt ) {
+            this.halt.notifyAll();
+        }
+    }
+
+    
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalAgenda.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalAgenda.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalAgenda.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -72,4 +72,42 @@
     public void removeRuleFlowGroupListener(String ruleFlowGroup,
                                             RuleFlowGroupListener listener);
 
+    public void clear();
+
+    public void setWorkingMemory(final InternalWorkingMemory workingMemory);
+
+    /**
+     * Fires all activations currently in agenda that match the given agendaFilter
+     * until the fireLimit is reached or no more activations exist.
+     * 
+     * @param agendaFilter the filter on which activations may fire.
+     * @param fireLimit the maximum number of activations that may fire. If -1, then it will
+     *                  fire until no more activations exist.
+     */
+    public void fireAllRules(AgendaFilter agendaFilter,
+                             int fireLimit);
+
+    /**
+     * Stop agenda from firing any other rule. It will finish the current rule
+     * execution though.
+     */
+    public void halt();
+
+    /**
+     * Keeps firing activations until a halt is called. If, at a given moment, there is
+     * no activation to fire, it will wait for an activation to be added to an active 
+     * agenda group or rule flow group.
+     */
+    public void fireUntilHalt();
+    
+    /**
+     * Keeps firing activations until a halt is called. If, at a given moment, there is
+     * no activation to fire, it will wait for an activation to be added to an active 
+     * agenda group or rule flow group.
+     * 
+     * @param agendaFilter filters the activations that may fire
+     */
+    public void fireUntilHalt(AgendaFilter agendaFilter);
+    
+    
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/Scheduler.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/Scheduler.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/Scheduler.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -78,7 +78,7 @@
      * @param workingMemory
      *            The working memory session.
      */
-    void scheduleAgendaItem(final ScheduledAgendaItem item, DefaultAgenda agenda) {
+    void scheduleAgendaItem(final ScheduledAgendaItem item, InternalAgenda agenda) {
         DuractionJob job = new DuractionJob();        
         DuractionJobContext ctx = new DuractionJobContext( item, agenda );
         TimerTrigger trigger = new TimerTrigger( item.getRule().getDuration().getDuration( item.getTuple() ), 0);
@@ -94,7 +94,7 @@
     
     public static class DuractionJob implements Job {
         public void execute(JobContext ctx) {
-            DefaultAgenda agenda = ( DefaultAgenda ) ((DuractionJobContext)ctx).getAgenda();
+            InternalAgenda agenda = ( InternalAgenda ) ((DuractionJobContext)ctx).getAgenda();
             ScheduledAgendaItem item  = ((DuractionJobContext)ctx).getScheduledAgendaItem();
             
             agenda.fireActivation( item );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -8,7 +8,7 @@
 import org.drools.FactHandle;
 import org.drools.SessionConfiguration;
 import org.drools.StatefulSession;
-import org.drools.common.DefaultAgenda;
+import org.drools.common.InternalAgenda;
 import org.drools.common.InternalRuleBase;
 import org.drools.concurrent.AssertObject;
 import org.drools.concurrent.AssertObjects;
@@ -58,7 +58,7 @@
                                  final InitialFactHandle initialFactHandle,
                                  final long propagationContext,
                                  final SessionConfiguration config,
-                                 final DefaultAgenda agenda) {
+                                 final InternalAgenda agenda) {
         super( id,
                ruleBase,
                handleFactory,

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooWorkingMemory.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooWorkingMemory.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -28,6 +28,7 @@
 import org.drools.base.DroolsQuery;
 import org.drools.common.AbstractWorkingMemory;
 import org.drools.common.DefaultAgenda;
+import org.drools.common.InternalAgenda;
 import org.drools.common.InternalFactHandle;
 import org.drools.common.InternalRuleBase;
 import org.drools.common.InternalWorkingMemory;
@@ -86,7 +87,7 @@
                                final InitialFactHandle initialFactHandle,
                                final long propagationContext,
                                final SessionConfiguration config,
-                               final DefaultAgenda agenda) {
+                               final InternalAgenda agenda) {
         super( id,
                ruleBase,
                handleFactory,

Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/OtherwiseTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/OtherwiseTest.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/OtherwiseTest.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -24,7 +24,7 @@
  * @author Michael Neale
  */
 public class OtherwiseTest extends TestCase {
-
+    
     public void testOneRuleFiringNoOtherwise() throws Exception {
         final RuleBase ruleBase = RuleBaseFactory.newRuleBase( RuleBase.RETEOO );
 

Modified: labs/jbossrules/trunk/drools-server/src/test/java/org/drools/server/TeamAllocationTest.java
===================================================================
--- labs/jbossrules/trunk/drools-server/src/test/java/org/drools/server/TeamAllocationTest.java	2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-server/src/test/java/org/drools/server/TeamAllocationTest.java	2008-09-08 14:20:54 UTC (rev 22515)
@@ -13,6 +13,9 @@
 	public void testBasics() throws Exception {
 		PackageBuilder pb = new PackageBuilder();
 		pb.addPackageFromDrl(new InputStreamReader(getClass().getResourceAsStream("/TeamAllocation.drl")));
+		if( pb.hasErrors() ) {
+		    System.out.println(pb.getErrors().toString());
+		}
 		assertFalse(pb.hasErrors());
 
 		RuleBase rb = RuleBaseFactory.newRuleBase();




More information about the jboss-svn-commits mailing list