[jboss-svn-commits] JBL Code SVN: r19668 - in labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools: reteoo and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Apr 21 09:00:11 EDT 2008


Author: mgroch
Date: 2008-04-21 09:00:10 -0400 (Mon, 21 Apr 2008)
New Revision: 19668

Added:
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpirationPoint.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/ExpirationChecker.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/TimeWindowNodeExpirationManager.java
Modified:
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/PropagationContextImpl.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/SingleThreadedObjectStore.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooRuleBase.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooTemporalSession.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/TimeWindowNode.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpiration.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/GlobalSessionClockImpl.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionPseudoClock.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionSystemClock.java
   labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/util/OrderedFactHashTable.java
Log:
modifications from ITA to NTA

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/PropagationContextImpl.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/PropagationContextImpl.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/PropagationContextImpl.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -29,8 +29,8 @@
 
     private static final long serialVersionUID = 8400185220119865618L;
 
-    private int    		 type;
-    
+    private final int    type;
+
     private Rule         rule;
 
     private Activation   activation;
@@ -168,10 +168,10 @@
     public void setEntryPoint(EntryPoint entryPoint) {
         this.entryPoint = entryPoint;
     }
-
+    
     /**
      * @param clone the object and set new type
-     */
+     *//*
     public PropagationContext clone(final int myType) {
         return new PropagationContextImpl(propagationNumber,
         								  myType,
@@ -180,9 +180,10 @@
         								  activeActivations,
         								  dormantActivations,
         								  entryPoint);
-    }
+    }*/
 
-    public void setType (int type){
+    /*    public void setType (int type){
     	this.type = type;
-    }
+    }*/
+
 }

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/SingleThreadedObjectStore.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/SingleThreadedObjectStore.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/SingleThreadedObjectStore.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -25,9 +25,12 @@
     private AssertBehaviour                        behaviour;
     private Lock                                   lock;
     
