[jboss-svn-commits] JBL Code SVN: r23597 - in labs/jbossrules/trunk: drools-compiler/src/main/java/org/drools/rule/builder and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Oct 27 12:23:10 EDT 2008
Author: tirelli
Date: 2008-10-27 12:23:10 -0400 (Mon, 27 Oct 2008)
New Revision: 23597
Added:
labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_CEP_SimpleLengthWindow.drl
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingLengthWindow.java
Modified:
labs/jbossrules/trunk/drools-compiler/src/main/java/org/drools/lang/descr/BehaviorDescr.java
labs/jbossrules/trunk/drools-compiler/src/main/java/org/drools/rule/builder/PatternBuilder.java
labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/lang/RuleParserTest.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java
Log:
Fixing sliding windows: time and length
Modified: labs/jbossrules/trunk/drools-compiler/src/main/java/org/drools/lang/descr/BehaviorDescr.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/main/java/org/drools/lang/descr/BehaviorDescr.java 2008-10-27 14:53:42 UTC (rev 23596)
+++ labs/jbossrules/trunk/drools-compiler/src/main/java/org/drools/lang/descr/BehaviorDescr.java 2008-10-27 16:23:10 UTC (rev 23597)
@@ -27,8 +27,6 @@
*/
public class BehaviorDescr extends BaseDescr {
- private String type;
-
/**
* @param type
*/
@@ -41,21 +39,21 @@
*/
public BehaviorDescr(String type) {
super();
- this.type = type;
+ setText(type);
}
/**
* @return the type
*/
public String getType() {
- return type;
+ return getText();
}
/**
* @param type the type to set
*/
public void setType(String type) {
- this.type = type;
+ setText( type );
}
Modified: labs/jbossrules/trunk/drools-compiler/src/main/java/org/drools/rule/builder/PatternBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/main/java/org/drools/rule/builder/PatternBuilder.java 2008-10-27 14:53:42 UTC (rev 23596)
+++ labs/jbossrules/trunk/drools-compiler/src/main/java/org/drools/rule/builder/PatternBuilder.java 2008-10-27 16:23:10 UTC (rev 23597)
@@ -68,6 +68,7 @@
import org.drools.rule.ReturnValueConstraint;
import org.drools.rule.ReturnValueRestriction;
import org.drools.rule.RuleConditionElement;
+import org.drools.rule.SlidingLengthWindow;
import org.drools.rule.SlidingTimeWindow;
import org.drools.rule.VariableConstraint;
import org.drools.rule.VariableRestriction;
@@ -211,6 +212,10 @@
SlidingWindowDescr swd = (SlidingWindowDescr) behaviorDescr;
SlidingTimeWindow window = new SlidingTimeWindow( swd.getLength() );
pattern.addBehavior( window );
+ } else if( Behavior.BehaviorType.LENGTH_WINDOW.matches( behaviorDescr.getType() ) ) {
+ SlidingWindowDescr swd = (SlidingWindowDescr) behaviorDescr;
+ SlidingLengthWindow window = new SlidingLengthWindow( (int) swd.getLength() );
+ pattern.addBehavior( window );
}
}
Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java 2008-10-27 14:53:42 UTC (rev 23596)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java 2008-10-27 16:23:10 UTC (rev 23597)
@@ -511,6 +511,88 @@
}
+ public void testSimpleLengthWindow() throws Exception {
+ // read in the source
+ final Reader reader = new InputStreamReader( getClass().getResourceAsStream( "test_CEP_SimpleLengthWindow.drl" ) );
+ final RuleBase ruleBase = loadRuleBase( reader );
+
+ SessionConfiguration conf = new SessionConfiguration();
+ conf.setClockType( ClockType.REALTIME_CLOCK );
+ StatefulSession wm = ruleBase.newStatefulSession( conf );
+
+ final List results = new ArrayList();
+
+ wm.setGlobal( "results",
+ results );
+
+ EventFactHandle handle1 = (EventFactHandle) wm.insert( new OrderEvent( "1",
+ "customer A",
+ 70 ) );
+
+ // wm = SerializationHelper.serializeObject(wm);
+ wm.fireAllRules();
+
+ assertEquals( 1,
+ results.size() );
+ assertEquals( 70,
+ ((Number) results.get( 0 )).intValue() );
+
+ // assert new data
+ EventFactHandle handle2 = (EventFactHandle) wm.insert( new OrderEvent( "2",
+ "customer A",
+ 60 ) );
+ wm.fireAllRules();
+
+ assertEquals( 2,
+ results.size() );
+ assertEquals( 65,
+ ((Number) results.get( 1 )).intValue() );
+
+ // assert new data
+ EventFactHandle handle3 = (EventFactHandle) wm.insert( new OrderEvent( "3",
+ "customer A",
+ 50 ) );
+ wm.fireAllRules();
+
+ assertEquals( 3,
+ results.size() );
+ assertEquals( 60,
+ ((Number) results.get( 2 )).intValue() );
+
+ // assert new data
+ EventFactHandle handle4 = (EventFactHandle) wm.insert( new OrderEvent( "4",
+ "customer A",
+ 25 ) );
+ wm.fireAllRules();
+
+ // first event should have expired, making average under the rule threshold, so no additional rule fire
+ assertEquals( 3,
+ results.size() );
+
+ // assert new data
+ EventFactHandle handle5 = (EventFactHandle) wm.insert( new OrderEvent( "5",
+ "customer A",
+ 70 ) );
+ // wm = SerializationHelper.serializeObject(wm);
+ wm.fireAllRules();
+
+ // still under the threshold, so no fire
+ assertEquals( 3,
+ results.size() );
+
+ // assert new data
+ EventFactHandle handle6 = (EventFactHandle) wm.insert( new OrderEvent( "6",
+ "customer A",
+ 115 ) );
+ wm.fireAllRules();
+
+ assertEquals( 4,
+ results.size() );
+ assertEquals( 70,
+ ((Number) results.get( 3 )).intValue() );
+
+ }
+
// public void FIXME_testTransactionCorrelation() throws Exception {
// // read in the source
// final Reader reader = new InputStreamReader( getClass().getResourceAsStream( "test_TransactionCorrelation.drl" ) );
Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/lang/RuleParserTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/lang/RuleParserTest.java 2008-10-27 14:53:42 UTC (rev 23596)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/lang/RuleParserTest.java 2008-10-27 16:23:10 UTC (rev 23597)
@@ -36,6 +36,7 @@
import org.drools.lang.descr.AccumulateDescr;
import org.drools.lang.descr.AndDescr;
import org.drools.lang.descr.AttributeDescr;
+import org.drools.lang.descr.BehaviorDescr;
import org.drools.lang.descr.CollectDescr;
import org.drools.lang.descr.EntryPointDescr;
import org.drools.lang.descr.EvalDescr;
@@ -62,9 +63,11 @@
import org.drools.lang.descr.RestrictionConnectiveDescr;
import org.drools.lang.descr.ReturnValueRestrictionDescr;
import org.drools.lang.descr.RuleDescr;
+import org.drools.lang.descr.SlidingWindowDescr;
import org.drools.lang.descr.TypeDeclarationDescr;
import org.drools.lang.descr.TypeFieldDescr;
import org.drools.lang.descr.VariableRestrictionDescr;
+import org.drools.rule.Behavior;
public class RuleParserTest extends TestCase {
@@ -3078,6 +3081,26 @@
assertEquals("StreamA", entry.getEntryId());
}
+ public void testSlidingWindow() throws Exception {
+ final String text = "StockTick( symbol==\"ACME\") over window:length(10)";
+
+ PatternDescr pattern = (PatternDescr) parse("pattern_source", "lhs",
+ text);
+
+ assertEquals(1, pattern.getDescrs().size());
+ FieldConstraintDescr fcd = (FieldConstraintDescr) pattern.getDescrs()
+ .get(0);
+ assertEquals("symbol", fcd.getFieldName());
+
+ List<BehaviorDescr> behaviors = pattern.getBehaviors();
+ assertNotNull( behaviors );
+ assertEquals( 1, behaviors.size() );
+ SlidingWindowDescr descr = (SlidingWindowDescr) behaviors.get( 0 );
+ assertEquals( "length", descr.getText() );
+ assertEquals( "length", descr.getType() );
+ assertEquals( "10", descr.getParameters() );
+ }
+
public void testNesting() throws Exception {
parseResource("compilation_unit", "compilation_unit",
"not_pluggable_operator.drl");
Added: labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_CEP_SimpleLengthWindow.drl
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_CEP_SimpleLengthWindow.drl (rev 0)
+++ labs/jbossrules/trunk/drools-compiler/src/test/resources/org/drools/integrationtests/test_CEP_SimpleLengthWindow.drl 2008-10-27 16:23:10 UTC (rev 23597)
@@ -0,0 +1,18 @@
+package org.drools;
+
+global java.util.List results;
+
+declare OrderEvent
+ @role( event )
+end
+
+rule "average over threshold"
+when
+ Number( $avg : intValue >= 50 ) from accumulate(
+ OrderEvent( $amt : total ) over window:length(3),
+ average( $amt ) )
+then
+ //System.out.println( "Over threshold: "+$avg );
+ results.add( $avg );
+end
+
Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingLengthWindow.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingLengthWindow.java (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingLengthWindow.java 2008-10-27 16:23:10 UTC (rev 23597)
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2008 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on Apr 26, 2008
+ */
+
+package org.drools.rule;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.drools.common.InternalWorkingMemory;
+import org.drools.common.PropagationContextImpl;
+import org.drools.reteoo.RightTuple;
+import org.drools.spi.PropagationContext;
+
+/**
+ * A length window behavior implementation
+ *
+ * @author etirelli
+ */
+public class SlidingLengthWindow
+ implements
+ Externalizable,
+ Behavior {
+
+ private int size;
+
+ public SlidingLengthWindow() {
+ this( 0 );
+ }
+
+ /**
+ * @param size
+ */
+ public SlidingLengthWindow(final int size) {
+ super();
+ this.size = size;
+ }
+
+ /**
+ * @inheritDoc
+ *
+ * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
+ */
+ public void readExternal(final ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ this.size = in.readInt();
+ }
+
+ /**
+ * @inheritDoc
+ *
+ * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
+ */
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeInt( this.size );
+
+ }
+
+ public BehaviorType getType() {
+ return BehaviorType.LENGTH_WINDOW;
+ }
+
+ /**
+ * @return the size
+ */
+ public long getSize() {
+ return size;
+ }
+
+ /**
+ * @param size the size to set
+ */
+ public void setSize(final int size) {
+ this.size = size;
+ }
+
+ public Object createContext() {
+ return new SlidingLengthWindowContext( this.size );
+ }
+
+ /**
+ * @inheritDoc
+ *
+ * @see org.drools.rule.Behavior#assertRightTuple(java.lang.Object, org.drools.reteoo.RightTuple, org.drools.common.InternalWorkingMemory)
+ */
+ public void assertRightTuple(final Object context,
+ final RightTuple rightTuple,
+ final InternalWorkingMemory workingMemory) {
+ SlidingLengthWindowContext window = (SlidingLengthWindowContext) context;
+ window.pos = (window.pos + 1) % window.rightTuples.length;
+ if( window.rightTuples[window.pos] != null ) {
+ final RightTuple tuple = window.rightTuples[window.pos];
+ // retract previous
+ final PropagationContext propagationContext = new PropagationContextImpl( workingMemory.getNextPropagationIdCounter(),
+ PropagationContext.RETRACTION,
+ null,
+ null,
+ tuple.getFactHandle() );
+ tuple.getRightTupleSink().retractRightTuple( tuple,
+ propagationContext,
+ workingMemory );
+ tuple.unlinkFromRightParent();
+
+ }
+ window.rightTuples[window.pos] = rightTuple;
+ }
+
+ /**
+ * @inheritDoc
+ *
+ * @see org.drools.rule.Behavior#retractRightTuple(java.lang.Object, org.drools.reteoo.RightTuple, org.drools.common.InternalWorkingMemory)
+ */
+ public void retractRightTuple(final Object context,
+ final RightTuple rightTuple,
+ final InternalWorkingMemory workingMemory) {
+ SlidingLengthWindowContext window = (SlidingLengthWindowContext) context;
+ final int last = ( window.pos == 0 ) ? window.rightTuples.length-1 : window.pos-1;
+ // we start the loop on current pos because the most common scenario is to retract the
+ // right tuple referenced by the current "pos" position, causing this loop to only execute
+ // the first iteration
+ for( int i = window.pos; i != last; i = (i+1)%window.rightTuples.length ) {
+ if( window.rightTuples[i] == rightTuple ) {
+ window.rightTuples[i] = null;
+ break;
+ }
+ }
+ }
+
+ public void expireTuples(Object context,
+ InternalWorkingMemory workingMemory) {
+ // do nothing
+ }
+
+
+ public String toString() {
+ return "SlidingLengthWindow( size="+size+" )";
+ }
+
+ /**
+ * A Context object for length windows
+ *
+ * @author etirelli
+ */
+ private static class SlidingLengthWindowContext
+ implements
+ Externalizable {
+
+ public RightTuple[] rightTuples;
+ public int pos = 0;
+
+ public SlidingLengthWindowContext( final int size ) {
+ this.rightTuples = new RightTuple[size];
+ }
+
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ this.pos = in.readInt();
+ this.rightTuples = (RightTuple[]) in.readObject();
+
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt( this.pos );
+ out.writeObject( this.rightTuples );
+ }
+ }
+
+}
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java 2008-10-27 14:53:42 UTC (rev 23596)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java 2008-10-27 16:23:10 UTC (rev 23597)
@@ -49,9 +49,6 @@
Behavior {
private long size;
-
- // FIXME: THIS IS SO WRONG!!! HOW DID I MADE THAT????
- private volatile transient RightTuple expiringTuple;
public SlidingTimeWindow() {
this( 0 );
@@ -103,8 +100,7 @@
}
public Object createContext() {
- return new PriorityQueue<RightTuple>( 16, // arbitrary size... can we improve it?
- new SlidingTimeWindowComparator() );
+ return new SlidingTimeWindowContext();
}
/**
@@ -115,9 +111,9 @@
public void assertRightTuple(final Object context,
final RightTuple rightTuple,
final InternalWorkingMemory workingMemory) {
- PriorityQueue<RightTuple> queue = (PriorityQueue<RightTuple>) context;
- queue.add( rightTuple );
- if ( queue.peek() == rightTuple ) {
+ SlidingTimeWindowContext queue = (SlidingTimeWindowContext) context;
+ queue.queue.add( rightTuple );
+ if ( queue.queue.peek() == rightTuple ) {
// update next expiration time
updateNextExpiration( rightTuple,
workingMemory,
@@ -133,18 +129,18 @@
public void retractRightTuple(final Object context,
final RightTuple rightTuple,
final InternalWorkingMemory workingMemory) {
+ SlidingTimeWindowContext queue = (SlidingTimeWindowContext) context;
// it may be a call back to expire the tuple that is already being expired
- if( this.expiringTuple != rightTuple ) {
- PriorityQueue<RightTuple> queue = (PriorityQueue<RightTuple>) context;
- if ( queue.peek() == rightTuple ) {
+ if ( queue.expiringTuple != rightTuple ) {
+ if ( queue.queue.peek() == rightTuple ) {
// it was the head of the queue
- queue.poll();
+ queue.queue.poll();
// update next expiration time
- updateNextExpiration( queue.peek(),
+ updateNextExpiration( queue.queue.peek(),
workingMemory,
queue );
} else {
- queue.remove( rightTuple );
+ queue.queue.remove( rightTuple );
}
}
}
@@ -153,12 +149,12 @@
final InternalWorkingMemory workingMemory) {
TimerService clock = workingMemory.getTimerService();
long currentTime = clock.getCurrentTime();
- PriorityQueue<RightTuple> queue = (PriorityQueue<RightTuple>) context;
- RightTuple tuple = queue.peek();
+ SlidingTimeWindowContext queue = (SlidingTimeWindowContext) context;
+ RightTuple tuple = queue.queue.peek();
while ( tuple != null && isExpired( currentTime,
tuple ) ) {
- this.expiringTuple = tuple;
- queue.remove();
+ queue.expiringTuple = tuple;
+ queue.queue.remove();
final PropagationContext propagationContext = new PropagationContextImpl( workingMemory.getNextPropagationIdCounter(),
PropagationContext.RETRACTION,
null,
@@ -168,10 +164,10 @@
propagationContext,
workingMemory );
tuple.unlinkFromRightParent();
- this.expiringTuple = null;
- tuple = queue.peek();
+ queue.expiringTuple = null;
+ tuple = queue.queue.peek();
}
-
+
// update next expiration time
updateNextExpiration( tuple,
workingMemory,
@@ -193,15 +189,19 @@
TimerService clock = workingMemory.getTimerService();
if ( rightTuple != null ) {
long nextTimestamp = ((EventFactHandle) rightTuple.getFactHandle()).getStartTimestamp() + this.size;
- JobContext jobctx = new BehaviorJobContext( workingMemory, this, context );
+ JobContext jobctx = new BehaviorJobContext( workingMemory,
+ this,
+ context );
BehaviorJob job = new BehaviorJob();
- JobHandle handle = clock.scheduleJob( job, jobctx, new PointInTimeTrigger( nextTimestamp ));
+ JobHandle handle = clock.scheduleJob( job,
+ jobctx,
+ new PointInTimeTrigger( nextTimestamp ) );
jobctx.setJobHandle( handle );
}
}
-
+
public String toString() {
- return "SlidingTimeWindow( size="+size+" )";
+ return "SlidingTimeWindow( size=" + size + " )";
}
/**
@@ -219,13 +219,41 @@
return (e1.getStartTimestamp() < e2.getStartTimestamp()) ? -1 : (e1.getStartTimestamp() == e2.getStartTimestamp() ? 0 : 1);
}
}
-
- private static class PointInTimeTrigger implements Trigger {
+
+ private static class SlidingTimeWindowContext
+ implements
+ Externalizable {
+
+ public PriorityQueue<RightTuple> queue;
+ public RightTuple expiringTuple;
+
+ public SlidingTimeWindowContext() {
+ this.queue = new PriorityQueue<RightTuple>( 16, // arbitrary size... can we improve it?
+ new SlidingTimeWindowComparator() );
+ }
+
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ this.queue = (PriorityQueue<RightTuple>) in.readObject();
+ this.expiringTuple = (RightTuple) in.readObject();
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject( this.queue );
+ out.writeObject( this.expiringTuple );
+ }
+
+ }
+
+ private static class PointInTimeTrigger
+ implements
+ Trigger {
private Date timestamp;
-
- public PointInTimeTrigger() {}
-
- public PointInTimeTrigger( long timestamp ) {
+
+ public PointInTimeTrigger() {
+ }
+
+ public PointInTimeTrigger(long timestamp) {
this.timestamp = new Date( timestamp );
}
@@ -242,13 +270,15 @@
out.writeObject( this.timestamp );
}
}
-
- private static class BehaviorJobContext implements JobContext {
+
+ private static class BehaviorJobContext
+ implements
+ JobContext {
public InternalWorkingMemory workingMemory;
- public Behavior behavior;
- public Object behaviorContext;
- public JobHandle handle;
-
+ public Behavior behavior;
+ public Object behaviorContext;
+ public JobHandle handle;
+
/**
* @param workingMemory
* @param behavior
@@ -270,21 +300,26 @@
public void setJobHandle(JobHandle jobHandle) {
this.handle = jobHandle;
}
-
+
}
-
- private static class BehaviorJob implements Job {
+ private static class BehaviorJob
+ implements
+ Job {
+
public void execute(JobContext ctx) {
BehaviorJobContext context = (BehaviorJobContext) ctx;
- context.workingMemory.queueWorkingMemoryAction( new BehaviorExpireWMAction( context.behavior, context.behaviorContext ) );
+ context.workingMemory.queueWorkingMemoryAction( new BehaviorExpireWMAction( context.behavior,
+ context.behaviorContext ) );
}
}
-
- private static class BehaviorExpireWMAction implements WorkingMemoryAction {
+
+ private static class BehaviorExpireWMAction
+ implements
+ WorkingMemoryAction {
private final Behavior behavior;
- private final Object context;
+ private final Object context;
/**
* @param behavior
@@ -298,25 +333,26 @@
}
public void execute(InternalWorkingMemory workingMemory) {
- this.behavior.expireTuples( context, workingMemory );
+ this.behavior.expireTuples( context,
+ workingMemory );
}
public void write(MarshallerWriteContext context) throws IOException {
// TODO Auto-generated method stub
-
+
}
public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
// TODO Auto-generated method stub
-
+
}
public void writeExternal(ObjectOutput out) throws IOException {
// TODO Auto-generated method stub
-
+
}
-
+
}
}
More information about the jboss-svn-commits
mailing list