[jboss-svn-commits] JBL Code SVN: r20378 - in labs/jbossrules/trunk: drools-core/src/main/java/org/drools/rule and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Jun 9 11:16:36 EDT 2008


Author: tirelli
Date: 2008-06-09 11:16:36 -0400 (Mon, 09 Jun 2008)
New Revision: 20378

Modified:
   labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java
Log:
Fixing job scheduling for sliding windows

Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java	2008-06-09 14:59:28 UTC (rev 20377)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java	2008-06-09 15:16:36 UTC (rev 20378)
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -25,6 +26,7 @@
 import org.drools.compiler.PackageBuilder;
 import org.drools.lang.descr.PackageDescr;
 import org.drools.rule.Package;
+import org.drools.time.SessionPseudoClock;
 import org.drools.time.impl.PseudoClockScheduler;
 
 public class CepEspTest extends TestCase {
@@ -404,7 +406,7 @@
 
         // how to initialize the clock?
         // how to configure the clock?
-        PseudoClockScheduler clock = (PseudoClockScheduler) wm.getSessionClock();
+        SessionPseudoClock clock = (SessionPseudoClock) wm.getSessionClock();
 
         clock.advanceTime( 5, TimeUnit.SECONDS ); // 5 seconds
         EventFactHandle handle1 = (EventFactHandle) wm.insert( new OrderEvent( "1",
@@ -508,7 +510,7 @@
                       ((Number) results.get( 3 )).intValue() );
 
     }
-
+    
     //    public void FIXME_testTransactionCorrelation() throws Exception {
     //        // read in the source
     //        final Reader reader = new InputStreamReader( getClass().getResourceAsStream( "test_TransactionCorrelation.drl" ) );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java	2008-06-09 14:59:28 UTC (rev 20377)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java	2008-06-09 15:16:36 UTC (rev 20378)
@@ -22,14 +22,21 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.PriorityQueue;
 
 import org.drools.common.EventFactHandle;
 import org.drools.common.InternalWorkingMemory;
 import org.drools.common.PropagationContextImpl;
+import org.drools.common.WorkingMemoryAction;
+import org.drools.marshalling.MarshallerWriteContext;
 import org.drools.reteoo.RightTuple;
 import org.drools.spi.PropagationContext;
+import org.drools.time.Job;
+import org.drools.time.JobContext;
+import org.drools.time.JobHandle;
 import org.drools.time.TimerService;
+import org.drools.time.Trigger;
 
 /**
  * @author etirelli
@@ -183,12 +190,13 @@
                                       final Object context) {
         TimerService clock = workingMemory.getTimerService();
         if ( rightTuple != null ) {
-            // FIXME
-//            clock.schedule( this,
-//                            context,
-//                            ((EventFactHandle) rightTuple.getFactHandle()).getStartTimestamp() + this.size );
-        } else {
-//            clock.unschedule( this );
+            long nextTimestamp = ((EventFactHandle) rightTuple.getFactHandle()).getStartTimestamp() + this.size;
+            JobContext jobctx = new BehaviorJobContext( workingMemory, this, context );
+            BehaviorJob job = new BehaviorJob();
+            JobHandle handle = clock.scheduleJob( job, jobctx, new PointInTimeTrigger( nextTimestamp ));
+            jobctx.setJobHandle( handle );
+//        } else { 
+//            clock.removeJob( jobHandle );
         }
     }
     
@@ -211,5 +219,104 @@
             return (e1.getStartTimestamp() < e2.getStartTimestamp()) ? -1 : (e1.getStartTimestamp() == e2.getStartTimestamp() ? 0 : 1);
         }
     }
+    
+    private static class PointInTimeTrigger implements Trigger {
+        private Date timestamp;
+        
+        public PointInTimeTrigger() {}
+        
+        public PointInTimeTrigger( long timestamp ) {
+            this.timestamp = new Date( timestamp );
+        }
 
+        public Date getNextFireTime() {
+            return this.timestamp;
+        }
+
+        public void readExternal(ObjectInput in) throws IOException,
+                                                ClassNotFoundException {
+            this.timestamp = (Date) in.readObject();
+        }
+
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject( this.timestamp );
+        }
+    }
+    
+    private static class BehaviorJobContext implements JobContext {
+        public InternalWorkingMemory workingMemory;
+        public Behavior behavior;
+        public Object behaviorContext;
+        public JobHandle handle;
+        
+        /**
+         * @param workingMemory
+         * @param behavior
+         * @param behaviorContext
+         */
+        public BehaviorJobContext(InternalWorkingMemory workingMemory,
+                                  Behavior behavior,
+                                  Object behaviorContext) {
+            super();
+            this.workingMemory = workingMemory;
+            this.behavior = behavior;
+            this.behaviorContext = behaviorContext;
+        }
+
+        public JobHandle getJobHandle() {
+            return this.handle;
+        }
+
+        public void setJobHandle(JobHandle jobHandle) {
+            this.handle = jobHandle;
+        }
+        
+    }
+    
+    private static class BehaviorJob implements Job {
+
+        public void execute(JobContext ctx) {
+            BehaviorJobContext context = (BehaviorJobContext) ctx;
+            context.workingMemory.queueWorkingMemoryAction( new BehaviorExpireWMAction( context.behavior, context.behaviorContext ) );
+        }
+
+    }
+    
+    private static class BehaviorExpireWMAction implements WorkingMemoryAction {
+        private final Behavior behavior;
+        private final Object context;
+
+        /**
+         * @param behavior
+         * @param context
+         */
+        public BehaviorExpireWMAction(Behavior behavior,
+                                      Object context) {
+            super();
+            this.behavior = behavior;
+            this.context = context;
+        }
+
+        public void execute(InternalWorkingMemory workingMemory) {
+            this.behavior.expireTuples( context, workingMemory );
+        }
+
+        public void write(MarshallerWriteContext context) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        public void readExternal(ObjectInput in) throws IOException,
+                                                ClassNotFoundException {
+            // TODO Auto-generated method stub
+            
+        }
+
+        public void writeExternal(ObjectOutput out) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+        
+    }
+
 }




More information about the jboss-svn-commits mailing list