+    private final Logger myLogger = Logger.getLogger(SingleThreadedObjectStore.class.getName());
+    
     public SingleThreadedObjectStore(RuleBaseConfiguration conf, Lock lock) {
         this.behaviour = conf.getAssertBehaviour();
-        this.lock = lock;
+        //this.lock = lock;
+        this.lock = new ReentrantLock();
         
         this.assertMap = new ObjectHashMap();            
 
@@ -59,8 +62,6 @@
      * @see org.drools.common.ObjectStore#getObjectForHandle(org.drools.common.InternalFactHandle)
      */
     public Object getObjectForHandle(InternalFactHandle handle) {
-        //Logger myLogger = Logger.getLogger(SingleThreadedObjectStore.class.getName());
-        //this.lock = new ReentrantLock();
     	try {
         	//myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER getObjectForHandle("+handle.getObject().toString()+")");
             this.lock.lock(); 
@@ -80,8 +81,9 @@
 
             return object;
         } finally {
+        	//myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO LEAVE getObjectForHandle("+handle.getObject().toString()+")");
             this.lock.unlock(); 
-            //myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING getObjectForHandle("+handle.getObject().toString()+")");
+            //myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEFT getObjectForHandle("+handle.getObject().toString()+")");
         }            
     }
             

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooRuleBase.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooRuleBase.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooRuleBase.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -48,6 +48,7 @@
 import org.drools.spi.ExecutorServiceFactory;
 import org.drools.spi.FactHandleFactory;
 import org.drools.spi.PropagationContext;
+import org.drools.temporal.ExpirationChecker;
 import org.drools.temporal.GlobalSessionClockImpl;
 
 /**
@@ -293,6 +294,9 @@
                                                                                  true,
                                                                                  null,
                                                                                  null ) );
+            // hand wm to expiration checker and start loop
+            if (clockType != null)
+            	ExpirationChecker.createInstance(session);
         }
 
         return session;

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooTemporalSession.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooTemporalSession.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooTemporalSession.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -17,9 +17,13 @@
  */
 package org.drools.reteoo;
 
+import java.util.Observer;
+import java.util.Set;
+
 import org.drools.TemporalSession;
 import org.drools.common.InternalRuleBase;
 import org.drools.concurrent.ExecutorService;
+import org.drools.temporal.AbstractSessionClock;
 import org.drools.temporal.SessionClock;
 
 /**

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/TimeWindowNode.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/TimeWindowNode.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/TimeWindowNode.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -17,13 +17,11 @@
 package org.drools.reteoo;
 
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.log4j.Logger;
 import org.drools.RuleBaseConfiguration;
@@ -37,9 +35,8 @@
 import org.drools.rule.TimeWindow;
 import org.drools.spi.AlphaNodeFieldConstraint;
 import org.drools.spi.PropagationContext;
-import org.drools.temporal.AbstractSessionClock;
-import org.drools.temporal.EventExpiration;
 import org.drools.temporal.GlobalSessionClockImpl;
+import org.drools.temporal.TimeWindowNodeExpirationManager;
 import org.drools.util.ArrayUtils;
 import org.drools.util.Entry;
 import org.drools.util.FactEntry;
@@ -52,10 +49,9 @@
  *
  * @author mgroch
  */
-public class TimeWindowNode extends BetaNode
-	/*implements Observer*/{
+public class TimeWindowNode extends BetaNode{
 	
-	Logger myLogger = Logger.getLogger(TimeWindowNode.class.getName());
+	private final Logger myLogger = Logger.getLogger(TimeWindowNode.class.getName());
     
 	private static final long                serialVersionUID = 400L;
 
@@ -65,12 +61,11 @@
     private final BetaConstraints            resultBinder;
     private final long						 windowSize;
     private long						     windowStart, windowEnd;
+    private final ReentrantLock 			 lock;
+    private final TimeWindowNodeExpirationManager expirationManager;
     private final PropagationContext         DEFAULT_RETRACTION_CONTEXT = new PropagationContextImpl(0, PropagationContext.RETRACTION, null, null);;
-    private final LinkedList<EventExpiration<?>> orderedExpirationList;
-    private Set<InternalFactHandle> eventObjectsToRetract;
-	private Set<ReteTuple> eventTuplesToRetract;
-	private final EventExpirationComparator evexComparator;
 
+
     public TimeWindowNode(final int id,
                           final TupleSource leftInput,
                           final ObjectSource rightInput,
@@ -92,20 +87,16 @@
         this.windowSize = timeWindow.getWindowSize();
         this.windowStart = 0;
         this.windowEnd = 0;
-        this.orderedExpirationList = new LinkedList<EventExpiration<?>>();
-    	this.evexComparator = new EventExpirationComparator();
-    	this.eventObjectsToRetract = new HashSet<InternalFactHandle>();
-    	this.eventTuplesToRetract = new HashSet<ReteTuple>();
-
+        this.expirationManager = new TimeWindowNodeExpirationManager(this);
+        this.lock = new ReentrantLock();
     }
-    
-    // original method signature like it is invoked by original Rete when inserting a new event into wm (no modification)
-    public void assertTuple(final ReteTuple leftTuple,
-            final PropagationContext context,
-            final InternalWorkingMemory workingMemory){
-    	
-    	this.assertTuple(leftTuple, context, workingMemory, false);
-    }
+	
+/*	public void assertTuple(final ReteTuple leftTuple,
+             final PropagationContext context,
+             final InternalWorkingMemory workingMemory) {
+		// original method signature 
+		this.assertTuple(leftTuple, context, workingMemory, false);
+	}*/
 
     /**
      * @inheritDoc
@@ -128,145 +119,151 @@
      */
     public void assertTuple(final ReteTuple leftTuple,
                             final PropagationContext context,
-                            final InternalWorkingMemory workingMemory,
-                            final boolean isPartOfUpdate) {
+                            final InternalWorkingMemory workingMemory/*,
+                            final boolean isInvokedByExpirationChecker*/) {
        
-        /*try {
-    	
-        	timeWindowLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-        	lock.lock(); timeWindowLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-        */
-    	
-    	final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+        try {
+            	
+        	myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": TRYING TO ENTER assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
+        	lock.lock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": ENTERING assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
         	
-		// update window bounds according to current session clock value
-		long currentTime = GlobalSessionClockImpl.getInstance().getCurrentTime();
-		if (!isPartOfUpdate && this.updateWindowBounds(currentTime))
-			this.updateNodeMemory(currentTime, workingMemory);
+        	// update window bounds according to current session clock value
+        	this.updateWindowBounds(GlobalSessionClockImpl.getInstance().getCurrentTime());
+	        // remove facts and tuples being outside window bounds from items to be considered for aggregation
+	        //updateNodeMemory(memory, context, workingMemory);
         
-        TimeWindowResult twresult = new TimeWindowResult();
-
-        //InternalFactHandle lastHandle = leftTuple.getLastHandle();
-    	boolean meetsWindowConstraints = true;
-    	long eventEnd;// = 0;
-    	
-        if ( this.tupleMemoryEnabled ) {
-        	      	
-        	if (leftTuple.containsEvents()){
-        	
-	            try {
-	    			eventEnd = leftTuple.getCreationTimeFirstContributingEvent();
-	    		} catch (ClassCastException e) {
-	    			System.err.println ("Unable to retrieve timestamp - Expected event:\n");
-	    			e.printStackTrace();
-	    			eventEnd = Long.MIN_VALUE;
-	    		}
-	            
-	    		// add tuple to time-ordered queue of tuples only if it meets window constraints
-	            if (!(windowStart <= eventEnd && eventEnd <= windowEnd))
-	            	meetsWindowConstraints = false;
-	            // add negative event only if tuple contains an event and meets window constraints
-	            else addExpiration(new EventExpiration<ReteTuple>(leftTuple, true, eventEnd+windowSize));
-
-        	}
-        	
-        	if (meetsWindowConstraints){
-        		memory.betaMemory.getTupleMemory().add( leftTuple );
-        		memory.betaMemory.getCreatedHandles().put( leftTuple,
-                                            twresult,
-                                            false );
-        	}
-        }
-
-        if (meetsWindowConstraints){
-        	
-        	//timeWindowLogger.debug("meetswindowConstraints");
-        	
-	        final Object twContext = this.timeWindow.createContext();
-	
-	        twresult.context = twContext;
-	        this.timeWindow.init( memory.workingMemoryContext,
-	                              twContext,
-	                              leftTuple,
-	                              workingMemory );
-	
-	        final Iterator it = memory.betaMemory.getFactHandleMemory().iterator( leftTuple );
-	        this.constraints.updateFromTuple( workingMemory,
-	                                          leftTuple );
-	
-	        for ( FactEntry entry = (FactEntry) it.next(); entry != null; entry = (FactEntry) it.next() ) {
-	            InternalFactHandle handle = entry.getFactHandle();
-	            if ( this.constraints.isAllowedCachedLeft( handle ) ) {
-	                if ( this.unwrapRightObject ) {
-	                    // if there is a subnetwork, handle must be unwrapped
-	                    ReteTuple tuple = (ReteTuple) handle.getObject();
-	                    handle = tuple.getLastHandle();
-	                    this.timeWindow.accumulate( memory.workingMemoryContext,
-	                                                twContext,
-	                                                tuple,
-	                                                handle,
-	                                                workingMemory );
-	                } else {
-	                    this.timeWindow.accumulate( memory.workingMemoryContext,
-	                                                twContext,
-	                                                leftTuple,
-	                                                handle,
-	                                                workingMemory );
-	                }
-	            }
-	        }
+	        final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
 	        
-	        this.constraints.resetTuple();
-	        
-	        //timeWindowLogger.debug("timeWindow accumulated");
+	        TimeWindowResult twresult = new TimeWindowResult();
 	
-	        final Object result = this.timeWindow.getResult( memory.workingMemoryContext,
-	                                                         twContext,
-	                                                         leftTuple,
-	                                                         workingMemory );
-	        
-	        if( result == null ) {
-	            throw new RuntimeDroolsException("TimeWindow must not return a null value.");
-	        }
+	        //InternalFactHandle lastHandle = leftTuple.getLastHandle();
+	    	boolean meetsWindowConstraints = true;
+	    	long eventEnd;// = 0;
+	    	
+	        if ( this.tupleMemoryEnabled ) {
+	        	      	
+	        	if (leftTuple.containsEvents()){
+	        	
+		            try {
+		    			eventEnd = leftTuple.getCreationTimeFirstContributingEvent();
+		    		} catch (ClassCastException e) {
+		    			System.err.println ("Unable to retrieve timestamp - Expected event:\n");
+		    			e.printStackTrace();
+		    			eventEnd = Long.MIN_VALUE;
+		    		}
+		            
+		    		// add tuple to time-ordered queue of tuples only if it meets window constraints
+		            if (!(windowStart <= eventEnd && eventEnd <= windowEnd))
+		            	meetsWindowConstraints = false;
+		            // add negative event only if tuple contains an event and meets window constraints
+		            else this.expirationManager.addEventTupleExpiration(leftTuple, eventEnd+windowSize);
 	
-	        // First alpha node filters
-	        boolean isAllowed = true;
-	        final InternalFactHandle handle = workingMemory.getFactHandleFactory().newFactHandle( result, false, workingMemory ); // so far, result is not an event
-	
-	        for ( int i = 0, length = this.resultConstraints.length; i < length; i++ ) {
-	            if ( !this.resultConstraints[i].isAllowed( handle,
-	                                                       workingMemory ) ) {
-	                isAllowed = false;
-	                break;
-	            }
+	        	}
+	        	
+	        	if (meetsWindowConstraints){
+	        		memory.betaMemory.getTupleMemory().add( leftTuple );
+	        		memory.betaMemory.getCreatedHandles().put( leftTuple,
+	                                            twresult,
+	                                            false );
+	        	}
 	        }
-	        if ( isAllowed ) {
-	            this.resultBinder.updateFromTuple( workingMemory,
-	                                               leftTuple );
-	            if ( this.resultBinder.isAllowedCachedLeft( handle ) ) {
-	                twresult.handle = handle;
 	
-	                //timeWindowLogger.debug("before propagation to sink");
-	                this.sink.propagateAssertTuple( leftTuple,
-	                                                handle,
-	                                                context,
-	                                                workingMemory );
-	                //timeWindowLogger.debug("after propagation to sink");
-	                
-	            } else {
-	                workingMemory.getFactHandleFactory().destroyFactHandle( handle );
-	            }
-	        } else {
-	            workingMemory.getFactHandleFactory().destroyFactHandle( handle );
-	        }
+	        if (meetsWindowConstraints){
+	        	
+	        	//myLogger.debug("meetswindowConstraints");
+	        	
+		        final Object twContext = this.timeWindow.createContext();
+		
+		        twresult.context = twContext;
+		        this.timeWindow.init( memory.workingMemoryContext,
+		                              twContext,
+		                              leftTuple,
+		                              workingMemory );
+		
+		        final Iterator it = memory.betaMemory.getFactHandleMemory().iterator( leftTuple );
+		        this.constraints.updateFromTuple( workingMemory,
+		                                          leftTuple );
+		
+		        for ( FactEntry entry = (FactEntry) it.next(); entry != null; entry = (FactEntry) it.next() ) {
+		            InternalFactHandle handle = entry.getFactHandle();
+		            if ( this.constraints.isAllowedCachedLeft( handle ) ) {
+		                if ( this.unwrapRightObject ) {
+		                    // if there is a subnetwork, handle must be unwrapped
+		                    ReteTuple tuple = (ReteTuple) handle.getObject();
+		                    handle = tuple.getLastHandle();
+		                    this.timeWindow.accumulate( memory.workingMemoryContext,
+		                                                twContext,
+		                                                tuple,
+		                                                handle,
+		                                                workingMemory );
+		                } else {
+		                    this.timeWindow.accumulate( memory.workingMemoryContext,
+		                                                twContext,
+		                                                leftTuple,
+		                                                handle,
+		                                                workingMemory );
+		                }
+		            }
+		        }
+		        
+		        this.constraints.resetTuple();
+		        
+		        //myLogger.debug("timeWindow accumulated");
+		
+		        final Object result = this.timeWindow.getResult( memory.workingMemoryContext,
+		                                                         twContext,
+		                                                         leftTuple,
+		                                                         workingMemory );
+		        
+		        if( result == null ) {
+		            throw new RuntimeDroolsException("TimeWindow must not return a null value.");
+		        }
+		
+		        // First alpha node filters
+		        boolean isAllowed = true;
+		        final InternalFactHandle handle = workingMemory.getFactHandleFactory().newFactHandle( result, false, workingMemory ); // so far, result is not an event
+		
+		        for ( int i = 0, length = this.resultConstraints.length; i < length; i++ ) {
+		            if ( !this.resultConstraints[i].isAllowed( handle,
+		                                                       workingMemory ) ) {
+		                isAllowed = false;
+		                break;
+		            }
+		        }
+		        if ( isAllowed ) {
+		            this.resultBinder.updateFromTuple( workingMemory,
+		                                               leftTuple );
+		            if ( this.resultBinder.isAllowedCachedLeft( handle ) ) {
+		                twresult.handle = handle;
+		
+		                //myLogger.debug("before propagation to sink");
+		                this.sink.propagateAssertTuple( leftTuple,
+		                                                handle,
+		                                                context,
+		                                                workingMemory );
+		                //myLogger.debug("after propagation to sink");
+		                
+		            } else {
+		                workingMemory.getFactHandleFactory().destroyFactHandle( handle );
+		            }
+		        } else {
+		            workingMemory.getFactHandleFactory().destroyFactHandle( handle );
+		        }
+	        
+	    	}
+        } finally {
+            lock.unlock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": LEAVING assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());	
+        }
         
-    	}
-    /*} finally {
-    	lock.unlock(); timeWindowLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-    }*/
-        
     }
 
+   /* public void retractTuple(final ReteTuple leftTuple,
+            final PropagationContext context,
+            final InternalWorkingMemory workingMemory) {
+		// original method signature 
+		this.retractTuple(leftTuple, context, workingMemory, false);
+	}*/
+    
     /**
      * @inheritDoc
      * 
@@ -276,19 +273,25 @@
      */
     public void retractTuple(final ReteTuple leftTuple,
                              final PropagationContext context,
-                             final InternalWorkingMemory workingMemory) {
+                             final InternalWorkingMemory workingMemory/*,
+                             final boolean isInvokedByExpirationChecker*/) {
     	
-    	/*try{
+    	try{
     		
     		myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER retractTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
     		lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING retractTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-    	*/
-    	myLogger.debug ("Retracting "+leftTuple.toString());
     	
     		final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
 	        memory.betaMemory.getTupleMemory().remove( leftTuple );
+	      
 	        final TimeWindowResult accresult = (TimeWindowResult) memory.betaMemory.getCreatedHandles().remove( leftTuple );
 	
+	        // if accResult == null, probably it was deleted earlier when deleting the last belonging object
+	        if (accresult == null)
+	        {
+	        	throw new RuntimeException("THIS SHOULD NOT HAPPEN: The accResult tuple is null");
+	        }
+	        
 	        // if tuple was propagated
 	        if ( accresult.handle != null ) {
 	            this.sink.propagateRetractTuple( leftTuple,
@@ -300,23 +303,12 @@
 	            workingMemory.getFactHandleFactory().destroyFactHandle( accresult.handle );
 	        }
 	        
-       /* } finally {
+	        //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": Retracting successful: "+leftTuple.toString());
+	        
+        } finally {
         	lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING retractTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-        }*/
+        }
     }
-    
-    /**
-     *  @inheritDoc
-     *  
-     *  If a bunch of objects is retracted, call modify tuple for each
-     *  tuple match.
-     */
-    public void retractEventTuples(Set<ReteTuple> tuples,
-                              //final PropagationContext context,
-                              final InternalWorkingMemory workingMemory) {
-    	for (ReteTuple tuple : tuples)
-    		retractTuple(tuple, DEFAULT_RETRACTION_CONTEXT, workingMemory);
-    }
 
     /**
      * @inheritDoc
@@ -326,87 +318,88 @@
      *  1. Select all matching tuples from left memory
      *  2. For each matching tuple, call a modify tuple
      *  
+     *  This method is is never invoked by the ExpirationChecker (part 
+     *  of the original Drools implementation) and therefore doesn't 
+     *  need an extra parameter indicating whether it was invoked 
+     *  from "outside" or not
      */
     public void assertObject(final InternalFactHandle handle,
                              final PropagationContext context,
                              final InternalWorkingMemory workingMemory) {
 
-    	/*try{
-    	
-    		myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-    		lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-    	*/
-    	
-    	final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
-    	
-		// update window bounds according to current session clock value
-    	long currentTime = GlobalSessionClockImpl.getInstance().getCurrentTime();
-		if (this.updateWindowBounds(currentTime))
-			this.updateNodeMemory(currentTime, workingMemory);
-        
-        boolean meetsWindowConstraints = true;
-        long eventEnd;
-        
-        if (handle.isEvent()){
-        	EventFactHandle evHandle;
-	        
-        	try {
-				evHandle = ((EventFactHandle)handle);
-				eventEnd = evHandle.getEndTimestamp();	
-        	} catch (ClassCastException e) {
-				System.err.println ("Unable to retrieve timestamp - Expected event:\n");
-				e.printStackTrace();
-				eventEnd = Long.MIN_VALUE;
-			}
-				// add event fact handle to time-ordered queue of events only if it meets window constraints
-		        if (!(windowStart <= eventEnd && eventEnd <= windowEnd))
-		        	meetsWindowConstraints = false;
-		        // add negative event only if object is an event and meets window constraints
-		        else addExpiration(new EventExpiration<InternalFactHandle>(handle, false, eventEnd+windowSize));
-        }
-       
-        if (meetsWindowConstraints){
-        	
-        	memory.betaMemory.getFactHandleMemory().add( handle );
+    	try{
+    		myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": TRYING TO ENTER assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
+    		lock.lock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": ENTERING assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
 
-	        if ( ! this.tupleMemoryEnabled ) {
-	            // do nothing here, as we know there are no left tuples at this stage in sequential mode.
-	            return;
+    		// update window bounds according to current session clock value
+    		this.updateWindowBounds(GlobalSessionClockImpl.getInstance().getCurrentTime());
+    		
+	        final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+	        
+	        boolean meetsWindowConstraints = true;
+	        long eventEnd;
+	        
+	        if (handle.isEvent()){
+	        	EventFactHandle evHandle;
+		        
+	        	try {
+					evHandle = ((EventFactHandle)handle);
+					eventEnd = evHandle.getEndTimestamp();	
+	        	} catch (ClassCastException e) {
+					System.err.println ("Unable to retrieve timestamp - Expected event:\n");
+					e.printStackTrace();
+					eventEnd = Long.MIN_VALUE;
+				}
+					// add event fact handle to time-ordered queue of events only if it meets window constraints
+			        if (!(windowStart <= eventEnd && eventEnd <= windowEnd))
+			        	meetsWindowConstraints = false;
+			        // add negative event only if object is an event and meets window constraints
+			        else this.expirationManager.addEventObjectExpiration(handle, eventEnd+windowSize);
 	        }
+	       
+	        if (meetsWindowConstraints){
+	        	
+	        	memory.betaMemory.getFactHandleMemory().add( handle );
 	
-	        this.constraints.updateFromFactHandle( workingMemory,
-	                                               handle );
-	
-	        // need to clone the tuples to avoid concurrent modification exceptions
-	        Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
-	        for ( int i = 0; i < tuples.length; i++ ) {
-	            ReteTuple tuple = (ReteTuple) tuples[i];
-	            if ( this.constraints.isAllowedCachedRight( tuple ) ) {
-	                if ( this.timeWindow.supportsReverse() || context.getType() == PropagationContext.ASSERTION ) {
-	                    modifyTuple( true,
-	                                 tuple,
-	                                 handle,
-	                                 context,
-	                                 workingMemory );
-	                } else {
-	                    // context is MODIFICATION and does not supports reverse
-	                    this.retractTuple( tuple,
-	                                       context,
-	                                       workingMemory );
-	                    this.assertTuple( tuple,
-	                                      context,
-	                                      workingMemory,
-	                                      true);
-	                }
-	            }
+		        if ( ! this.tupleMemoryEnabled ) {
+		            // do nothing here, as we know there are no left tuples at this stage in sequential mode.
+		            return;
+		        }
+		
+		        this.constraints.updateFromFactHandle( workingMemory,
+		                                               handle );
+		
+		        // need to clone the tuples to avoid concurrent modification exceptions
+		        Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
+		        for ( int i = 0; i < tuples.length; i++ ) {
+		            ReteTuple tuple = (ReteTuple) tuples[i];
+		            if ( this.constraints.isAllowedCachedRight( tuple ) ) {
+		                if ( this.timeWindow.supportsReverse() || context.getType() == PropagationContext.ASSERTION ) {
+		                    modifyTuple( true,
+		                                 tuple,
+		                                 handle,
+		                                 context,
+		                                 workingMemory );
+		                } else {
+		                    // context is MODIFICATION and does not supports reverse
+		                    this.retractTuple( tuple,
+		                                       context,
+		                                       workingMemory/*,
+		                                       false*/);
+		                    this.assertTuple( tuple,
+		                                      context,
+		                                      workingMemory/*,
+		                                      false*/);
+		                }
+		            }
+		        }
+		        
+		        this.constraints.resetFactHandle();
 	        }
-	        
-	        this.constraints.resetFactHandle();
-        }
         
-    	/*} finally {
-    		lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-    	}*/
+    	} finally {
+    		lock.unlock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": LEAVING assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
+    	}
         
     }
 
@@ -415,16 +408,21 @@
      *  
      *  If an object is retract, call modify tuple for each
      *  tuple match.
+     *  
+     *  This method is is never invoked by the ExpirationChecker (part 
+     *  of the original Drools implementation) and therefore doesn't 
+     *  need an extra parameter indicating whether it was invoked 
+     *  from "outside" or not
      */
     public void retractObject(final InternalFactHandle handle,
                               final PropagationContext context,
                               final InternalWorkingMemory workingMemory) {
     	
-    	/*try{
+    	try{
     		
 	    	myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER retractObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
 	    	lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING retractObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-		   */ 	
+		    	
 	    	final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
 	        if ( !memory.betaMemory.getFactHandleMemory().remove( handle ) ) {
 	            return;
@@ -446,282 +444,42 @@
 	                } else {
 	                    this.retractTuple( tuple,
 	                                       context,
-	                                       workingMemory );
+	                                       workingMemory/*,
+	                                       false*/);
 	                    this.assertTuple( tuple,
 	                                      context,
-	                                      workingMemory,
-	                                      true);
+	                                      workingMemory/*, 
+	                                      false*/);
 	                }
 	            }
 	        }
 	        
 	        this.constraints.resetFactHandle();
         
-    	/*} finally {
+    	} finally {
     		lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING retractObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
-    	}*/
+    	}
     }
-    
+
     /**
-     *  @inheritDoc
-     *  
-     *  If a bunch of objects is retracted, call modify tuple for each
-     *  tuple match.
+     * 
+     * This method is is never invoked by the ExpirationChecker (part 
+     *  of the original Drools implementation) and therefore doesn't 
+     *  need an extra parameter indicating whether it was invoked 
+     *  from "outside" or not
      */
-    public void retractEventObjects(Set<InternalFactHandle> handles,
-                              //final PropagationContext context,
-                              final InternalWorkingMemory workingMemory) {
-    	
-    	/*try{
-    		
-	    	myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER retractEventObjects(); currentHoldCount: "+lock.getHoldCount());
-	    	lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING retractEventObjects(); currentHoldCount: "+lock.getHoldCount());
-	    	*/
-	    	final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
-	       
-	    	if ( this.timeWindow.supportsReverse() ) {
-	    		
-	    		Map<ReteTuple,Set<InternalFactHandle>> tuplesMap = new HashMap<ReteTuple,Set<InternalFactHandle>>();
-	    		
-	    		for (InternalFactHandle currHandle : handles){
-	    	    	
-	    	    	if ( !memory.betaMemory.getFactHandleMemory().remove( currHandle ) ) {
-	    	            return;
-	    	        }
-	    	        
-	    	        this.constraints.updateFromFactHandle( workingMemory,
-	    	                                               currHandle );
-	    	        
-	    	        // need to clone the tuples to avoid concurrent modification exceptions
-	    	        Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
-	    	        for ( int i = 0; i < tuples.length; i++ ) {
-	    	            ReteTuple tuple = (ReteTuple) tuples[i];
-	    	            if ( this.constraints.isAllowedCachedRight( tuple ) ){
-	    	            	if (!tuplesMap.containsKey(tuple))
-	    	            		tuplesMap.put(tuple, new HashSet<InternalFactHandle>());
-	    	            	tuplesMap.get(tuple).add(currHandle);
-	    	            }
-	    	        }
-	    	        
-	    	        this.constraints.resetFactHandle();
-	        	
-	        	}
-	    		
-	    		for (ReteTuple tupleToModify : tuplesMap.keySet())
-	    			this.modifyEventTuple( false,
-	                    tupleToModify,
-	                    tuplesMap.get(tupleToModify),
-	                    DEFAULT_RETRACTION_CONTEXT, //context,
-	                    workingMemory );
-	    		
-	    	} else {
-	    		
-	    		Set<ReteTuple> tuplesSet = new HashSet<ReteTuple>();
-	    		
-	    		for (InternalFactHandle currHandle : handles){
-	    	    	
-	    	    	if ( !memory.betaMemory.getFactHandleMemory().remove( currHandle ) ) {
-	    	            return;
-	    	        }
-	    	        
-	    	        this.constraints.updateFromFactHandle( workingMemory,
-	    	                                               currHandle );
-	    	        
-	    	        // need to clone the tuples to avoid concurrent modification exceptions
-	    	        Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
-	    	        for ( int i = 0; i < tuples.length; i++ ) {
-	    	            ReteTuple tuple = (ReteTuple) tuples[i];
-	    	            if ( this.constraints.isAllowedCachedRight( tuple ) )
-	    	            	tuplesSet.add(tuple);
-	    	        }
-	    	        
-	    	        this.constraints.resetFactHandle();
-	        	
-	        	}
-	    		
-	    		for (ReteTuple tupleToModify : tuplesSet)
-	    		 {
-	                 this.retractTuple( tupleToModify,
-	                		 			DEFAULT_RETRACTION_CONTEXT, //context,
-	                                    workingMemory );
-	                 this.assertTuple( tupleToModify,
-	                		 		   DEFAULT_RETRACTION_CONTEXT, //context,
-	                                   workingMemory,
-	                                   true);
-	         }
-	    	
-	    	}
-    	
-    	/*} finally {
-    		lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING retractObjects(); currentHoldCount: "+lock.getHoldCount());
-    	}*/
-    	
-    }
-    
-    public void modifyEventTuple(final boolean isAssert,
-            final ReteTuple leftTuple,
-            Set<InternalFactHandle> handles,
-            final PropagationContext context,
-            final InternalWorkingMemory workingMemory) {
-
-    	/*try{
-    		
-	    	myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
-	    	lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
-    	*/
-			final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
-			TimeWindowResult accresult = (TimeWindowResult) memory.betaMemory.getCreatedHandles().get( leftTuple );
-			
-			//myLogger.debug("accresult initialized");
-			
-			// if tuple was propagated
-			if ( accresult.handle != null ) {
-				this.sink.propagateRetractTuple( leftTuple,
-				                             accresult.handle,
-				                             context,
-				                             workingMemory );
-				
-				// Destroying the acumulate result object 
-				workingMemory.getFactHandleFactory().destroyFactHandle( accresult.handle );
-				accresult.handle = null;
-			}
-			
-			//myLogger.debug("accresult.handle != null and its retraction propagated");
-			
-			if ( context.getType() == PropagationContext.ASSERTION && accresult.context == null ) {
-					final Object accContext = this.timeWindow.createContext();
-					
-					this.timeWindow.init( memory.workingMemoryContext,
-					                      accContext,
-					                      leftTuple,
-					                      workingMemory );
-					
-					accresult.context = accContext;
-				}
-			
-			//myLogger.debug("if accContext == ASSERTION new accContext would have been created and timewindow would hve been initialzed");
-			
-			for (InternalFactHandle handle : handles){
-			
-				ReteTuple tuple = leftTuple;
-				if ( this.unwrapRightObject ) {
-					// if there is a subnetwork, handle must be unwrapped
-					myLogger.debug ("SUBNETWORK_multiple");
-					tuple = (ReteTuple) handle.getObject();
-					handle = tuple.getLastHandle();
-				}
-				
-				if ( context.getType() == PropagationContext.ASSERTION ) {
-					/*
-					// assertion
-					if ( accresult.context == null ) {
-						final Object accContext = this.timeWindow.createContext();
-						
-						this.timeWindow.init( memory.workingMemoryContext,
-						                      accContext,
-						                      leftTuple,
-						                      workingMemory );
-						
-						accresult.context = accContext;
-					}*/
-	
-					myLogger.debug ("ASSERTION_multiple");
-					this.timeWindow.accumulate( memory.workingMemoryContext,
-					                        accresult.context,
-					                        tuple,
-					                        handle,
-					                        workingMemory );
-				} else if ( context.getType() == PropagationContext.MODIFICATION || 
-				context.getType() == PropagationContext.RULE_ADDITION || 
-				context.getType() == PropagationContext.RULE_REMOVAL ) {
-					// modification
-					myLogger.debug ("MODIFICATION_multiple");
-					if ( isAssert ) {
-						this.timeWindow.accumulate( memory.workingMemoryContext,
-						                            accresult.context,
-						                            tuple,
-						                            handle,
-						                            workingMemory );
-					} else {
-						this.timeWindow.reverse( memory.workingMemoryContext,
-						                         accresult.context,
-						                         tuple,
-						                         handle,
-						                         workingMemory );
-					}
-				} else {
-					// retraction
-					myLogger.debug ("RETRACTION_multiple");
-					this.timeWindow.reverse( memory.workingMemoryContext,
-					                     accresult.context,
-					                     tuple,
-					                     handle,
-					                     workingMemory );
-				}
-			
-			}
-			
-			//myLogger.debug("modification finished and ready to retrieve the accResult");
-			
-			final Object result = this.timeWindow.getResult( memory.workingMemoryContext,
-			                                         accresult.context,
-			                                         leftTuple,
-			                                         workingMemory );
-			
-			if( result == null ) {
-				throw new RuntimeDroolsException("TimeWindow must not return a null value.");
-			}
-			
-			// First alpha node filters
-			boolean isAllowed = true;
-			final InternalFactHandle createdHandle = workingMemory.getFactHandleFactory().newFactHandle( result, false, workingMemory ); // so far, result is not an event
-			for ( int i = 0, length = this.resultConstraints.length; i < length; i++ ) {
-				if ( !this.resultConstraints[i].isAllowed( createdHandle,
-				                                       workingMemory ) ) {
-					isAllowed = false;
-					break;
-				}
-			}
-			//myLogger.debug("alpha node filtering done");
-			if ( isAllowed ) {
-				this.resultBinder.updateFromTuple( workingMemory,
-				                               leftTuple );
-				if ( this.resultBinder.isAllowedCachedLeft( createdHandle ) ) {
-					accresult.handle = createdHandle;
-					
-					//myLogger.debug("before propagating left tuple");
-					this.sink.propagateAssertTuple( leftTuple,
-					                                createdHandle,
-					                                context,
-					                                workingMemory );
-					//myLogger.debug("after propagating left tuple");
-				} else {
-					workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
-				}
-			
-				this.resultBinder.resetTuple();
-			} else {
-				workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
-			}
-			//myLogger.debug("resultbinder udated and propagated");
-		
-    	/*} finally {
-    		lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
-    	}*/
-	} 
-
     public void modifyTuple(final boolean isAssert,
                             final ReteTuple leftTuple,
                             InternalFactHandle handle,
                             final PropagationContext context,
                             final InternalWorkingMemory workingMemory) {
 
-    	/*try{
+    	try{
     		
 	    	myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER modifyTuple_singleHandle(); currentHoldCount: "+lock.getHoldCount());
 	    	lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING modifyTuple_singleHandle(); currentHoldCount: "+lock.getHoldCount());
 
-*/	        final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+	        final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
 	        TimeWindowResult accresult = (TimeWindowResult) memory.betaMemory.getCreatedHandles().get( leftTuple );
 	
 	        // if tuple was propagated
@@ -739,14 +497,14 @@
 	        ReteTuple tuple = leftTuple;
 	        if ( this.unwrapRightObject ) {
 	            // if there is a subnetwork, handle must be unwrapped
-	        	myLogger.debug ("SUBNETWORK");
+	        	//myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": SUBNETWORK");
 	            tuple = (ReteTuple) handle.getObject();
 	            handle = tuple.getLastHandle();
 	        }
 	
 	        if ( context.getType() == PropagationContext.ASSERTION ) {
 	            // assertion
-	        	myLogger.debug ("ASSERTION");
+	        	//myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": ASSERTION");
 	            if ( accresult.context == null ) {
 	                final Object accContext = this.timeWindow.createContext();
 	
@@ -767,7 +525,7 @@
 	                context.getType() == PropagationContext.RULE_ADDITION || 
 	                context.getType() == PropagationContext.RULE_REMOVAL ) {
 	            // modification
-	        	myLogger.debug ("MODIFICATION");
+	        	//myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": MODIFICATION");
 	            if ( isAssert ) {
 	                this.timeWindow.accumulate( memory.workingMemoryContext,
 	                                            accresult.context,
@@ -783,7 +541,7 @@
 	            }
 	        } else {
 	            // retraction
-	        	myLogger.debug ("RETRACTION");
+	        	//myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": RETRACTION");
 	            this.timeWindow.reverse( memory.workingMemoryContext,
 	                                     accresult.context,
 	                                     tuple,
@@ -829,20 +587,21 @@
 	            workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
 	        }
         
-    	/*} finally {
+    	} finally {
     		lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING modifyTuple_singleHandle(); currentHoldCount: "+lock.getHoldCount());
-    	}*/
+    	}
     }
 
     public void updateSink(final TupleSink sink,
                            final PropagationContext context,
                            final InternalWorkingMemory workingMemory) {
 
-    	/*try{
+    	myLogger.info ("Thread "+Thread.currentThread().getId() +": UPDATING SINK!!!");
+    	try{
     		
 	    	myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER updateSink(); currentHoldCount: "+lock.getHoldCount());
 	    	lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING updateSink(); currentHoldCount: "+lock.getHoldCount());
-    	*/
+    	
 	    	final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
 	
 	        final Iterator it = memory.betaMemory.getCreatedHandles().iterator();
@@ -855,9 +614,9 @@
 	                              workingMemory );
 	        }
         
-    	/*} finally {
+    	} finally {
     		lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING updateSink(); currentHoldCount: "+lock.getHoldCount());
-    	}*/
+    	}
     }
 
     /* (non-Javadoc)
@@ -902,70 +661,324 @@
     	memory.betaMemory = this.constraints.createBetaMemory( config );
         //memory.betaMemory = new BetaMemory( new OrderedTupleHashTable(), new OrderedFactHashTable() );
         memory.workingMemoryContext = this.timeWindow.createWorkingMemoryContext();
-        //memory.orderedEventFactList = new ConcurrentLinkedQueue<EventFactHandle>();
-        //memory.orderedEventTupleList = new ConcurrentLinkedQueue<ReteTuple>();
         return memory;
     }
 
     // returns true if window bounds had to be updated, false otherwise
     private synchronized boolean updateWindowBounds (long currentTime){
     	if (windowEnd < currentTime){
-    		myLogger.debug ("Window bounds updated");
     		windowEnd = currentTime;
     		windowStart = (windowEnd<windowSize)? 0 : (windowEnd-windowSize+1);
 			return true;
 		}
 		return false;
     }
-       
-    public synchronized void updateNodeMemory ( final long currentTime, final InternalWorkingMemory workingMemory){
-    	
-    	// remove facts being outside window bounds from items to be considered for aggregation
     
-	    
-    	EventExpiration<?> temp = orderedExpirationList.peek();
-		if (!orderedExpirationList.isEmpty() && temp.getExpires() <= currentTime){
+    /**
+     *  @inheritDoc
+     *  
+     *  If a bunch of objects is retracted, call modify tuple for each
+     *  tuple match. This method is is only invoked by the ExpirationChecker 
+     *  (not part of the original Drools implementation) and therefore 
+     *  doesn't need an extra parameter indicating whether it was invoked 
+     *  from "outside" or not
+     */
+    private void retractEventTuples(Set<ReteTuple> tuples,
+                              //final PropagationContext context,
+                              final InternalWorkingMemory workingMemory) {
+    	for (ReteTuple tuple : tuples)
+	    	retractTuple(tuple, DEFAULT_RETRACTION_CONTEXT, workingMemory/*, true*/);
+    }
+    
+    /**
+     * 
+     * This method is is only invoked by the ExpirationChecker (not
+     * part of the original Drools implementation) and therefore 
+     * doesn't need an extra parameter indicating whether it was 
+     * invoked from "outside" or not
+     */
+    private void modifyEventTuple(/*final boolean isAssert,*/
+            final ReteTuple leftTuple,
+            final Set<InternalFactHandle> handles,
+            final PropagationContext context,
+            final InternalWorkingMemory workingMemory) {
+
+    	/*try{
+    		
+	    	myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
+	    	lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
+    	*/
+    	//myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
+    	
+			final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+			TimeWindowResult accresult = (TimeWindowResult) memory.betaMemory.getCreatedHandles().get( leftTuple );
 			
-			do {
-				if (temp.isOfTypeTuple())
-					eventTuplesToRetract.add((ReteTuple)temp.getEvent());
-				else eventObjectsToRetract.add((InternalFactHandle)temp.getEvent());
-				// remove expired event from queue
-				orderedExpirationList.poll();
-				// get head of queue = next expiration
-				temp = orderedExpirationList.peek();
-			} while (!orderedExpirationList.isEmpty() && temp.getExpires() <= currentTime);
-		
-			// update memory    
-			if (!eventObjectsToRetract.isEmpty()){
-				retractEventObjects(eventObjectsToRetract, workingMemory);
-				eventObjectsToRetract.clear();
+			//myLogger.debug("accresult initialized");
+			
+			// if tuple was propagated
+			if ( accresult.handle != null ) {
+				this.sink.propagateRetractTuple( leftTuple,
+				                             accresult.handle,
+				                             context,
+				                             workingMemory );
+				
+				// Destroying the acumulate result object 
+				workingMemory.getFactHandleFactory().destroyFactHandle( accresult.handle );
+				accresult.handle = null;
 			}
 			
-			if (!eventTuplesToRetract.isEmpty()){
-				retractEventTuples(eventTuplesToRetract, workingMemory);
-				eventTuplesToRetract.clear();
+			//myLogger.debug("accresult.handle != null and its retraction propagated");
+			
+			if ( context.getType() == PropagationContext.ASSERTION && accresult.context == null ) {
+					final Object accContext = this.timeWindow.createContext();
+					
+					this.timeWindow.init( memory.workingMemoryContext,
+					                      accContext,
+					                      leftTuple,
+					                      workingMemory );
+					
+					accresult.context = accContext;
+				}
+			
+			//myLogger.debug("if accContext == ASSERTION new accContext would have been created and timewindow would hve been initialzed");
+			
+			for (InternalFactHandle handle : handles){
+			
+				ReteTuple tuple = leftTuple;
+				if ( this.unwrapRightObject ) {
+					// if there is a subnetwork, handle must be unwrapped
+					//myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": SUBNETWORK_multiple");
+					tuple = (ReteTuple) handle.getObject();
+					handle = tuple.getLastHandle();
+				}
+				
+				if ( context.getType() == PropagationContext.ASSERTION ) {
+					/*
+					// assertion
+					if ( accresult.context == null ) {
+						final Object accContext = this.timeWindow.createContext();
+						
+						this.timeWindow.init( memory.workingMemoryContext,
+						                      accContext,
+						                      leftTuple,
+						                      workingMemory );
+						
+						accresult.context = accContext;
+					}*/
+	
+					//myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": ASSERTION_multiple");
+					this.timeWindow.accumulate( memory.workingMemoryContext,
+					                        accresult.context,
+					                        tuple,
+					                        handle,
+					                        workingMemory );
+				} else if ( context.getType() == PropagationContext.MODIFICATION || 
+				context.getType() == PropagationContext.RULE_ADDITION || 
+				context.getType() == PropagationContext.RULE_REMOVAL ) {
+					// modification
+					//myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": MODIFICATION_multiple");
+					/*if ( isAssert ) {
+						this.timeWindow.accumulate( memory.workingMemoryContext,
+						                            accresult.context,
+						                            tuple,
+						                            handle,
+						                            workingMemory );
+					} else {*/
+						this.timeWindow.reverse( memory.workingMemoryContext,
+						                         accresult.context,
+						                         tuple,
+						                         handle,
+						                         workingMemory );
+					//}
+				} else {
+					// retraction
+					//myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": RETRACTION_multiple");
+					this.timeWindow.reverse( memory.workingMemoryContext,
+					                     accresult.context,
+					                     tuple,
+					                     handle,
+					                     workingMemory );
+				}
+			
 			}
-		}
+			
+			//myLogger.debug("modification finished and ready to retrieve the accResult");
+			
+			final Object result = this.timeWindow.getResult( memory.workingMemoryContext,
+			                                         accresult.context,
+			                                         leftTuple,
+			                                         workingMemory );
+			
+			if( result == null ) {
+				throw new RuntimeDroolsException("TimeWindow must not return a null value.");
+			}
+			
+			// First alpha node filters
+			boolean isAllowed = true;
+			final InternalFactHandle createdHandle = workingMemory.getFactHandleFactory().newFactHandle( result, false, workingMemory ); // so far, result is not an event
+			for ( int i = 0, length = this.resultConstraints.length; i < length; i++ ) {
+				if ( !this.resultConstraints[i].isAllowed( createdHandle,
+				                                       workingMemory ) ) {
+					isAllowed = false;
+					break;
+				}
+			}
+			//myLogger.debug("alpha node filtering done");
+			if ( isAllowed ) {
+				this.resultBinder.updateFromTuple( workingMemory,
+				                               leftTuple );
+				if ( this.resultBinder.isAllowedCachedLeft( createdHandle ) ) {
+					accresult.handle = createdHandle;
+					
+					//myLogger.debug("before propagating left tuple");
+					this.sink.propagateAssertTuple( leftTuple,
+					                                createdHandle,
+					                                context,
+					                                workingMemory );
+					//myLogger.debug("after propagating left tuple "+leftTuple.toString()+": accresult == "+accresult.toString() +", accresult.handle == "+accresult.handle);
+				} else {
+					workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
+				}
+			
+				this.resultBinder.resetTuple();
+			} else {
+				workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
+			}
+			
+			//myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
 		
-	}
-
-    public void addExpiration (EventExpiration<?> expiration){ // see remarks NodeUpdate
-		//NodeUpdate reminder = new NodeUpdate(expirationTime, node);
-		int index = Collections.binarySearch(orderedExpirationList, expiration, evexComparator);
-		/*if (index < 0)
-			orderedExpirationList.add(-index-1, expiration);
-			*/
-		orderedExpirationList.add(Math.abs(-index-1), expiration);
-	}
+    	/*} finally {
+    		lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
+    	}*/
+	} 
     
