[jboss-svn-commits] JBL Code SVN: r19942 - in labs/jbossrules/trunk: drools-core/src/main/java/org/drools and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue May 13 15:55:56 EDT 2008


Author: tirelli
Date: 2008-05-13 15:55:55 -0400 (Tue, 13 May 2008)
New Revision: 19942

Added:
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/BehaviorManager.java
Modified:
   labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/ClockType.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooTemporalSession.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightTuple.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/AccumulateBuilder.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/PatternBuilder.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/Behavior.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/temporal/SessionClock.java
   labs/jbossrules/trunk/drools-core/src/main/java/org/drools/temporal/SessionPseudoClock.java
Log:
JBRULES-1599: adding support to absolute time windows

Modified: labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java
===================================================================
--- labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-compiler/src/test/java/org/drools/integrationtests/CepEspTest.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -57,7 +57,7 @@
         RuleBase ruleBase = getRuleBase();
         ruleBase.addPackage( pkg );
         // load up the rulebase
-        ruleBase    = SerializationHelper.serializeObject(ruleBase);
+        ruleBase = SerializationHelper.serializeObject( ruleBase );
         return ruleBase;
     }
 
@@ -104,11 +104,11 @@
         assertTrue( handle3.isEvent() );
         assertTrue( handle4.isEvent() );
 
-        wm  = SerializationHelper.serializeObject(wm);
+        wm = SerializationHelper.serializeObject( wm );
         wm.fireAllRules();
 
         assertEquals( 2,
-                      ((List)wm.getGlobal("results")).size() );
+                      ((List) wm.getGlobal( "results" )).size() );
 
     }
 
@@ -136,13 +136,13 @@
         StockTick tick3 = new StockTick( 3,
                                          "ACME",
                                          10,
-                                         System.currentTimeMillis(), 
+                                         System.currentTimeMillis(),
                                          8 );
         StockTick tick4 = new StockTick( 4,
                                          "DROO",
                                          50,
                                          System.currentTimeMillis(),
-                                         7);
+                                         7 );
 
         InternalFactHandle handle1 = (InternalFactHandle) wm.insert( tick1 );
         InternalFactHandle handle2 = (InternalFactHandle) wm.insert( tick2 );
@@ -158,17 +158,21 @@
         assertTrue( handle2.isEvent() );
         assertTrue( handle3.isEvent() );
         assertTrue( handle4.isEvent() );
-        
+
         EventFactHandle eh1 = (EventFactHandle) handle1;
         EventFactHandle eh2 = (EventFactHandle) handle2;
         EventFactHandle eh3 = (EventFactHandle) handle3;
         EventFactHandle eh4 = (EventFactHandle) handle4;
-        
-        assertEquals( tick1.getDuration(), eh1.getDuration() );
-        assertEquals( tick2.getDuration(), eh2.getDuration() );
-        assertEquals( tick3.getDuration(), eh3.getDuration() );
-        assertEquals( tick4.getDuration(), eh4.getDuration() );
 
+        assertEquals( tick1.getDuration(),
+                      eh1.getDuration() );
+        assertEquals( tick2.getDuration(),
+                      eh2.getDuration() );
+        assertEquals( tick3.getDuration(),
+                      eh3.getDuration() );
+        assertEquals( tick4.getDuration(),
+                      eh4.getDuration() );
+
         wm.fireAllRules();
 
         assertEquals( 2,
@@ -200,31 +204,31 @@
         final List results_finished_by = new ArrayList();
 
         wm.setGlobal( "results_coincides",
-                results_coincides );
+                      results_coincides );
         wm.setGlobal( "results_before",
-                results_before );
+                      results_before );
         wm.setGlobal( "results_after",
                       results_after );
         wm.setGlobal( "results_meets",
-                results_meets );
+                      results_meets );
         wm.setGlobal( "results_met_by",
                       results_met_by );
         wm.setGlobal( "results_overlaps",
-                	  results_overlaps );
+                      results_overlaps );
         wm.setGlobal( "results_overlapped_by",
                       results_overlapped_by );
         wm.setGlobal( "results_during",
-          	  results_during );
+                      results_during );
         wm.setGlobal( "results_includes",
-                results_includes );
+                      results_includes );
         wm.setGlobal( "results_starts",
-          	  results_starts );
+                      results_starts );
         wm.setGlobal( "results_started_by",
-                results_started_by );
+                      results_started_by );
         wm.setGlobal( "results_finishes",
-            	  results_finishes );
+                      results_finishes );
         wm.setGlobal( "results_finished_by",
-                  results_finished_by );
+                      results_finished_by );
 
         StockTick tick1 = new StockTick( 1,
                                          "DROO",
@@ -247,23 +251,23 @@
                                          System.currentTimeMillis(),
                                          5 );
         StockTick tick5 = new StockTick( 5,
-                						 "ACME",
-                						 10,
+                                         "ACME",
+                                         10,
                                          System.currentTimeMillis(),
                                          5 );
         StockTick tick6 = new StockTick( 6,
-										 "ACME",
-										 10,
+                                         "ACME",
+                                         10,
                                          System.currentTimeMillis(),
                                          3 );
         StockTick tick7 = new StockTick( 7,
-				 						 "ACME",
-				 						 10,
+                                         "ACME",
+                                         10,
                                          System.currentTimeMillis(),
                                          5 );
         StockTick tick8 = new StockTick( 8,
-										 "ACME",
-										 10,
+                                         "ACME",
+                                         10,
                                          System.currentTimeMillis(),
                                          3 );
 
@@ -280,7 +284,7 @@
         InternalFactHandle handle7 = (InternalFactHandle) wm.insert( tick7 );
         clock.advanceTime( 2 );
         InternalFactHandle handle8 = (InternalFactHandle) wm.insert( tick8 );
