[jboss-svn-commits] JBL Code SVN: r22515 - in labs/jbossrules/trunk: drools-core/src/main/java/org/drools and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Sep 8 10:20:55 EDT 2008
Author: tirelli
Date: 2008-09-08 10:20:54 -0400 (Mon, 08 Sep 2008)
New Revision: 22515
Modified:
labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/WorkingMemory.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/DefaultAgenda.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalAgenda.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/Scheduler.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooWorkingMemory.java
labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/OtherwiseTest.java
labs/jbossrules/trunk/drools-server/src/test/java/org/drools/server/TeamAllocationTest.java
Log:
Adding fireUntilHalt() feature
Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -190,7 +190,6 @@
}
- //FIXME current parser does not support declare statement
public void testTimeRelationalOperators() throws Exception {
// read in the source
final Reader reader = new InputStreamReader( getClass().getResourceAsStream( "test_CEP_TimeRelationalOperators.drl" ) );
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/WorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/WorkingMemory.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/WorkingMemory.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -187,26 +187,12 @@
Iterator iterateFactHandles(ObjectFilter filter);
/**
- * Returns the AgendaGroup which has the current WorkingMemory focus. The AgendaGroup interface is subject to change.
- * @return
- * the AgendaGroup
- */
- public AgendaGroup getFocus();
-
- /**
* Set the focus to the specified AgendaGroup
* @param focus
*/
void setFocus(String focus);
/**
- * Set the focus to the specified AgendaGroup
- * @param focus
- */
- void setFocus(AgendaGroup focus);
-
-
- /**
* Retrieve the QueryResults of the specified query.
*
* @param query
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/AbstractWorkingMemory.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -31,6 +31,7 @@
import java.util.Queue;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -80,7 +81,6 @@
import org.drools.rule.TimeMachine;
import org.drools.spi.Activation;
import org.drools.spi.AgendaFilter;
-import org.drools.spi.AgendaGroup;
import org.drools.spi.AsyncExceptionHandler;
import org.drools.spi.FactHandleFactory;
import org.drools.spi.GlobalResolver;
@@ -142,7 +142,7 @@
protected TruthMaintenanceSystem tms;
/** Rule-firing agenda. */
- protected DefaultAgenda agenda;
+ protected InternalAgenda agenda;
protected Queue<WorkingMemoryAction> actionQueue;
@@ -164,10 +164,8 @@
private List liaPropagations;
/** Flag to determine if a rule is currently being fired. */
- protected volatile boolean firing;
+ protected volatile AtomicBoolean firing;
- protected volatile boolean halt;
-
private ProcessInstanceManager processInstanceManager;
private WorkItemManager workItemManager;
@@ -280,6 +278,7 @@
this );
this.entryPoint = EntryPoint.DEFAULT;
+ this.firing = new AtomicBoolean( false );
initPartitionManagers();
initTransient();
@@ -512,7 +511,7 @@
}
public void halt() {
- this.halt = true;
+ this.agenda.halt();
}
// /**
@@ -560,99 +559,62 @@
// the firing for any other assertObject(..) that get
// nested inside, avoiding concurrent-modification
// exceptions, depending on code paths of the actions.
- this.halt = false;
-
if ( isSequential() ) {
for ( Iterator it = this.liaPropagations.iterator(); it.hasNext(); ) {
((LIANodePropagation) it.next()).doPropagation( this );
}
}
+ // do we need to call this in advance?
executeQueuedActions();
- boolean noneFired = true;
-
- if ( !this.firing ) {
- try {
- this.firing = true;
-
- while ( continueFiring( fireLimit ) && this.agenda.fireNextItem( agendaFilter ) ) {
- fireLimit = updateFireLimit( fireLimit );
- noneFired = false;
- executeQueuedActions();
- }
- } finally {
- this.firing = false;
- // @todo (mproctor) disabling Otherwise management for now, not
- // happy with the current implementation
- // if ( noneFired ) {
- // doOtherwise( agendaFilter,
- // fireLimit );
- // }
-
+ try {
+ if( this.firing.compareAndSet( false, true ) ) {
+ this.agenda.fireAllRules( agendaFilter, fireLimit );
}
+ } finally {
+ this.firing.set( false );
}
}
- private final boolean continueFiring(final int fireLimit) {
- return (!halt) && (fireLimit != 0);
+ /**
+ * Keeps firing activations until a halt is called. If in a given moment, there is
+ * no activation to fire, it will wait for an activation to be added to an active
+ * agenda group or rule flow group.
+ *
+ * @throws IllegalStateException if this method is called when running in sequential mode
+ */
+ public synchronized void fireUntilHalt() {
+ fireUntilHalt( null );
}
-
- private final int updateFireLimit(final int fireLimit) {
- return fireLimit > 0 ? fireLimit - 1 : fireLimit;
- }
-
+
/**
- * This does the "otherwise" phase of processing. If no items are fired,
- * then it will assert a temporary "Otherwise" fact and allow any rules to
- * fire to handle "otherwise" cases.
+ * Keeps firing activations until a halt is called. If in a given moment, there is
+ * no activation to fire, it will wait for an activation to be added to an active
+ * agenda group or rule flow group.
+ *
+ * @param agendaFilter filters the activations that may fire
+ *
+ * @throws IllegalStateException if this method is called when running in sequential mode
*/
- private void doOtherwise(final AgendaFilter agendaFilter,
- int fireLimit) {
- final FactHandle handle = this.insert( new Otherwise() );
- if ( !this.actionQueue.isEmpty() ) {
- executeQueuedActions();
+ public synchronized void fireUntilHalt(final AgendaFilter agendaFilter) {
+ if( isSequential() ) {
+ throw new IllegalStateException("fireUntilHalt() can not be called in sequential mode.");
}
-
- while ( continueFiring( fireLimit ) && this.agenda.fireNextItem( agendaFilter ) ) {
- fireLimit = updateFireLimit( fireLimit );
+
+ executeQueuedActions();
+ try {
+ if( this.firing.compareAndSet( false, true ) ) {
+ this.agenda.fireUntilHalt( agendaFilter );
+ }
+ } finally {
+ this.firing.set( false );
}
-
- this.retract( handle );
}
- //
- // MN: The following is the traditional fireAllRules (without otherwise).
- // Purely kept here as this implementation of otherwise is still
- // experimental.
- //
- // public synchronized void fireAllRules(final AgendaFilter agendaFilter)
- // throws FactException {
- // // If we're already firing a rule, then it'll pick up
- // // the firing for any other assertObject(..) that get
- // // nested inside, avoiding concurrent-modification
- // // exceptions, depending on code paths of the actions.
- //
- // if ( !this.factQueue.isEmpty() ) {
- // propagateQueuedActions();
- // }
- //
- // if ( !this.firing ) {
- // try {
- // this.firing = true;
- //
- // while ( this.agenda.fireNextItem( agendaFilter ) ) {
- // ;
- // }
- // } finally {
- // this.firing = false;
- // }
- // }
- // }
-
/**
* Returns the fact Object for the given <code>FactHandle</code>. It
- * actually attemps to return the value from the handle, before retrieving
+ * actually attempts to return the value from the handle, before retrieving
* it from objects map.
*
* @see WorkingMemory
@@ -718,18 +680,10 @@
public abstract QueryResults getQueryResults(String query);
- public AgendaGroup getFocus() {
- return this.agenda.getFocus();
- }
-
public void setFocus(final String focus) {
this.agenda.setFocus( focus );
}
- public void setFocus(final AgendaGroup focus) {
- this.agenda.setFocus( focus );
- }
-
public TruthMaintenanceSystem getTruthMaintenanceSystem() {
return this.tms;
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/DefaultAgenda.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/DefaultAgenda.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/DefaultAgenda.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -27,6 +27,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.drools.WorkingMemory;
import org.drools.base.DefaultKnowledgeHelper;
@@ -103,6 +104,8 @@
private ConsequenceExceptionHandler consequenceExceptionHandler;
+ protected volatile AtomicBoolean halt = new AtomicBoolean(false);
+
// ------------------------------------------------------------
// Constructors
// ------------------------------------------------------------
@@ -268,7 +271,6 @@
Scheduler.getInstance().scheduleAgendaItem( item,
this );
-
if ( this.scheduledActivations == null ) {
this.scheduledActivations = new org.drools.util.LinkedList();
}
@@ -378,6 +380,11 @@
rfg.addActivation( activation );
}
+
+ // making sure we re-evaluate agenda in case we are waiting for activations
+ synchronized( this.halt ) {
+ this.halt.notifyAll();
+ }
}
@@ -456,14 +463,14 @@
*/
public AgendaGroup getNextFocus() {
InternalAgendaGroup agendaGroup = null;
- // Iterate untill we find a populate AgendaModule or we reach the MAIN,
+ // Iterate until we find a populate AgendaModule or we reach the MAIN,
// default, AgendaGroup
while ( true ) {
agendaGroup = (InternalAgendaGroup) this.focusStack.getLast();
final boolean empty = agendaGroup.isEmpty();
- // No populated queus found so pop the focusStack and repeat
+ // No populated queues found so pop the focusStack and repeat
if ( empty && (this.focusStack.size() > 1) ) {
agendaGroup.setActive( false );
this.focusStack.removeLast();
@@ -645,18 +652,18 @@
public org.drools.util.LinkedList getScheduledActivationsLinkedList() {
return this.scheduledActivations;
}
-
+
public void clear() {
// reset focus stack
this.focusStack.clear();
this.focusStack.add( this.main );
-
+
// reset scheduled activations
if ( this.scheduledActivations != null && !this.scheduledActivations.isEmpty() ) {
for ( ScheduledAgendaItem item = (ScheduledAgendaItem) this.scheduledActivations.removeFirst(); item != null; item = (ScheduledAgendaItem) this.scheduledActivations.removeFirst() ) {
Scheduler.getInstance().removeAgendaItem( item );
}
- }
+ }
//reset all agenda groups
for ( InternalAgendaGroup group : this.agendaGroups.values() ) {
@@ -672,7 +679,7 @@
for ( ActivationGroup group : this.activationGroups.values() ) {
group.clear();
}
- }
+ }
/** (non-Javadoc)
* @see org.drools.common.AgendaI#clearAgenda()
@@ -934,10 +941,10 @@
*/
public boolean isRuleActiveInRuleFlowGroup(String ruleflowGroupName,
String ruleName) {
-
+
RuleFlowGroup systemRuleFlowGroup = this.getRuleFlowGroup( ruleflowGroupName );
- for ( Iterator< RuleFlowGroupNode > activations = systemRuleFlowGroup.iterator(); activations.hasNext(); ) {
+ for ( Iterator<RuleFlowGroupNode> activations = systemRuleFlowGroup.iterator(); activations.hasNext(); ) {
Activation activation = activations.next().getActivation();
if ( ruleName.equals( activation.getRule().getName() ) ) {
return true;
@@ -947,7 +954,7 @@
}
public void addRuleFlowGroupListener(String ruleFlowGroup,
- RuleFlowGroupListener listener ) {
+ RuleFlowGroupListener listener) {
InternalRuleFlowGroup rfg = (InternalRuleFlowGroup) this.getRuleFlowGroup( ruleFlowGroup );
rfg.addRuleFlowGroupListener( listener );
}
@@ -962,4 +969,51 @@
return this.getFocus().getName();
}
+ public void fireUntilHalt() {
+ fireUntilHalt( null );
+ }
+
+ public void fireUntilHalt( final AgendaFilter agendaFilter ) {
+ this.halt.set( false );
+ while ( continueFiring( -1 ) ) {
+ boolean fired = fireNextItem( agendaFilter );
+ if( !fired ) {
+ try {
+ synchronized( this.halt ) {
+ this.halt.wait();
+ }
+ } catch ( InterruptedException e ) {
+ this.halt.set( true );
+ }
+ } else {
+ this.workingMemory.executeQueuedActions();
+ }
+ }
+ }
+
+ public void fireAllRules(AgendaFilter agendaFilter,
+ int fireLimit) {
+ this.halt.set( false );
+ while ( continueFiring( fireLimit ) && fireNextItem( agendaFilter ) ) {
+ fireLimit = updateFireLimit( fireLimit );
+ this.workingMemory.executeQueuedActions();
+ }
+ }
+
+ private final boolean continueFiring(final int fireLimit) {
+ return (!halt.get()) && (fireLimit != 0);
+ }
+
+ private final int updateFireLimit(final int fireLimit) {
+ return fireLimit > 0 ? fireLimit - 1 : fireLimit;
+ }
+
+ public void halt() {
+ this.halt.set( true );
+ synchronized( this.halt ) {
+ this.halt.notifyAll();
+ }
+ }
+
+
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalAgenda.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalAgenda.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/InternalAgenda.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -72,4 +72,42 @@
public void removeRuleFlowGroupListener(String ruleFlowGroup,
RuleFlowGroupListener listener);
+ public void clear();
+
+ public void setWorkingMemory(final InternalWorkingMemory workingMemory);
+
+ /**
+ * Fires all activations currently in agenda that match the given agendaFilter
+ * until the fireLimit is reached or no more activations exist.
+ *
+ * @param agendaFilter the filter on which activations may fire.
+ * @param fireLimit the maximum number of activations that may fire. If -1, then it will
+ * fire until no more activations exist.
+ */
+ public void fireAllRules(AgendaFilter agendaFilter,
+ int fireLimit);
+
+ /**
+ * Stop agenda from firing any other rule. It will finish the current rule
+ * execution though.
+ */
+ public void halt();
+
+ /**
+ * Keeps firing activations until a halt is called. If in a given moment, there is
+ * no activation to fire, it will wait for an activation to be added to an active
+ * agenda group or rule flow group.
+ */
+ public void fireUntilHalt();
+
+ /**
+ * Keeps firing activations until a halt is called. If in a given moment, there is
+ * no activation to fire, it will wait for an activation to be added to an active
+ * agenda group or rule flow group.
+ *
+ * @param agendaFilter filters the activations that may fire
+ */
+ public void fireUntilHalt(AgendaFilter agendaFilter);
+
+
}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/Scheduler.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/Scheduler.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/common/Scheduler.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -78,7 +78,7 @@
* @param workingMemory
* The working memory session.
*/
- void scheduleAgendaItem(final ScheduledAgendaItem item, DefaultAgenda agenda) {
+ void scheduleAgendaItem(final ScheduledAgendaItem item, InternalAgenda agenda) {
DuractionJob job = new DuractionJob();
DuractionJobContext ctx = new DuractionJobContext( item, agenda );
TimerTrigger trigger = new TimerTrigger( item.getRule().getDuration().getDuration( item.getTuple() ), 0);
@@ -94,7 +94,7 @@
public static class DuractionJob implements Job {
public void execute(JobContext ctx) {
- DefaultAgenda agenda = ( DefaultAgenda ) ((DuractionJobContext)ctx).getAgenda();
+ InternalAgenda agenda = ( InternalAgenda ) ((DuractionJobContext)ctx).getAgenda();
ScheduledAgendaItem item = ((DuractionJobContext)ctx).getScheduledAgendaItem();
agenda.fireActivation( item );
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooStatefulSession.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -8,7 +8,7 @@
import org.drools.FactHandle;
import org.drools.SessionConfiguration;
import org.drools.StatefulSession;
-import org.drools.common.DefaultAgenda;
+import org.drools.common.InternalAgenda;
import org.drools.common.InternalRuleBase;
import org.drools.concurrent.AssertObject;
import org.drools.concurrent.AssertObjects;
@@ -58,7 +58,7 @@
final InitialFactHandle initialFactHandle,
final long propagationContext,
final SessionConfiguration config,
- final DefaultAgenda agenda) {
+ final InternalAgenda agenda) {
super( id,
ruleBase,
handleFactory,
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooWorkingMemory.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooWorkingMemory.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooWorkingMemory.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -28,6 +28,7 @@
import org.drools.base.DroolsQuery;
import org.drools.common.AbstractWorkingMemory;
import org.drools.common.DefaultAgenda;
+import org.drools.common.InternalAgenda;
import org.drools.common.InternalFactHandle;
import org.drools.common.InternalRuleBase;
import org.drools.common.InternalWorkingMemory;
@@ -86,7 +87,7 @@
final InitialFactHandle initialFactHandle,
final long propagationContext,
final SessionConfiguration config,
- final DefaultAgenda agenda) {
+ final InternalAgenda agenda) {
super( id,
ruleBase,
handleFactory,
Modified: labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/OtherwiseTest.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/OtherwiseTest.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-core/src/test/java/org/drools/reteoo/OtherwiseTest.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -24,7 +24,7 @@
* @author Michael Neale
*/
public class OtherwiseTest extends TestCase {
-
+
public void testOneRuleFiringNoOtherwise() throws Exception {
final RuleBase ruleBase = RuleBaseFactory.newRuleBase( RuleBase.RETEOO );
Modified: labs/jbossrules/trunk/drools-server/src/test/java/org/drools/server/TeamAllocationTest.java
===================================================================
--- labs/jbossrules/trunk/drools-server/src/test/java/org/drools/server/TeamAllocationTest.java 2008-09-08 11:35:53 UTC (rev 22514)
+++ labs/jbossrules/trunk/drools-server/src/test/java/org/drools/server/TeamAllocationTest.java 2008-09-08 14:20:54 UTC (rev 22515)
@@ -13,6 +13,9 @@
public void testBasics() throws Exception {
PackageBuilder pb = new PackageBuilder();
pb.addPackageFromDrl(new InputStreamReader(getClass().getResourceAsStream("/TeamAllocation.drl")));
+ if( pb.hasErrors() ) {
+ System.out.println(pb.getErrors().toString());
+ }
assertFalse(pb.hasErrors());
RuleBase rb = RuleBaseFactory.newRuleBase();
More information about the jboss-svn-commits
mailing list