+    /**
+     *  @inheritDoc
+     *  
+     *  If a bunch of objects is retracted, call modify tuple for each
+     *  tuple match. This method is is only invoked by the ExpirationChecker 
+     *  (not part of the original Drools implementation) and therefore 
+     *  doesn't need an extra parameter indicating whether it was invoked 
+     *  from "outside" or not
+     */
+    private void retractEventObjects(Set<InternalFactHandle> handles,
+                              //final PropagationContext context,
+                              final InternalWorkingMemory workingMemory) {
+    	
+    	/*try{
+    		
+	    	myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER retractEventObjects(); currentHoldCount: "+lock.getHoldCount());
+	    	lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING retractEventObjects(); currentHoldCount: "+lock.getHoldCount());
+	    	*/
+	    	final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+	       
+	    	if ( this.timeWindow.supportsReverse() ) {
+	    		
+	    		Map<ReteTuple,Set<InternalFactHandle>> tuplesMap = new HashMap<ReteTuple,Set<InternalFactHandle>>();
+	    		
+	    		for (InternalFactHandle currHandle : handles){
+	    	    	
+	    	    	if ( !memory.betaMemory.getFactHandleMemory().remove( currHandle ) ) {
+	    	            return;
+	    	        }
+	    	        
+	    	        this.constraints.updateFromFactHandle( workingMemory,
+	    	                                               currHandle );
+	    	        
+	    	        // need to clone the tuples to avoid concurrent modification exceptions
+	    	        Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
+	    	        for ( int i = 0; i < tuples.length; i++ ) {
+	    	            ReteTuple tuple = (ReteTuple) tuples[i];
+	    	            if ( this.constraints.isAllowedCachedRight( tuple ) ){
+	    	            	if (!tuplesMap.containsKey(tuple))
+	    	            		tuplesMap.put(tuple, new HashSet<InternalFactHandle>());
+	    	            	tuplesMap.get(tuple).add(currHandle);
+	    	            }
+	    	        }
+	    	        
+	    	        this.constraints.resetFactHandle();
+	        	
+	        	}
+	    		
+	    		for (ReteTuple tupleToModify : tuplesMap.keySet())
+	    			this.modifyEventTuple( /*false,*/
+	                    tupleToModify,
+	                    tuplesMap.get(tupleToModify),
+	                    DEFAULT_RETRACTION_CONTEXT, //context,
+	                    workingMemory );
+	    		
+	    	} else {
+	    		
+	    		Set<ReteTuple> tuplesSet = new HashSet<ReteTuple>();
+	    		
+	    		for (InternalFactHandle currHandle : handles){
+	    	    	
+	    	    	if ( !memory.betaMemory.getFactHandleMemory().remove( currHandle ) ) {
+	    	            return;
+	    	        }
+	    	        
+	    	        this.constraints.updateFromFactHandle( workingMemory,
+	    	                                               currHandle );
+	    	        
+	    	        // need to clone the tuples to avoid concurrent modification exceptions
+	    	        Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
+	    	        for ( int i = 0; i < tuples.length; i++ ) {
+	    	            ReteTuple tuple = (ReteTuple) tuples[i];
+	    	            if ( this.constraints.isAllowedCachedRight( tuple ) )
+	    	            	tuplesSet.add(tuple);
+	    	        }
+	    	        
+	    	        this.constraints.resetFactHandle();
+	        	
+	        	}
+	    		
+	    		for (ReteTuple tupleToModify : tuplesSet)
+	    		 {
+	                 this.retractTuple( tupleToModify,
+	                		 			DEFAULT_RETRACTION_CONTEXT,
+	                                    workingMemory/*,
+	                                    true*/);
+	                 this.assertTuple( tupleToModify,
+	                		 		   DEFAULT_RETRACTION_CONTEXT, 
+	                                   workingMemory/*,
+	                                   true*/);
+	         }
+	    	
+	    	}
+    	
+    	/*} finally {
+    		lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING retractObjects(); currentHoldCount: "+lock.getHoldCount());
+    	}*/
+    	
+    }
+    
+    /**
+     *  @inheritDoc
+     *  
+     *  Delete all expired objects and tuples. While doing so, lock methods
+     *  such as assert.... and retract... for "original" Rete. This method
+     *  is only invoked by the ExpirationChecker (not part of the original 
+     *  Drools implementation) and therefore doesn't need an extra parameter 
+     *  indicating whether it was invoked from "outside" or not.
+     */
+    public void retractExpiredEvents(final Set<InternalFactHandle> handles,
+    								 final Set<ReteTuple> tuples,
+    								 final InternalWorkingMemory workingMemory) {
+    	try {
+    		lock.lock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId() +": Start update");
+    		this.retractEventObjects(handles, workingMemory);
+    		this.retractEventTuples(tuples, workingMemory);
+    	} finally {
+	    	lock.unlock();
+    	}
+    	
+    }
+    
     public static class TimeWindowMemory {
         private static final long serialVersionUID = 400L;
         
         public Object workingMemoryContext;
         public BetaMemory betaMemory;
-        //public Queue<EventFactHandle> orderedEventFactList;
-        //public Queue<ReteTuple> orderedEventTupleList;
     }
 
     private static class TimeWindowResult {
@@ -974,19 +987,4 @@
         public Object             context;
     }
     