-        
+
         assertNotNull( handle1 );
         assertNotNull( handle2 );
         assertNotNull( handle3 );
@@ -298,78 +302,78 @@
         assertTrue( handle7.isEvent() );
         assertTrue( handle8.isEvent() );
 
-//        wm  = SerializationHelper.serializeObject(wm);
+        //        wm  = SerializationHelper.serializeObject(wm);
         wm.fireAllRules();
 
         assertEquals( 1,
-                results_coincides.size() );
+                      results_coincides.size() );
         assertEquals( tick5,
-                results_coincides.get( 0 ) );
-        
+                      results_coincides.get( 0 ) );
+
         assertEquals( 1,
-                results_before.size() );
+                      results_before.size() );
         assertEquals( tick2,
-                results_before.get( 0 ) );
-        
+                      results_before.get( 0 ) );
+
         assertEquals( 1,
                       results_after.size() );
         assertEquals( tick3,
                       results_after.get( 0 ) );
-        
+
         assertEquals( 1,
-                results_meets.size() );
+                      results_meets.size() );
         assertEquals( tick3,
-                results_meets.get( 0 ) );
-        
+                      results_meets.get( 0 ) );
+
         assertEquals( 1,
                       results_met_by.size() );
         assertEquals( tick2,
                       results_met_by.get( 0 ) );
-        
+
         assertEquals( 1,
-                results_met_by.size() );
+                      results_met_by.size() );
         assertEquals( tick2,
-                results_met_by.get( 0 ) );
-        
+                      results_met_by.get( 0 ) );
+
         assertEquals( 1,
-                results_overlaps.size() );
+                      results_overlaps.size() );
         assertEquals( tick4,
-                results_overlaps.get( 0 ) );
-        
+                      results_overlaps.get( 0 ) );
+
         assertEquals( 1,
-                results_overlapped_by.size() );
+                      results_overlapped_by.size() );
         assertEquals( tick7,
-                results_overlapped_by.get( 0 ) );
-        
+                      results_overlapped_by.get( 0 ) );
+
         assertEquals( 1,
-                results_during.size() );
+                      results_during.size() );
         assertEquals( tick6,
-                results_during.get( 0 ) );
-        
+                      results_during.get( 0 ) );
+
         assertEquals( 1,
-                results_includes.size() );
+                      results_includes.size() );
         assertEquals( tick4,
-                results_includes.get( 0 ) );
-        
+                      results_includes.get( 0 ) );
+
         assertEquals( 1,
-                results_starts.size() );
+                      results_starts.size() );
         assertEquals( tick6,
-                results_starts.get( 0 ) );
-        
+                      results_starts.get( 0 ) );
+
         assertEquals( 1,
-                results_started_by.size() );
+                      results_started_by.size() );
         assertEquals( tick7,
-                results_started_by.get( 0 ) );
-        
+                      results_started_by.get( 0 ) );
+
         assertEquals( 1,
-                results_finishes.size() );
+                      results_finishes.size() );
         assertEquals( tick8,
-                results_finishes.get( 0 ) );
-        
+                      results_finishes.get( 0 ) );
+
         assertEquals( 1,
-                results_finished_by.size() );
+                      results_finished_by.size() );
         assertEquals( tick7,
-                results_finished_by.get( 0 ) );
+                      results_finished_by.get( 0 ) );
 
     }
 
@@ -388,75 +392,107 @@
         // how to configure the clock?
         SessionPseudoClock clock = wm.getSessionClock();
 
-        clock.advanceTime( 10000 ); // 10 seconds
+        clock.advanceTime( 5000 ); // 5 seconds
         EventFactHandle handle1 = (EventFactHandle) wm.insert( new OrderEvent( "1",
                                                                                "customer A",
                                                                                70 ) );
