[jboss-svn-commits] JBL Code SVN: r20378 - in labs/jbossrules/trunk: drools-core/src/main/java/org/drools/rule and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Jun 9 11:16:36 EDT 2008
Author: tirelli
Date: 2008-06-09 11:16:36 -0400 (Mon, 09 Jun 2008)
New Revision: 20378
Modified:
labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java
Log:
Fixing job scheduling for sliding windows
Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java 2008-06-09 14:59:28 UTC (rev 20377)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java 2008-06-09 15:16:36 UTC (rev 20378)
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
+import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -25,6 +26,7 @@
import org.drools.compiler.PackageBuilder;
import org.drools.lang.descr.PackageDescr;
import org.drools.rule.Package;
+import org.drools.time.SessionPseudoClock;
import org.drools.time.impl.PseudoClockScheduler;
public class CepEspTest extends TestCase {
@@ -404,7 +406,7 @@
// how to initialize the clock?
// how to configure the clock?
- PseudoClockScheduler clock = (PseudoClockScheduler) wm.getSessionClock();
+ SessionPseudoClock clock = (SessionPseudoClock) wm.getSessionClock();
clock.advanceTime( 5, TimeUnit.SECONDS ); // 5 seconds
EventFactHandle handle1 = (EventFactHandle) wm.insert( new OrderEvent( "1",
@@ -508,7 +510,7 @@
((Number) results.get( 3 )).intValue() );
}
-
+
// public void FIXME_testTransactionCorrelation() throws Exception {
// // read in the source
// final Reader reader = new InputStreamReader( getClass().getResourceAsStream( "test_TransactionCorrelation.drl" ) );
Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java 2008-06-09 14:59:28 UTC (rev 20377)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java 2008-06-09 15:16:36 UTC (rev 20378)
@@ -22,14 +22,21 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Comparator;
+import java.util.Date;
import java.util.PriorityQueue;
import org.drools.common.EventFactHandle;
import org.drools.common.InternalWorkingMemory;
import org.drools.common.PropagationContextImpl;
+import org.drools.common.WorkingMemoryAction;
+import org.drools.marshalling.MarshallerWriteContext;
import org.drools.reteoo.RightTuple;
import org.drools.spi.PropagationContext;
+import org.drools.time.Job;
+import org.drools.time.JobContext;
+import org.drools.time.JobHandle;
import org.drools.time.TimerService;
+import org.drools.time.Trigger;
/**
* @author etirelli
@@ -183,12 +190,13 @@
final Object context) {
TimerService clock = workingMemory.getTimerService();
if ( rightTuple != null ) {
- // FIXME
-// clock.schedule( this,
-// context,
-// ((EventFactHandle) rightTuple.getFactHandle()).getStartTimestamp() + this.size );
- } else {
-// clock.unschedule( this );
+ long nextTimestamp = ((EventFactHandle) rightTuple.getFactHandle()).getStartTimestamp() + this.size;
+ JobContext jobctx = new BehaviorJobContext( workingMemory, this, context );
+ BehaviorJob job = new BehaviorJob();
+ JobHandle handle = clock.scheduleJob( job, jobctx, new PointInTimeTrigger( nextTimestamp ));
+ jobctx.setJobHandle( handle );
+// } else {
+// clock.removeJob( jobHandle );
}
}
@@ -211,5 +219,104 @@
return (e1.getStartTimestamp() < e2.getStartTimestamp()) ? -1 : (e1.getStartTimestamp() == e2.getStartTimestamp() ? 0 : 1);
}
}
+
+ private static class PointInTimeTrigger implements Trigger {
+ private Date timestamp;
+
+ public PointInTimeTrigger() {}
+
+ public PointInTimeTrigger( long timestamp ) {
+ this.timestamp = new Date( timestamp );
+ }
+ public Date getNextFireTime() {
+ return this.timestamp;
+ }
+
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ this.timestamp = (Date) in.readObject();
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject( this.timestamp );
+ }
+ }
+
+ private static class BehaviorJobContext implements JobContext {
+ public InternalWorkingMemory workingMemory;
+ public Behavior behavior;
+ public Object behaviorContext;
+ public JobHandle handle;
+
+ /**
+ * @param workingMemory
+ * @param behavior
+ * @param behaviorContext
+ */
+ public BehaviorJobContext(InternalWorkingMemory workingMemory,
+ Behavior behavior,
+ Object behaviorContext) {
+ super();
+ this.workingMemory = workingMemory;
+ this.behavior = behavior;
+ this.behaviorContext = behaviorContext;
+ }
+
+ public JobHandle getJobHandle() {
+ return this.handle;
+ }
+
+ public void setJobHandle(JobHandle jobHandle) {
+ this.handle = jobHandle;
+ }
+
+ }
+
+ private static class BehaviorJob implements Job {
+
+ public void execute(JobContext ctx) {
+ BehaviorJobContext context = (BehaviorJobContext) ctx;
+ context.workingMemory.queueWorkingMemoryAction( new BehaviorExpireWMAction( context.behavior, context.behaviorContext ) );
+ }
+
+ }
+
+ private static class BehaviorExpireWMAction implements WorkingMemoryAction {
+ private final Behavior behavior;
+ private final Object context;
+
+ /**
+ * @param behavior
+ * @param context
+ */
+ public BehaviorExpireWMAction(Behavior behavior,
+ Object context) {
+ super();
+ this.behavior = behavior;
+ this.context = context;
+ }
+
+ public void execute(InternalWorkingMemory workingMemory) {
+ this.behavior.expireTuples( context, workingMemory );
+ }
+
+ public void write(MarshallerWriteContext context) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void readExternal(ObjectInput in) throws IOException,
+ ClassNotFoundException {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void writeExternal(ObjectOutput out) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
}
More information about the jboss-svn-commits
mailing list