-    private class EventExpirationComparator  
-	implements Comparator<EventExpiration<?>>{
-
-		@Override
-		public int compare(EventExpiration<?> evex1, EventExpiration<?> evex2) {
-			long t1 = evex1.getExpires(); 
-			long t2 = evex2.getExpires();
-			if (t1 == t2) 
-				return 0;
-			else if (t1 < t2)
-				return -1;
-			else return 1;
-		}
-	
-	}
 }

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpiration.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpiration.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpiration.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -1,26 +1,28 @@
 package org.drools.temporal;
 
-import org.drools.spi.Tuple;
+import org.drools.reteoo.TimeWindowNode;
 
 public class EventExpiration<T> {
 	
 	private final T event;
-	private final long expires; 
+	private final TimeWindowNode node;
+	//private final EventExpirationPoint evExPoint;
+	//private final long expires; 
 	//private PropagationContext context;
-	private final boolean ofTypeTuple;
+	//private final boolean ofTypeTuple;
+
+	/*public EventExpiration(final T event, final TimeWindowNode node, final long expires, PropagationContext context) {
+		this (event, node, expires, (event instanceof Tuple));
+	}*/
 	
-	public EventExpiration(final T event, final boolean isTuple, final long expires/*, PropagationContext context*/) {
+	public EventExpiration(final T event/*, final boolean isTuple*/, final TimeWindowNode node/*, final long expires, PropagationContext context, EventExpirationPoint evExPoint*/) {
 		this.event = event;
-		this.ofTypeTuple = isTuple;
-		this.expires = expires;
+		//this.ofTypeTuple = isTuple;
+		this.node = node;
+		//this.expires = expires;
 		//this.context = context;
-		
-		//System.out.println (this.toString());
+		//this.evExPoint = evExPoint;
 	}
-
-	public EventExpiration(final T event, final long expires/*, PropagationContext context*/) {
-		this(event, (event instanceof Tuple), expires);
-	}
 	
 	/**
 	 * @return the tuple
@@ -38,20 +40,34 @@
 
 	/**
 	 * @return the expires
-	 */
+	 *//*
 	public long getExpires() {
 		return expires;
-	}
+	}*/
 
 	/**
 	 * @return Returns true if item is of type Tuple, false if it is of type EventFactHandle
-	 */
+	 *//*
 	public boolean isOfTypeTuple() {
 		return ofTypeTuple;
 	}
-	
-	public String toString(){
-		return event.toString()+ " expires at "+AbstractSessionClock.millisecsToString(expires);
+
+	/**
+	 * @return the node
+	 */
+	public TimeWindowNode getNode() {
+		return node;
 	}
 
+	/**
+	 * @return the evExPoint
+	 *//*
+	public EventExpirationPoint getEventExpirationPoint() {
+		return evExPoint;
+	}*/
+	
+	/*public String toString(){
+		return event.toString()+ " at node "+node.toString()+ " expires at "+AbstractObservableSessionClock.millisecsToString(expires);
+	}*/
+
 }

Added: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpirationPoint.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpirationPoint.java	                        (rev 0)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpirationPoint.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -0,0 +1,61 @@
+package org.drools.temporal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.drools.common.InternalFactHandle;
+import org.drools.reteoo.ReteTuple;
+
+public class EventExpirationPoint {
+
+	private final long expires;
+	private Set<InternalFactHandle> eventObjectsToRetract;
+	private Set<ReteTuple> eventTuplesToRetract;
+	
+	public EventExpirationPoint(final long expires) {
+		this.expires = expires;
+		this.eventObjectsToRetract = new HashSet<InternalFactHandle>();
+		this.eventTuplesToRetract = new HashSet<ReteTuple>();
+		
+	}
+	
+/*	public EventExpirationPoint(EventExpiration<?> expiration) {
+		this(expiration.getExpires());
+		this.addEventExpiration(expiration);
+	}*/
+	
+	/*public void addEventExpiration (EventExpiration<?> expiration){
+		if (expiration.isOfTypeTuple())
+			this.addEventTupleExpiration((EventExpiration<ReteTuple>) expiration);
+		else this.addEventObjectExpiration((EventExpiration<InternalFactHandle>)expiration);
+	}*/
+	
+	public void addEventObjectExpiration (InternalFactHandle expiration){
+		eventObjectsToRetract.add(expiration);
+	}
+	
+	public void addEventTupleExpiration (ReteTuple expiration){
+		eventTuplesToRetract.add(expiration);
+	}
+
+	/**
+	 * @return the expires
+	 */
+	public long getExpires() {
+		return expires;
+	}
+
+	/**
+	 * @return the eventObjectsToRetract
+	 */
+	public Set<InternalFactHandle> getEventObjectsToRetract() {
+		return eventObjectsToRetract;
+	}
+
+	/**
+	 * @return the eventTuplesToRetract
+	 */
+	public Set<ReteTuple> getEventTuplesToRetract() {
+		return eventTuplesToRetract;
+	}
+}

Added: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/ExpirationChecker.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/ExpirationChecker.java	                        (rev 0)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/ExpirationChecker.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -0,0 +1,95 @@
+package org.drools.temporal;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+import org.drools.common.InternalWorkingMemory;
+
+public final class ExpirationChecker extends Thread{
+	
+	private final Logger myLogger = Logger.getLogger(ExpirationChecker.class.getName());
+	
+	private final InternalWorkingMemory workingMemory;
+	private LinkedList<TimeWindowNodeExpirationManager> orderedExpirationList;
+	private Long nextExpiration;
+
+    /** Singleton pattern: private class attribute, create single instance of the class. */
+    private static ExpirationChecker INSTANCE;
+ 
+    /** Singleton pattern: private constructor, no instantiation from outside */ 
+    private ExpirationChecker(final InternalWorkingMemory wm) {
+    	//set ExpirationsCheckers priority to max so that expirations are detected asap
+    	this.setPriority(Thread.MAX_PRIORITY);
+    	this.workingMemory = wm;
+    	//this.supervisedNodes = Collections.synchronizedSet(new HashSet<TimeWindowNodeExpirationManager>());
+    	//this.expirationMap = Collections.synchronizedMap(new HashMap<TimeWindowNodeExpirationManager, NodeUpdate>());
+		this.orderedExpirationList = new LinkedList<TimeWindowNodeExpirationManager>();
+		this.nextExpiration = null;
+    	// start thread (run-method)
+		this.start();
+    }
+
+    /** Singleton pattern: static method "createInstance()" creates instance with parameter, but only once. */
+    public static synchronized ExpirationChecker createInstance(final InternalWorkingMemory wm) {
+        if (INSTANCE == null) {
+            INSTANCE = new ExpirationChecker(wm);
+        } else {
+            throw new UnsupportedOperationException("Instance already created!");
+        }
+        return INSTANCE;
+    }
+    
+    /** Singleton pattern: static method "getInstance()" returns single instance of the class. */
+    public static synchronized ExpirationChecker getInstance() {
+        if (INSTANCE != null) {
+            return INSTANCE;
+        } else {
+            throw new UnsupportedOperationException("Create instance with working memory as parameter first!");
+        }
+    }
+    
+ 	// updates next expiration for given time window node
+    public void updateNextNodeExpiration(final TimeWindowNodeExpirationManager node){
+    	// (temporarily) remove node from ordered list (since nextExpiration is not up to date anymore) if present
+    	this.orderedExpirationList.remove(node);
+    	// if new expiration != null, then find new position and insert node into ordered list (according to next expiration)
+    	if (node.getNextExpiration() != null){
+	    	int index = Collections.binarySearch(this.orderedExpirationList, node, new NodeUpdateComparator());
+			if (index < 0)
+				this.orderedExpirationList.add(-index-1, node);
+    	}
+    	// set next expiration to expiration of first node in the ordered list (or null if empty)
+    	this.nextExpiration = (this.orderedExpirationList.isEmpty())?null:this.orderedExpirationList.peek().getNextExpiration();
+    	myLogger.debug ("ExpirationChecker, Thread "+Thread.currentThread().getId()+": changed next global expiration: "+((this.nextExpiration==null)?"undefined":AbstractSessionClock.millisecsToString(this.nextExpiration)));
+    }
+
+	@Override
+	public void run(){
+		
+		while (true){
+			//SessionClock clock = GlobalSessionClockImpl.getInstance();
+			long currentTime = GlobalSessionClockImpl.getInstance().getCurrentTime();
+			//myLogger.debug ("\nExpirationChecker, Thread "+Thread.currentThread().getId()+": currentTime: "+AbstractSessionClock.millisecsToString(currentTime));
+			
+			if (this.nextExpiration != null && this.nextExpiration <= currentTime)
+				this.orderedExpirationList.peek().determineExpiredEvents(currentTime, workingMemory);
+			
+			// pause thread to allow other threads to execute (such as the event sender or the original Rete)
+			Thread.yield();
+		}
+		
+	}
+	
+	private class NodeUpdateComparator  
+		implements Comparator<TimeWindowNodeExpirationManager>{
+
+		@Override
+		public int compare(TimeWindowNodeExpirationManager em1, TimeWindowNodeExpirationManager em2) {
+			return (int)(em1.getNextExpiration()-em2.getNextExpiration());
+		}
+
+	}
+	
+}

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/GlobalSessionClockImpl.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/GlobalSessionClockImpl.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/GlobalSessionClockImpl.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -6,7 +6,7 @@
 
 	/** Singleton pattern: private class attribute, create single instance of the class. */
     private static SessionClock INSTANCE;
-
+ 
     /** Singleton pattern: private constructor, no instantiation from outside */ 
     private GlobalSessionClockImpl() {
     }

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionPseudoClock.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionPseudoClock.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionPseudoClock.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -28,9 +28,9 @@
  */
 public class SessionPseudoClock extends
     AbstractSessionClock {
-
+    
 	private final Logger myLogger = Logger.getLogger(SessionPseudoClock.class.getName());
-  
+	
     private long timer;
 
     public SessionPseudoClock() {
@@ -68,4 +68,4 @@
     }
    
 
-}
\ No newline at end of file
+}

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionSystemClock.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionSystemClock.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionSystemClock.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright 2007 JBoss Inc
  * 