-        assertEquals( 10000,
+        assertEquals( 5000,
                       handle1.getStartTimestamp() );
         assertEquals( 0,
                       handle1.getDuration() );
 
-//        wm  = SerializationHelper.serializeObject(wm);
+        //        wm  = SerializationHelper.serializeObject(wm);
         wm.fireAllRules();
 
+        assertEquals( 1,
+                      results.size() );
+        assertEquals( 70,
+                      ((Number) results.get( 0 )).intValue() );
+
+        // advance clock and assert new data
         clock.advanceTime( 10000 ); // 10 seconds
         EventFactHandle handle2 = (EventFactHandle) wm.insert( new OrderEvent( "2",
                                                                                "customer A",
                                                                                60 ) );
-        assertEquals( 20000,
+        assertEquals( 15000,
                       handle2.getStartTimestamp() );
         assertEquals( 0,
                       handle2.getDuration() );
 
         wm.fireAllRules();
 
+        assertEquals( 2,
+                      results.size() );
+        assertEquals( 65,
+                      ((Number) results.get( 1 )).intValue() );
+
+        // advance clock and assert new data
         clock.advanceTime( 10000 ); // 10 seconds
         EventFactHandle handle3 = (EventFactHandle) wm.insert( new OrderEvent( "3",
                                                                                "customer A",
                                                                                50 ) );
-        assertEquals( 30000,
+        assertEquals( 25000,
                       handle3.getStartTimestamp() );
         assertEquals( 0,
                       handle3.getDuration() );
 
         wm.fireAllRules();
 
+        assertEquals( 3,
+                      results.size() );
+        assertEquals( 60,
+                      ((Number) results.get( 2 )).intValue() );
+
+        // advance clock and assert new data
         clock.advanceTime( 10000 ); // 10 seconds
         EventFactHandle handle4 = (EventFactHandle) wm.insert( new OrderEvent( "4",
                                                                                "customer A",
-                                                                               30 ) );
-        assertEquals( 40000,
+                                                                               25 ) );
+        assertEquals( 35000,
                       handle4.getStartTimestamp() );
         assertEquals( 0,
                       handle4.getDuration() );
 
         wm.fireAllRules();
 
+        // first event should have expired, making average under the rule threshold, so no additional rule fire
+        assertEquals( 3,
+                      results.size() );
+
+        // advance clock and assert new data
         clock.advanceTime( 10000 ); // 10 seconds
         EventFactHandle handle5 = (EventFactHandle) wm.insert( new OrderEvent( "5",
                                                                                "customer A",
                                                                                70 ) );
-        assertEquals( 50000,
+        assertEquals( 45000,
                       handle5.getStartTimestamp() );
         assertEquals( 0,
                       handle5.getDuration() );
 
-//        wm  = SerializationHelper.serializeObject(wm);
+        //        wm  = SerializationHelper.serializeObject(wm);
         wm.fireAllRules();
 
+        // still under the threshold, so no fire
+        assertEquals( 3,
+                      results.size() );
+
+        // advance clock and assert new data
         clock.advanceTime( 10000 ); // 10 seconds
         EventFactHandle handle6 = (EventFactHandle) wm.insert( new OrderEvent( "6",
                                                                                "customer A",
-                                                                               80 ) );
-        assertEquals( 60000,
+                                                                               115 ) );
+        assertEquals( 55000,
                       handle6.getStartTimestamp() );
         assertEquals( 0,
                       handle6.getDuration() );
 
-        wm  = SerializationHelper.serializeObject(wm);
         wm.fireAllRules();
 
+        assertEquals( 4,
+                      results.size() );
+        assertEquals( 70,
+                      ((Number) results.get( 3 )).intValue() );
+
     }
 
     //    public void testTransactionCorrelation() throws Exception {

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/ClockType.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/ClockType.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/ClockType.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -37,7 +37,7 @@
    * client application. It is usually used during simulations or tests
    */
   PSEUDO_CLOCK {
-      public SessionPseudoClock createInstance() {
+      public SessionClock createInstance() {
           return new SessionPseudoClock();
       }
   };

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/AccumulateNode.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -33,7 +33,6 @@
 import org.drools.rule.ContextEntry;
 import org.drools.spi.AlphaNodeFieldConstraint;
 import org.drools.spi.PropagationContext;
-import org.drools.spi.Tuple;
 import org.drools.util.ArrayUtils;
 import org.drools.util.Entry;
 import org.drools.util.Iterator;
@@ -244,8 +243,17 @@
                              final InternalWorkingMemory workingMemory) {
 
         final AccumulateMemory memory = (AccumulateMemory) workingMemory.getNodeMemory( this );
+
         final RightTuple rightTuple = new RightTuple( factHandle,
                                                       this );
+        if ( !behavior.assertRightTuple( memory.behaviorContext,
+                                         rightTuple,
+                                         workingMemory ) ) {
+            // destroy right tuple
+            rightTuple.unlinkFromRightParent();
+            return;
+        }
+
         memory.betaMemory.getRightTupleMemory().add( rightTuple );
 
         if ( !this.tupleMemoryEnabled ) {
@@ -295,6 +303,8 @@
                                   final PropagationContext context,
                                   final InternalWorkingMemory workingMemory) {
         final AccumulateMemory memory = (AccumulateMemory) workingMemory.getNodeMemory( this );
+        
+        behavior.retractRightTuple( memory.behaviorContext, rightTuple, workingMemory );
         memory.betaMemory.getRightTupleMemory().remove( rightTuple );
 
         for ( LeftTuple childTuple = rightTuple.getBetaChildren(); childTuple != null; ) {
@@ -570,6 +580,7 @@
         for ( int i = 0; i < this.resultConstraints.length; i++ ) {
             memory.alphaContexts[i] = this.resultConstraints[i].createContextEntry();
         }
+        memory.behaviorContext = this.behavior.createBehaviorContext();
         return memory;
     }
 
@@ -582,6 +593,7 @@
         public BetaMemory         betaMemory;
         public ContextEntry[]     resultsContext;
         public ContextEntry[]     alphaContexts;
+        public Object             behaviorContext;
 
         public void readExternal(ObjectInput in) throws IOException,
                                                 ClassNotFoundException {
@@ -589,6 +601,7 @@
             betaMemory = (BetaMemory) in.readObject();
             resultsContext = (ContextEntry[]) in.readObject();
             alphaContexts = (ContextEntry[]) in.readObject();
+            behaviorContext = in.readObject();
         }
 
         public void writeExternal(ObjectOutput out) throws IOException {
@@ -596,6 +609,7 @@
             out.writeObject( betaMemory );
             out.writeObject( resultsContext );
             out.writeObject( alphaContexts );
+            out.writeObject( behaviorContext );
         }
 
     }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/BetaNode.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -29,6 +29,7 @@
 import org.drools.common.NodeMemory;
 import org.drools.common.PropagationContextImpl;
 import org.drools.rule.Behavior;
+import org.drools.rule.BehaviorManager;
 import org.drools.spi.BetaNodeFieldConstraint;
 import org.drools.spi.PropagationContext;
 import org.drools.util.LinkedList;
@@ -53,8 +54,6 @@
     RightTupleSink,
     NodeMemory {
     
-    public static final Behavior[] NO_BEHAVIORS = new Behavior[0];
-    
     // ------------------------------------------------------------
     // Instance members
     // ------------------------------------------------------------
@@ -67,7 +66,7 @@
 
     protected BetaConstraints constraints;
 
-    protected Behavior[]      behaviors = NO_BEHAVIORS;
+    protected BehaviorManager behavior;
 
     private LeftTupleSinkNode previousTupleSinkNode;
     private LeftTupleSinkNode nextTupleSinkNode;
@@ -102,7 +101,7 @@
         this.leftInput = leftInput;
         this.rightInput = rightInput;
         this.constraints = constraints;
-        this.behaviors = behaviors;
+        this.behavior = new BehaviorManager( behaviors );
 
         if ( this.constraints == null ) {
             throw new RuntimeException( "cannot have null constraints, must at least be an instance of EmptyBetaConstraints" );
@@ -112,7 +111,7 @@
     public void readExternal(ObjectInput in) throws IOException,
                                             ClassNotFoundException {
         constraints = (BetaConstraints) in.readObject();
-        behaviors = (Behavior[]) in.readObject();
+        behavior = (BehaviorManager) in.readObject();
         leftInput = (LeftTupleSource) in.readObject();
         rightInput = (ObjectSource) in.readObject();
         previousTupleSinkNode = (LeftTupleSinkNode) in.readObject();
@@ -126,7 +125,7 @@
 
     public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject( constraints );
-        out.writeObject( behaviors );
+        out.writeObject( behavior );
         out.writeObject( leftInput );
         out.writeObject( rightInput );
         out.writeObject( previousTupleSinkNode );
@@ -150,7 +149,7 @@
     }
     
     public Behavior[] getBehaviors() {
-        return this.behaviors;
+        return this.behavior.getBehaviors();
     }
 
     /* (non-Javadoc)
@@ -362,5 +361,5 @@
     public void setPreviousObjectSinkNode(final ObjectSinkNode previous) {
         this.previousObjectSinkNode = previous;
     }
-
+    
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooTemporalSession.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooTemporalSession.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/ReteooTemporalSession.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -50,6 +50,7 @@
                ruleBase,
                executorService );
         this.sessionClock = clock;
+        this.sessionClock.setSession( this );
     }
 
     public void readExternal(ObjectInput in) throws IOException,

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightTuple.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightTuple.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/RightTuple.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -13,24 +13,24 @@
     Entry {
     private InternalFactHandle handle;
 
-    private RightTuple               handlePrevious;
-    private RightTuple               handleNext;
+    private RightTuple         handlePrevious;
+    private RightTuple         handleNext;
 
-    private RightTupleList           memory;
+    private RightTupleList     memory;
 
-    private Entry                    previous;
-    private Entry                    next;
+    private Entry              previous;
+    private Entry              next;
 
-    private LeftTuple                betaChildren;
+    private LeftTuple          betaChildren;
 
-    private LeftTuple                blocked;
+    private LeftTuple          blocked;
 
-    private RightTupleSink           sink;
+    private RightTupleSink     sink;
 
-    private int                      hashCode;
-    
+    private int                hashCode;
+
     public RightTuple() {
-        
+
     }
 
     public RightTuple(InternalFactHandle handle,
@@ -52,25 +52,28 @@
         return this.sink;
     }
 
-    //    public void unlinkFromRightParent() {
-    //        if ( this.parent != null ) {
-    //            if ( this.parentPrevious != null ) {
-    //                this.parentPrevious.parentNext = this.parentNext;
-    //            } else {
-    //                // first one in the chain, so treat differently                
-    //                this.parent.setAlphaChildren( this.parentNext );
-    //            }
-    //
-    //            if ( this.parentNext != null ) {
-    //                this.parentNext.parentPrevious = this.parentPrevious;
-    //            }
-    //        }
-    //
-    //        this.parent = null;
-    //        this.parentPrevious = null;
-    //        this.parentNext = null;
-    //        this.blocked = null;
-    //    }
+    public void unlinkFromRightParent() {
+        if ( this.handle != null ) {
+            if( this.handlePrevious != null ) {
+                this.handlePrevious.handleNext = this.handleNext;
+            }
+            if( this.handleNext != null ) {
+                this.handleNext.handlePrevious = this.handlePrevious;
+            }
+            if( this.handle.getRightTuple() == this ) {
+                this.handle.setRightTuple( this.handleNext );
+            }
+        }
+        this.handle = null;
+        this.handlePrevious = null;
+        this.handleNext = null;
+        this.blocked = null;
+        this.previous = null;
+        this.next = null;
+        this.memory = null;
+        this.betaChildren = null;
+        this.sink = null;
+    }
 
     public InternalFactHandle getFactHandle() {
         return this.handle;
@@ -87,7 +90,7 @@
     public RightTupleList getMemory() {
         return memory;
     }
-    
+
     public void setMemory(RightTupleList memory) {
         this.memory = memory;
     }
@@ -168,15 +171,15 @@
 
     public void readExternal(ObjectInput in) throws IOException,
                                             ClassNotFoundException {
-        this.handle = ( InternalFactHandle ) in.readObject();
-        this.handlePrevious = ( RightTuple ) in.readObject();
-        this.handleNext = ( RightTuple ) in.readObject();
-        this.memory = ( RightTupleList ) in.readObject();
-        this.previous = ( RightTuple ) in.readObject();
-        this.next = ( RightTuple ) in.readObject();
-        this.betaChildren = ( LeftTuple) in.readObject();
-        this.blocked = ( LeftTuple) in.readObject();
-        this.sink = ( RightTupleSink ) in.readObject();
+        this.handle = (InternalFactHandle) in.readObject();
+        this.handlePrevious = (RightTuple) in.readObject();
+        this.handleNext = (RightTuple) in.readObject();
+        this.memory = (RightTupleList) in.readObject();
+        this.previous = (RightTuple) in.readObject();
+        this.next = (RightTuple) in.readObject();
+        this.betaChildren = (LeftTuple) in.readObject();
+        this.blocked = (LeftTuple) in.readObject();
+        this.sink = (RightTupleSink) in.readObject();
         this.hashCode = in.readInt();
     }
 
@@ -190,7 +193,7 @@
         out.writeObject( this.betaChildren );
         out.writeObject( this.blocked );
         out.writeObject( this.sink );
-        out.writeInt( this.hashCode );        
+        out.writeInt( this.hashCode );
     }
 
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/AccumulateBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/AccumulateBuilder.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/AccumulateBuilder.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -49,6 +49,7 @@
 
         final List resultBetaConstraints = context.getBetaconstraints();
         final List resultAlphaConstraints = context.getAlphaConstraints();
+        final List resultBehaviors = context.getBehaviors();
 
         final RuleConditionElement source = accumulate.getSource();
 
@@ -90,6 +91,11 @@
         final BetaConstraints sourceBinder = utils.createBetaNodeConstraint( context,
                                                                              context.getBetaconstraints(),
                                                                              false );
+        
+        Behavior[] behaviors = Behavior.EMPTY_BEHAVIOR_LIST;
+        if( ! context.getBehaviors().isEmpty() ) {
+            behaviors = (Behavior[]) context.getBehaviors().toArray( new Behavior[ context.getBehaviors().size() ]);
+        }
 
         context.setTupleSource( (LeftTupleSource) utils.attachNode( context,
                                                                 new AccumulateNode( context.getNextId(),
@@ -98,7 +104,7 @@
                                                                                     (AlphaNodeFieldConstraint[]) resultAlphaConstraints.toArray( new AlphaNodeFieldConstraint[resultAlphaConstraints.size()] ),
                                                                                     sourceBinder,
                                                                                     resultsBinder,
-                                                                                    Behavior.EMPTY_BEHAVIOR_LIST,
+                                                                                    behaviors,
                                                                                     accumulate,
                                                                                     existSubNetwort,
                                                                                     context ) ) );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/BuildContext.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -69,6 +69,9 @@
     // alpha constraints from the last pattern attached
     private List                      alphaConstraints;
     
+    // behaviors from the last pattern attached
+    private List                      behaviors;
+    
     // the current entry point
     private EntryPoint                currentEntryPoint;
     
@@ -348,5 +351,19 @@
     public void setCurrentEntryPoint(EntryPoint currentEntryPoint) {
         this.currentEntryPoint = currentEntryPoint;
     }
+
+    /**
+     * @return the behaviors
+     */
+    public List getBehaviors() {
+        return behaviors;
+    }
+
+    /**
+     * @param behaviors the behaviors to set
+     */
+    public void setBehaviors(List behaviors) {
+        this.behaviors = behaviors;
+    }
         
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/PatternBuilder.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/PatternBuilder.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/reteoo/builder/PatternBuilder.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -33,14 +33,12 @@
 import org.drools.rule.Declaration;
 import org.drools.rule.EntryPoint;
 import org.drools.rule.InvalidPatternException;
-import org.drools.rule.MutableTypeConstraint;
 import org.drools.rule.Pattern;
 import org.drools.rule.PatternSource;
 import org.drools.rule.RuleConditionElement;
 import org.drools.spi.AlphaNodeFieldConstraint;
 import org.drools.spi.Constraint;
 import org.drools.spi.ObjectType;
-import org.drools.spi.Constraint.ConstraintType;
 
 /**
  * A builder for patterns
@@ -75,6 +73,7 @@
 
         final List alphaConstraints = new LinkedList();
         final List betaConstraints = new LinkedList();
+        final List behaviors = new LinkedList();
 
         this.createConstraints( context,
                                 utils,
@@ -84,6 +83,10 @@
 
         // Create BetaConstraints object
         context.setBetaconstraints( betaConstraints );
+        
+        // set behaviors list
+        behaviors.addAll( pattern.getBehaviors() );
+        context.setBehaviors( behaviors );
 
         if ( pattern.getSource() != null ) {
             context.setAlphaConstraints( alphaConstraints );

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/Behavior.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/Behavior.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/Behavior.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -18,6 +18,8 @@
 
 package org.drools.rule;
 
+import org.drools.common.InternalWorkingMemory;
+import org.drools.reteoo.RightTuple;
 import org.drools.spi.RuleComponent;
 
 /**
@@ -45,6 +47,50 @@
         }
     }
     
+    /**
+     * Returns the type of the behavior
+     * 
+     * @return
+     */
     public BehaviorType getType();
 
+    /**
+     * Creates the context object associated with this behavior.
+     * The object is given as a parameter in all behavior call backs.
+     * 
+     * @return
+     */
+    public Object createContext();
+
+    /**
+     * Makes the behavior aware of the new fact entering behavior's scope
+     * 
+     * @param context The behavior context object
+     * @param tuple The new fact entering behavior's scope
+     * @param workingMemory The working memory session reference
+     */
+    public void assertRightTuple(Object context,
+                                 RightTuple tuple,
+                                 InternalWorkingMemory workingMemory);
+
+    /**
+     * Removes a right tuple from the behavior's scope
+     * 
+     * @param context The behavior context object
+     * @param rightTuple The tuple leaving the behavior's scope
+     * @param workingMemory The working memory session reference
+     */
+    public void retractRightTuple(Object context,
+                                  RightTuple rightTuple,
+                                  InternalWorkingMemory workingMemory);
+
+    /**
+     * A callback method that allows behaviors to expire tuples
+     * 
+     * @param context The behavior context object
+     * @param workingMemory The working memory session reference
+     */
+    public void expireTuples(Object context, 
+                             InternalWorkingMemory workingMemory);
+    
 }

Added: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/BehaviorManager.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/BehaviorManager.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/BehaviorManager.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2008 JBoss Inc
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Created on May 12, 2008
+ */
+
+package org.drools.rule;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.drools.common.InternalWorkingMemory;
+import org.drools.reteoo.RightTuple;
+
+/**
+ * A class to encapsulate behavior management for a given beta node
+ * 
+ * @author etirelli
+ */
+public class BehaviorManager
+    implements
+    Externalizable {
+
+    public static final Behavior[] NO_BEHAVIORS = new Behavior[0];
+
+    private Behavior[]             behaviors;
+
+    public BehaviorManager() {
+        this( NO_BEHAVIORS );
+    }
+    
+    /**
+     * @param behaviors
+     */
+    public BehaviorManager(Behavior[] behaviors) {
+        super();
+        this.behaviors = behaviors;
+    }
+
+    public void readExternal(ObjectInput in) throws IOException,
+                                            ClassNotFoundException {
+        behaviors = (Behavior[]) in.readObject();
+    }
+
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject( behaviors );
+    }
+
+    /**
+     * Creates the behaviors' context 
+     * 
+     * @return
+     */
+    public Object createBehaviorContext() {
+        Object[] behaviorCtx = new Object[behaviors.length];
+        for ( int i = 0; i < behaviors.length; i++ ) {
+            behaviorCtx[i] = behaviors[i].createContext();
+        }
+        return behaviorCtx;
+    }
+
+    /**
+     * Register a newly asserted right tuple into the behaviors' context
+     *  
+     * @param context
+     * @param tuple
+     * @return
+     */
+    public boolean assertRightTuple(final Object behaviorContext,
+                                    final RightTuple rightTuple,
+                                    final InternalWorkingMemory workingMemory) {
+        for ( int i = 0; i < behaviors.length; i++ ) {
+            behaviors[i].assertRightTuple( ((Object[]) behaviorContext)[i],
+                                           rightTuple,
+                                           workingMemory );
+        }
+        return true;
+    }
+
+    /**
+     * Removes a newly asserted right tuple from the behaviors' context
+     * @param behaviorContext
+     * @param rightTuple
+     * @param workingMemory
+     */
+    public void retractRightTuple(final Object behaviorContext,
+                                  final RightTuple rightTuple,
+                                  final InternalWorkingMemory workingMemory) {
+        for ( int i = 0; i < behaviors.length; i++ ) {
+            behaviors[i].retractRightTuple( ((Object[]) behaviorContext)[i],
+                                            rightTuple,
+                                            workingMemory );
+        }
+    }
+
+    /**
+     * @return the behaviors
+     */
+    public Behavior[] getBehaviors() {
+        return behaviors;
+    }
+
+}

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/rule/SlidingTimeWindow.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -22,9 +22,16 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.Comparator;
+import java.util.PriorityQueue;
 
+import org.drools.TemporalSession;
 import org.drools.common.EventFactHandle;
 import org.drools.common.InternalWorkingMemory;
+import org.drools.common.PropagationContextImpl;
+import org.drools.reteoo.ReteooTemporalSession;
+import org.drools.reteoo.RightTuple;
+import org.drools.spi.PropagationContext;
+import org.drools.temporal.SessionClock;
 
 /**
  * @author etirelli
@@ -35,14 +42,16 @@
     Behavior {
 
     private long size;
+    private volatile transient RightTuple expiringTuple; 
 
     public SlidingTimeWindow() {
+        this( 0 );
     }
-    
+
     /**
      * @param size
      */
-    public SlidingTimeWindow(long size) {
+    public SlidingTimeWindow(final long size) {
         super();
         this.size = size;
     }
@@ -52,10 +61,9 @@
      *
      * @see java.io.Externalizable#readExternal(java.io.ObjectInput)
      */
-    public void readExternal(ObjectInput in) throws IOException,
-                                            ClassNotFoundException {
-        // TODO Auto-generated method stub
-
+    public void readExternal(final ObjectInput in) throws IOException,
+                                                  ClassNotFoundException {
+        this.size = in.readLong();
     }
 
     /**
@@ -63,9 +71,8 @@
      *
      * @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
      */
-    public void writeExternal(ObjectOutput out) throws IOException {
-        // TODO Auto-generated method stub
-
+    public void writeExternal(final ObjectOutput out) throws IOException {
+        out.writeLong( this.size );
     }
 
     public BehaviorType getType() {
@@ -82,22 +89,126 @@
     /**
      * @param size the size to set
      */
-    public void setSize(long size) {
+    public void setSize(final long size) {
         this.size = size;
     }
 
-    public Comparator<EventFactHandle> getEventComparator() {
-        return new Comparator<EventFactHandle>() {
-            public int compare(EventFactHandle e1,
-                               EventFactHandle e2) {
-                return ( e1.getStartTimestamp() < e2.getStartTimestamp() ) ? -1 : ( e1.getStartTimestamp() == e2.getStartTimestamp() ? 0 : 1 );
+    public Object createContext() {
+        return new PriorityQueue<RightTuple>( 16, // arbitrary size... can we improve it?
+                                              new SlidingTimeWindowComparator() );
+    }
+
+    /**
+     * @inheritDoc
+     *
+     * @see org.drools.rule.Behavior#assertRightTuple(java.lang.Object, org.drools.reteoo.RightTuple, org.drools.common.InternalWorkingMemory)
+     */
+    public void assertRightTuple(final Object context,
+                                 final RightTuple rightTuple,
+                                 final InternalWorkingMemory workingMemory) {
+        PriorityQueue<RightTuple> queue = (PriorityQueue<RightTuple>) context;
+        queue.add( rightTuple );
+        if ( queue.peek() == rightTuple ) {
+            // update next expiration time 
+            updateNextExpiration( rightTuple,
+                                  workingMemory,
+                                  queue );
+        }
+    }
+
+    /**
+     * @inheritDoc
+     *
+     * @see org.drools.rule.Behavior#retractRightTuple(java.lang.Object, org.drools.reteoo.RightTuple, org.drools.common.InternalWorkingMemory)
+     */
+    public void retractRightTuple(final Object context,
+                                  final RightTuple rightTuple,
+                                  final InternalWorkingMemory workingMemory) {
+        // it may be a call back to expire the tuple that is already being expired
+        if( this.expiringTuple != rightTuple ) {
+            PriorityQueue<RightTuple> queue = (PriorityQueue<RightTuple>) context;
+            if ( queue.peek() == rightTuple ) {
+                // it was the head of the queue
+                queue.poll();
+                // update next expiration time 
+                updateNextExpiration( queue.peek(),
+                                      workingMemory,
+                                      queue );
+            } else {
+                queue.remove( rightTuple );
             }
-        };
+        }
     }
 
-    public boolean isExpired(EventFactHandle event,
-                             InternalWorkingMemory workingMemory) {
-        return false;
+    public void expireTuples(final Object context,
+                             final InternalWorkingMemory workingMemory) {
+        SessionClock clock = ((TemporalSession<SessionClock>) workingMemory).getSessionClock();
+        long currentTime = clock.getCurrentTime();
+        PriorityQueue<RightTuple> queue = (PriorityQueue<RightTuple>) context;
+        RightTuple tuple = queue.peek();
+        while ( tuple != null && isExpired( currentTime,
+                                            tuple ) ) {
+            this.expiringTuple = tuple;
+            queue.remove();
+            final PropagationContext propagationContext = new PropagationContextImpl( ((ReteooTemporalSession<SessionClock>) workingMemory).getNextPropagationIdCounter(),
+                                                                                      PropagationContext.RETRACTION,
+                                                                                      null,
+                                                                                      null,
+                                                                                      tuple.getFactHandle() );
+            tuple.getRightTupleSink().retractRightTuple( tuple,
+                                                         propagationContext,
+                                                         workingMemory );
+            tuple.unlinkFromRightParent();
+            this.expiringTuple = null;
+            tuple = queue.peek();
+        }
+        
+        // update next expiration time 
+        updateNextExpiration( tuple,
+                              workingMemory,
+                              queue );
     }
 
+    private boolean isExpired(final long currentTime,
+                              final RightTuple rightTuple) {
+        return ((EventFactHandle) rightTuple.getFactHandle()).getStartTimestamp() + this.size <= currentTime;
+    }
+
+    /**
+     * @param rightTuple
+     * @param workingMemory
+     */
+    private void updateNextExpiration(final RightTuple rightTuple,
+                                      final InternalWorkingMemory workingMemory,
+                                      final Object context) {
+        SessionClock clock = ((TemporalSession<SessionClock>) workingMemory).getSessionClock();
+        if ( rightTuple != null ) {
+            clock.schedule( this,
+                            context,
+                            ((EventFactHandle) rightTuple.getFactHandle()).getStartTimestamp() + this.size );
+        } else {
+            clock.unschedule( this );
+        }
+    }
+    
+    public String toString() {
+        return "SlidingTimeWindow( size="+size+" )";
+    }
+
+    /**
+     * A Comparator<RightTuple> implementation for the fact queue
+     * 
+     * @author etirelli
+     */
+    private static class SlidingTimeWindowComparator
+        implements
+        Comparator<RightTuple> {
+        public int compare(RightTuple t1,
+                           RightTuple t2) {
+            final EventFactHandle e1 = (EventFactHandle) t1.getFactHandle();
+            final EventFactHandle e2 = (EventFactHandle) t2.getFactHandle();
+            return (e1.getStartTimestamp() < e2.getStartTimestamp()) ? -1 : (e1.getStartTimestamp() == e2.getStartTimestamp() ? 0 : 1);
+        }
+    }
+
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/temporal/SessionClock.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/temporal/SessionClock.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/temporal/SessionClock.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -19,12 +19,17 @@
 
 import java.io.Externalizable;
 
+import org.drools.TemporalSession;
+import org.drools.rule.Behavior;
+
 /**
  * A clock interface that all engine clocks must implement
  * 
  * @author etirelli
  */
-public interface SessionClock extends Externalizable {
+public interface SessionClock
+    extends
+    Externalizable {
 
     /**
      * Returns the current time. There is no semantics attached
@@ -38,4 +43,33 @@
      */
     public long getCurrentTime();
 
+    /**
+     * Schedule a call back to the given behavior at the given
+     * timestamp. If a callback was already registered for the
+     * given behavior, update the existing record to the new
+     * timestamp.
+     * 
+     * @param behavior
+     * @param timestamp
+     */
+    public void schedule(Behavior behavior,
+                         Object behaviorContext,
+                         long timestamp);
+
+    /**
+     * Unschedule any existing call back for the given behavior
+     * 
+     * @param behavior
+     */
+    public void unschedule(Behavior behavior);
+
+    /**
+     * Sets the temporal session associated with this clock
+     * 
+     * @param session the session to set
+     */
+    public void setSession(TemporalSession<? extends SessionClock> session);
+    
+    
+
 }

Modified: labs/jbossrules/trunk/drools-core/src/main/java/org/drools/temporal/SessionPseudoClock.java
===================================================================
--- labs/jbossrules/trunk/drools-core/src/main/java/org/drools/temporal/SessionPseudoClock.java	2008-05-13 16:44:58 UTC (rev 19941)
+++ labs/jbossrules/trunk/drools-core/src/main/java/org/drools/temporal/SessionPseudoClock.java	2008-05-13 19:55:55 UTC (rev 19942)
@@ -17,10 +17,20 @@
  */
 package org.drools.temporal;
 
+import java.io.IOException;
 import java.io.ObjectInput;
-import java.io.IOException;
 import java.io.ObjectOutput;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
 
+import org.drools.TemporalSession;
+import org.drools.common.DroolsObjectInputStream;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.common.WorkingMemoryAction;
+import org.drools.reteoo.ReteooTemporalSession;
+import org.drools.rule.Behavior;
+
 /**
  * A SessionPseudoClock is a clock that allows the user to explicitly 
  * control current time.
@@ -32,34 +42,166 @@
     implements
     SessionClock {
 
-    private long timer;
+    private long                                          timer;
+    private PriorityQueue<ScheduledItem>                  queue;
+    private transient Map<Behavior, ScheduledItem>        schedules;
+    private transient ReteooTemporalSession<SessionClock> session;
 
     public SessionPseudoClock() {
+        this( null );
+    }
+
+    public SessionPseudoClock(TemporalSession<SessionClock> session) {
         this.timer = 0;
+        this.queue = new PriorityQueue<ScheduledItem>();
+        this.schedules = new HashMap<Behavior, ScheduledItem>();
+        this.session = (ReteooTemporalSession<SessionClock>) session;
     }
 
-    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        timer   = in.readLong();
+    public void readExternal(ObjectInput in) throws IOException,
+                                            ClassNotFoundException {
+        timer = in.readLong();
+        PriorityQueue<ScheduledItem> tmp = (PriorityQueue<ScheduledItem>) in.readObject();
+        if( tmp != null ) {
+            queue = tmp;
+            for ( ScheduledItem item : queue ) {
+                this.schedules.put( item.getBehavior(),
+                                    item );
+            }
+        }
+        session = (ReteooTemporalSession<SessionClock>) ((DroolsObjectInputStream) in).getWorkingMemory();
     }
 
     public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(timer);
+        out.writeLong( timer );
+        // this is a work around to a bug in the object stream code, where it raises exceptions
+        // when trying to deserialize an empty priority queue.
+        out.writeObject( queue.isEmpty() ? null : queue );
     }
 
     /* (non-Javadoc)
     * @see org.drools.temporal.SessionClock#getCurrentTime()
     */
-    public long getCurrentTime() {
+    public synchronized long getCurrentTime() {
         return this.timer;
     }
 
-    public long advanceTime( long millisecs ) {
+    public synchronized long advanceTime(long millisecs) {
         this.timer += millisecs;
+        this.runCallBacks();
         return this.timer;
     }
 
-    public void setStartupTime(int i) {
+    public synchronized void setStartupTime(int i) {
         this.timer = i;
     }
 
+    public synchronized void schedule(final Behavior behavior,
+                                      final Object behaviorContext,
+                                      final long timestamp) {
+        ScheduledItem item = schedules.remove( behavior );
+        if ( item != null ) {
+            queue.remove( item );
+        }
+        item = new ScheduledItem( timestamp,
+                                  behavior,
+                                  behaviorContext );
+        schedules.put( behavior,
+                       item );
+        queue.add( item );
+    }
+
+    public synchronized void unschedule(final Behavior behavior) {
+        ScheduledItem item = schedules.remove( behavior );
+        if ( item != null ) {
+            queue.remove( item );
+        }
+    }
+
+    /**
+     * @return the session
+     */
+    public synchronized TemporalSession<SessionClock> getSession() {
+        return session;
+    }
+
+    /**
+     * @param session the session to set
+     */
+    public synchronized void setSession(TemporalSession<? extends SessionClock> session) {
+        this.session = (ReteooTemporalSession<SessionClock>) session;
+    }
+
+    private void runCallBacks() {
+        ScheduledItem item = queue.peek();
+        while ( item != null && item.getTimestamp() <= this.timer ) {
+            // remove the head
+            queue.remove();
+            // enqueue the callback
+            session.queueWorkingMemoryAction( item );
+            // get next head
+            item = queue.peek();
+        }
+    }
+
+    private static final class ScheduledItem
+        implements
+        Comparable<ScheduledItem>,
+        WorkingMemoryAction {
+        private long     timestamp;
+        private Behavior behavior;
+        private Object   behaviorContext;
+
+        /**
+         * @param timestamp
+         * @param behavior
+         * @param behaviorContext 
+         */
+        public ScheduledItem(final long timestamp,
+                             final Behavior behavior, 
+                             final Object behaviorContext) {
+            super();
+            this.timestamp = timestamp;
+            this.behavior = behavior;
+            this.behaviorContext = behaviorContext;
+        }
+
+        /**
+         * @return the timestamp
+         */
+        public final long getTimestamp() {
+            return timestamp;
+        }
+
+        /**
+         * @return the behavior
+         */
+        public final Behavior getBehavior() {
+            return behavior;
+        }
+
+        public int compareTo(ScheduledItem o) {
+            return this.timestamp < o.getTimestamp() ? -1 : this.timestamp == o.getTimestamp() ? 0 : 1;
+        }
+
+        public void execute(final InternalWorkingMemory workingMemory) {
+            behavior.expireTuples( behaviorContext, workingMemory );
+        }
+
+        public void readExternal(ObjectInput in) throws IOException,
+                                                ClassNotFoundException {
+            timestamp = in.readLong();
+            behavior = (Behavior) in.readObject();
+        }
+
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong( timestamp );
+            out.writeObject( behavior );
+        }
+        
+        public String toString() {
+            return "ScheduledItem( timestamp="+timestamp+", behavior="+behavior+" )";
+        }
+    }
+
 }




More information about the jboss-svn-commits mailing list