@@ -69,4 +68,4 @@
 			}
     }
 
-}
+}

Added: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/TimeWindowNodeExpirationManager.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/TimeWindowNodeExpirationManager.java	                        (rev 0)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/TimeWindowNodeExpirationManager.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -0,0 +1,95 @@
+package org.drools.temporal;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.reteoo.ReteTuple;
+import org.drools.reteoo.TimeWindowNode;
+
+public class TimeWindowNodeExpirationManager {
+	
+	private final Logger myLogger = Logger.getLogger(TimeWindowNode.class.getName());
+
+	private final TimeWindowNode parent;
+	private final ConcurrentSkipListMap<Long, EventExpirationPoint> orderedExpirationMap;
+	private Set<InternalFactHandle> eventObjectsToRetract;
+	private Set<ReteTuple> eventTuplesToRetract;
+	private Long nextExpiration;
+	
+	public TimeWindowNodeExpirationManager(final TimeWindowNode parent) {
+		this.parent = parent;
+		this.orderedExpirationMap = new ConcurrentSkipListMap<Long, EventExpirationPoint>();
+		this.eventObjectsToRetract = /*Collections.synchronizedSet(*/new HashSet<InternalFactHandle>()/*)*/;
+		this.eventTuplesToRetract = /*Collections.synchronizedSet(*/new HashSet<ReteTuple>()/*)*/;
+		this.nextExpiration = null;
+
+	}
+	
+	private EventExpirationPoint getEventExpirationPoint (long expirationTime){
+		// check whether expiration point exists already for specified time; if so, get it - if not, create one
+		EventExpirationPoint evExPoint;
+		if (this.orderedExpirationMap.containsKey(expirationTime))
+			evExPoint = this.orderedExpirationMap.get(expirationTime);
+			else {
+				evExPoint = new EventExpirationPoint(expirationTime);
+				this.orderedExpirationMap.put(expirationTime, evExPoint);
+				// check whether new expiration is earliest; if so, update nextExpiration
+				if (nextExpiration == null || expirationTime < nextExpiration)
+					this.setNextExpiration(expirationTime);
+			}
+		myLogger.debug("TimeWindowNodeExpirationManager, Thread "+Thread.currentThread().getId()+": number of expiration points: "+this.orderedExpirationMap.keySet().size()+", next local expiration: "+((this.nextExpiration==null)?"undefined":AbstractSessionClock.millisecsToString(this.nextExpiration)));
+		return evExPoint;
+	}
+	
+	public void addEventTupleExpiration (ReteTuple tuple, long expirationTime){ // see remarks NodeUpdate
+		myLogger.debug ("\nTimeWindowNodeExpirationManager, Thread "+Thread.currentThread().getId()+": Added expiration "+AbstractSessionClock.millisecsToString(expirationTime)+" for tuple\n"+tuple.toString());
+		this.getEventExpirationPoint(expirationTime).addEventTupleExpiration(tuple);
+	}
+	
+	public void addEventObjectExpiration (InternalFactHandle handle, long expirationTime){ // see remarks NodeUpdate
+		myLogger.debug ("\nTimeWindowNodeExpirationManager, Thread "+Thread.currentThread().getId()+": Added expiration "+AbstractSessionClock.millisecsToString(expirationTime)+" for handle\n"+handle.toString());
+		this.getEventExpirationPoint(expirationTime).addEventObjectExpiration(handle);
+	}
+
+	public void determineExpiredEvents (long currentTime, final InternalWorkingMemory workingMemory){
+    	Map.Entry<Long, EventExpirationPoint> nextExpirationPoint;
+		// traverse through all expired entries            		
+		do {
+			// remove expired expiration point from queue
+			nextExpirationPoint = orderedExpirationMap.pollFirstEntry();
+			// send expired event objects to corresponding time window node
+			this.eventObjectsToRetract.addAll(nextExpirationPoint.getValue().getEventObjectsToRetract());
+			// send expired event tuples to corresponding time window node
+			this.eventTuplesToRetract.addAll(nextExpirationPoint.getValue().getEventTuplesToRetract());
+			// get head of queue = next expiration point
+			nextExpirationPoint = orderedExpirationMap.firstEntry();
+		} while (nextExpirationPoint != null && nextExpirationPoint.getKey() <= currentTime);
+		        		       		
+		// retract the expired events in the corresponding timeWindowNode
+		parent.retractExpiredEvents(this.eventObjectsToRetract, this.eventTuplesToRetract, workingMemory);
+		// delete the lists of expired events
+		this.eventObjectsToRetract.clear();
+		this.eventTuplesToRetract.clear();
+
+		// set new node expiration and notify ExpirationChecker
+		this.setNextExpiration((nextExpirationPoint==null)?null:nextExpirationPoint.getKey());
+	}
+
+	public Long getNextExpiration() {
+		return nextExpiration;
+	}
+
+	// set next expiration to new value and notify ExpirationChecker
+	public synchronized void setNextExpiration(Long nextExpiration) {
+		this.nextExpiration = nextExpiration;
+		ExpirationChecker.getInstance().updateNextNodeExpiration(this);
+	}
+	
+}

Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/util/OrderedFactHashTable.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/util/OrderedFactHashTable.java	2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/util/OrderedFactHashTable.java	2008-04-21 13:00:10 UTC (rev 19668)
@@ -6,7 +6,6 @@
 
 import org.drools.common.EventFactHandle;
 import org.drools.common.InternalFactHandle;
-import org.drools.spi.PropagationContext;
 
 public class OrderedFactHashTable extends FactHashTable {
 




More information about the jboss-svn-commits mailing list