[jboss-svn-commits] JBL Code SVN: r19668 - in labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools: reteoo and 2 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Apr 21 09:00:11 EDT 2008
Author: mgroch
Date: 2008-04-21 09:00:10 -0400 (Mon, 21 Apr 2008)
New Revision: 19668
Added:
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpirationPoint.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/ExpirationChecker.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/TimeWindowNodeExpirationManager.java
Modified:
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/PropagationContextImpl.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/SingleThreadedObjectStore.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooRuleBase.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooTemporalSession.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/TimeWindowNode.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpiration.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/GlobalSessionClockImpl.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionPseudoClock.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionSystemClock.java
labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/util/OrderedFactHashTable.java
Log:
modifications from ITA to NTA
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/PropagationContextImpl.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/PropagationContextImpl.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/PropagationContextImpl.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -29,8 +29,8 @@
private static final long serialVersionUID = 8400185220119865618L;
- private int type;
-
+ private final int type;
+
private Rule rule;
private Activation activation;
@@ -168,10 +168,10 @@
public void setEntryPoint(EntryPoint entryPoint) {
this.entryPoint = entryPoint;
}
-
+
/**
* @param clone the object and set new type
- */
+ *//*
public PropagationContext clone(final int myType) {
return new PropagationContextImpl(propagationNumber,
myType,
@@ -180,9 +180,10 @@
activeActivations,
dormantActivations,
entryPoint);
- }
+ }*/
- public void setType (int type){
+ /* public void setType (int type){
this.type = type;
- }
+ }*/
+
}
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/SingleThreadedObjectStore.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/SingleThreadedObjectStore.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/common/SingleThreadedObjectStore.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -25,9 +25,12 @@
private AssertBehaviour behaviour;
private Lock lock;
+ private final Logger myLogger = Logger.getLogger(SingleThreadedObjectStore.class.getName());
+
public SingleThreadedObjectStore(RuleBaseConfiguration conf, Lock lock) {
this.behaviour = conf.getAssertBehaviour();
- this.lock = lock;
+ //this.lock = lock;
+ this.lock = new ReentrantLock();
this.assertMap = new ObjectHashMap();
@@ -59,8 +62,6 @@
* @see org.drools.common.ObjectStore#getObjectForHandle(org.drools.common.InternalFactHandle)
*/
public Object getObjectForHandle(InternalFactHandle handle) {
- //Logger myLogger = Logger.getLogger(SingleThreadedObjectStore.class.getName());
- //this.lock = new ReentrantLock();
try {
//myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER getObjectForHandle("+handle.getObject().toString()+")");
this.lock.lock();
@@ -80,8 +81,9 @@
return object;
} finally {
+ //myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO LEAVE getObjectForHandle("+handle.getObject().toString()+")");
this.lock.unlock();
- //myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING getObjectForHandle("+handle.getObject().toString()+")");
+ //myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEFT getObjectForHandle("+handle.getObject().toString()+")");
}
}
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooRuleBase.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooRuleBase.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooRuleBase.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -48,6 +48,7 @@
import org.drools.spi.ExecutorServiceFactory;
import org.drools.spi.FactHandleFactory;
import org.drools.spi.PropagationContext;
+import org.drools.temporal.ExpirationChecker;
import org.drools.temporal.GlobalSessionClockImpl;
/**
@@ -293,6 +294,9 @@
true,
null,
null ) );
+ // hand wm to expiration checker and start loop
+ if (clockType != null)
+ ExpirationChecker.createInstance(session);
}
return session;
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooTemporalSession.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooTemporalSession.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/ReteooTemporalSession.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -17,9 +17,13 @@
*/
package org.drools.reteoo;
+import java.util.Observer;
+import java.util.Set;
+
import org.drools.TemporalSession;
import org.drools.common.InternalRuleBase;
import org.drools.concurrent.ExecutorService;
+import org.drools.temporal.AbstractSessionClock;
import org.drools.temporal.SessionClock;
/**
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/TimeWindowNode.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/TimeWindowNode.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/reteoo/TimeWindowNode.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -17,13 +17,11 @@
package org.drools.reteoo;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.drools.RuleBaseConfiguration;
@@ -37,9 +35,8 @@
import org.drools.rule.TimeWindow;
import org.drools.spi.AlphaNodeFieldConstraint;
import org.drools.spi.PropagationContext;
-import org.drools.temporal.AbstractSessionClock;
-import org.drools.temporal.EventExpiration;
import org.drools.temporal.GlobalSessionClockImpl;
+import org.drools.temporal.TimeWindowNodeExpirationManager;
import org.drools.util.ArrayUtils;
import org.drools.util.Entry;
import org.drools.util.FactEntry;
@@ -52,10 +49,9 @@
*
* @author mgroch
*/
-public class TimeWindowNode extends BetaNode
- /*implements Observer*/{
+public class TimeWindowNode extends BetaNode{
- Logger myLogger = Logger.getLogger(TimeWindowNode.class.getName());
+ private final Logger myLogger = Logger.getLogger(TimeWindowNode.class.getName());
private static final long serialVersionUID = 400L;
@@ -65,12 +61,11 @@
private final BetaConstraints resultBinder;
private final long windowSize;
private long windowStart, windowEnd;
+ private final ReentrantLock lock;
+ private final TimeWindowNodeExpirationManager expirationManager;
private final PropagationContext DEFAULT_RETRACTION_CONTEXT = new PropagationContextImpl(0, PropagationContext.RETRACTION, null, null);;
- private final LinkedList<EventExpiration<?>> orderedExpirationList;
- private Set<InternalFactHandle> eventObjectsToRetract;
- private Set<ReteTuple> eventTuplesToRetract;
- private final EventExpirationComparator evexComparator;
+
public TimeWindowNode(final int id,
final TupleSource leftInput,
final ObjectSource rightInput,
@@ -92,20 +87,16 @@
this.windowSize = timeWindow.getWindowSize();
this.windowStart = 0;
this.windowEnd = 0;
- this.orderedExpirationList = new LinkedList<EventExpiration<?>>();
- this.evexComparator = new EventExpirationComparator();
- this.eventObjectsToRetract = new HashSet<InternalFactHandle>();
- this.eventTuplesToRetract = new HashSet<ReteTuple>();
-
+ this.expirationManager = new TimeWindowNodeExpirationManager(this);
+ this.lock = new ReentrantLock();
}
-
- // original method signature like it is invoked by original Rete when inserting a new event into wm (no modification)
- public void assertTuple(final ReteTuple leftTuple,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory){
-
- this.assertTuple(leftTuple, context, workingMemory, false);
- }
+
+/* public void assertTuple(final ReteTuple leftTuple,
+ final PropagationContext context,
+ final InternalWorkingMemory workingMemory) {
+ // original method signature
+ this.assertTuple(leftTuple, context, workingMemory, false);
+ }*/
/**
* @inheritDoc
@@ -128,145 +119,151 @@
*/
public void assertTuple(final ReteTuple leftTuple,
final PropagationContext context,
- final InternalWorkingMemory workingMemory,
- final boolean isPartOfUpdate) {
+ final InternalWorkingMemory workingMemory/*,
+ final boolean isInvokedByExpirationChecker*/) {
- /*try {
-
- timeWindowLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- lock.lock(); timeWindowLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- */
-
- final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+ try {
+
+ myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": TRYING TO ENTER assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
+ lock.lock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": ENTERING assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- // update window bounds according to current session clock value
- long currentTime = GlobalSessionClockImpl.getInstance().getCurrentTime();
- if (!isPartOfUpdate && this.updateWindowBounds(currentTime))
- this.updateNodeMemory(currentTime, workingMemory);
+ // update window bounds according to current session clock value
+ this.updateWindowBounds(GlobalSessionClockImpl.getInstance().getCurrentTime());
+ // remove facts and tuples being outside window bounds from items to be considered for aggregation
+ //updateNodeMemory(memory, context, workingMemory);
- TimeWindowResult twresult = new TimeWindowResult();
-
- //InternalFactHandle lastHandle = leftTuple.getLastHandle();
- boolean meetsWindowConstraints = true;
- long eventEnd;// = 0;
-
- if ( this.tupleMemoryEnabled ) {
-
- if (leftTuple.containsEvents()){
-
- try {
- eventEnd = leftTuple.getCreationTimeFirstContributingEvent();
- } catch (ClassCastException e) {
- System.err.println ("Unable to retrieve timestamp - Expected event:\n");
- e.printStackTrace();
- eventEnd = Long.MIN_VALUE;
- }
-
- // add tuple to time-ordered queue of tuples only if it meets window constraints
- if (!(windowStart <= eventEnd && eventEnd <= windowEnd))
- meetsWindowConstraints = false;
- // add negative event only if tuple contains an event and meets window constraints
- else addExpiration(new EventExpiration<ReteTuple>(leftTuple, true, eventEnd+windowSize));
-
- }
-
- if (meetsWindowConstraints){
- memory.betaMemory.getTupleMemory().add( leftTuple );
- memory.betaMemory.getCreatedHandles().put( leftTuple,
- twresult,
- false );
- }
- }
-
- if (meetsWindowConstraints){
-
- //timeWindowLogger.debug("meetswindowConstraints");
-
- final Object twContext = this.timeWindow.createContext();
-
- twresult.context = twContext;
- this.timeWindow.init( memory.workingMemoryContext,
- twContext,
- leftTuple,
- workingMemory );
-
- final Iterator it = memory.betaMemory.getFactHandleMemory().iterator( leftTuple );
- this.constraints.updateFromTuple( workingMemory,
- leftTuple );
-
- for ( FactEntry entry = (FactEntry) it.next(); entry != null; entry = (FactEntry) it.next() ) {
- InternalFactHandle handle = entry.getFactHandle();
- if ( this.constraints.isAllowedCachedLeft( handle ) ) {
- if ( this.unwrapRightObject ) {
- // if there is a subnetwork, handle must be unwrapped
- ReteTuple tuple = (ReteTuple) handle.getObject();
- handle = tuple.getLastHandle();
- this.timeWindow.accumulate( memory.workingMemoryContext,
- twContext,
- tuple,
- handle,
- workingMemory );
- } else {
- this.timeWindow.accumulate( memory.workingMemoryContext,
- twContext,
- leftTuple,
- handle,
- workingMemory );
- }
- }
- }
+ final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
- this.constraints.resetTuple();
-
- //timeWindowLogger.debug("timeWindow accumulated");
+ TimeWindowResult twresult = new TimeWindowResult();
- final Object result = this.timeWindow.getResult( memory.workingMemoryContext,
- twContext,
- leftTuple,
- workingMemory );
-
- if( result == null ) {
- throw new RuntimeDroolsException("TimeWindow must not return a null value.");
- }
+ //InternalFactHandle lastHandle = leftTuple.getLastHandle();
+ boolean meetsWindowConstraints = true;
+ long eventEnd;// = 0;
+
+ if ( this.tupleMemoryEnabled ) {
+
+ if (leftTuple.containsEvents()){
+
+ try {
+ eventEnd = leftTuple.getCreationTimeFirstContributingEvent();
+ } catch (ClassCastException e) {
+ System.err.println ("Unable to retrieve timestamp - Expected event:\n");
+ e.printStackTrace();
+ eventEnd = Long.MIN_VALUE;
+ }
+
+ // add tuple to time-ordered queue of tuples only if it meets window constraints
+ if (!(windowStart <= eventEnd && eventEnd <= windowEnd))
+ meetsWindowConstraints = false;
+ // add negative event only if tuple contains an event and meets window constraints
+ else this.expirationManager.addEventTupleExpiration(leftTuple, eventEnd+windowSize);
- // First alpha node filters
- boolean isAllowed = true;
- final InternalFactHandle handle = workingMemory.getFactHandleFactory().newFactHandle( result, false, workingMemory ); // so far, result is not an event
-
- for ( int i = 0, length = this.resultConstraints.length; i < length; i++ ) {
- if ( !this.resultConstraints[i].isAllowed( handle,
- workingMemory ) ) {
- isAllowed = false;
- break;
- }
+ }
+
+ if (meetsWindowConstraints){
+ memory.betaMemory.getTupleMemory().add( leftTuple );
+ memory.betaMemory.getCreatedHandles().put( leftTuple,
+ twresult,
+ false );
+ }
}
- if ( isAllowed ) {
- this.resultBinder.updateFromTuple( workingMemory,
- leftTuple );
- if ( this.resultBinder.isAllowedCachedLeft( handle ) ) {
- twresult.handle = handle;
- //timeWindowLogger.debug("before propagation to sink");
- this.sink.propagateAssertTuple( leftTuple,
- handle,
- context,
- workingMemory );
- //timeWindowLogger.debug("after propagation to sink");
-
- } else {
- workingMemory.getFactHandleFactory().destroyFactHandle( handle );
- }
- } else {
- workingMemory.getFactHandleFactory().destroyFactHandle( handle );
- }
+ if (meetsWindowConstraints){
+
+ //myLogger.debug("meetswindowConstraints");
+
+ final Object twContext = this.timeWindow.createContext();
+
+ twresult.context = twContext;
+ this.timeWindow.init( memory.workingMemoryContext,
+ twContext,
+ leftTuple,
+ workingMemory );
+
+ final Iterator it = memory.betaMemory.getFactHandleMemory().iterator( leftTuple );
+ this.constraints.updateFromTuple( workingMemory,
+ leftTuple );
+
+ for ( FactEntry entry = (FactEntry) it.next(); entry != null; entry = (FactEntry) it.next() ) {
+ InternalFactHandle handle = entry.getFactHandle();
+ if ( this.constraints.isAllowedCachedLeft( handle ) ) {
+ if ( this.unwrapRightObject ) {
+ // if there is a subnetwork, handle must be unwrapped
+ ReteTuple tuple = (ReteTuple) handle.getObject();
+ handle = tuple.getLastHandle();
+ this.timeWindow.accumulate( memory.workingMemoryContext,
+ twContext,
+ tuple,
+ handle,
+ workingMemory );
+ } else {
+ this.timeWindow.accumulate( memory.workingMemoryContext,
+ twContext,
+ leftTuple,
+ handle,
+ workingMemory );
+ }
+ }
+ }
+
+ this.constraints.resetTuple();
+
+ //myLogger.debug("timeWindow accumulated");
+
+ final Object result = this.timeWindow.getResult( memory.workingMemoryContext,
+ twContext,
+ leftTuple,
+ workingMemory );
+
+ if( result == null ) {
+ throw new RuntimeDroolsException("TimeWindow must not return a null value.");
+ }
+
+ // First alpha node filters
+ boolean isAllowed = true;
+ final InternalFactHandle handle = workingMemory.getFactHandleFactory().newFactHandle( result, false, workingMemory ); // so far, result is not an event
+
+ for ( int i = 0, length = this.resultConstraints.length; i < length; i++ ) {
+ if ( !this.resultConstraints[i].isAllowed( handle,
+ workingMemory ) ) {
+ isAllowed = false;
+ break;
+ }
+ }
+ if ( isAllowed ) {
+ this.resultBinder.updateFromTuple( workingMemory,
+ leftTuple );
+ if ( this.resultBinder.isAllowedCachedLeft( handle ) ) {
+ twresult.handle = handle;
+
+ //myLogger.debug("before propagation to sink");
+ this.sink.propagateAssertTuple( leftTuple,
+ handle,
+ context,
+ workingMemory );
+ //myLogger.debug("after propagation to sink");
+
+ } else {
+ workingMemory.getFactHandleFactory().destroyFactHandle( handle );
+ }
+ } else {
+ workingMemory.getFactHandleFactory().destroyFactHandle( handle );
+ }
+
+ }
+ } finally {
+ lock.unlock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": LEAVING assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
+ }
- }
- /*} finally {
- lock.unlock(); timeWindowLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING assertTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- }*/
-
}
+ /* public void retractTuple(final ReteTuple leftTuple,
+ final PropagationContext context,
+ final InternalWorkingMemory workingMemory) {
+ // original method signature
+ this.retractTuple(leftTuple, context, workingMemory, false);
+ }*/
+
/**
* @inheritDoc
*
@@ -276,19 +273,25 @@
*/
public void retractTuple(final ReteTuple leftTuple,
final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
+ final InternalWorkingMemory workingMemory/*,
+ final boolean isInvokedByExpirationChecker*/) {
- /*try{
+ try{
myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER retractTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING retractTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- */
- myLogger.debug ("Retracting "+leftTuple.toString());
final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
memory.betaMemory.getTupleMemory().remove( leftTuple );
+
final TimeWindowResult accresult = (TimeWindowResult) memory.betaMemory.getCreatedHandles().remove( leftTuple );
+ // if accResult == null, probably it was deleted earlier when deleting the last belonging object
+ if (accresult == null)
+ {
+ throw new RuntimeException("THIS SHOULD NOT HAPPEN: The accResult tuple is null");
+ }
+
// if tuple was propagated
if ( accresult.handle != null ) {
this.sink.propagateRetractTuple( leftTuple,
@@ -300,23 +303,12 @@
workingMemory.getFactHandleFactory().destroyFactHandle( accresult.handle );
}
- /* } finally {
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": Retracting successful: "+leftTuple.toString());
+
+ } finally {
lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING retractTuple("+leftTuple.getLastHandle().getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- }*/
+ }
}
-
- /**
- * @inheritDoc
- *
- * If a bunch of objects is retracted, call modify tuple for each
- * tuple match.
- */
- public void retractEventTuples(Set<ReteTuple> tuples,
- //final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
- for (ReteTuple tuple : tuples)
- retractTuple(tuple, DEFAULT_RETRACTION_CONTEXT, workingMemory);
- }
/**
* @inheritDoc
@@ -326,87 +318,88 @@
* 1. Select all matching tuples from left memory
* 2. For each matching tuple, call a modify tuple
*
+ * This method is never invoked by the ExpirationChecker (part
+ * of the original Drools implementation) and therefore doesn't
+ * need an extra parameter indicating whether it was invoked
+ * from "outside" or not
*/
public void assertObject(final InternalFactHandle handle,
final PropagationContext context,
final InternalWorkingMemory workingMemory) {
- /*try{
-
- myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- */
-
- final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
-
- // update window bounds according to current session clock value
- long currentTime = GlobalSessionClockImpl.getInstance().getCurrentTime();
- if (this.updateWindowBounds(currentTime))
- this.updateNodeMemory(currentTime, workingMemory);
-
- boolean meetsWindowConstraints = true;
- long eventEnd;
-
- if (handle.isEvent()){
- EventFactHandle evHandle;
-
- try {
- evHandle = ((EventFactHandle)handle);
- eventEnd = evHandle.getEndTimestamp();
- } catch (ClassCastException e) {
- System.err.println ("Unable to retrieve timestamp - Expected event:\n");
- e.printStackTrace();
- eventEnd = Long.MIN_VALUE;
- }
- // add event fact handle to time-ordered queue of events only if it meets window constraints
- if (!(windowStart <= eventEnd && eventEnd <= windowEnd))
- meetsWindowConstraints = false;
- // add negative event only if object is an event and meets window constraints
- else addExpiration(new EventExpiration<InternalFactHandle>(handle, false, eventEnd+windowSize));
- }
-
- if (meetsWindowConstraints){
-
- memory.betaMemory.getFactHandleMemory().add( handle );
+ try{
+ myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": TRYING TO ENTER assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
+ lock.lock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": ENTERING assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- if ( ! this.tupleMemoryEnabled ) {
- // do nothing here, as we know there are no left tuples at this stage in sequential mode.
- return;
+ // update window bounds according to current session clock value
+ this.updateWindowBounds(GlobalSessionClockImpl.getInstance().getCurrentTime());
+
+ final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+
+ boolean meetsWindowConstraints = true;
+ long eventEnd;
+
+ if (handle.isEvent()){
+ EventFactHandle evHandle;
+
+ try {
+ evHandle = ((EventFactHandle)handle);
+ eventEnd = evHandle.getEndTimestamp();
+ } catch (ClassCastException e) {
+ System.err.println ("Unable to retrieve timestamp - Expected event:\n");
+ e.printStackTrace();
+ eventEnd = Long.MIN_VALUE;
+ }
+ // add event fact handle to time-ordered queue of events only if it meets window constraints
+ if (!(windowStart <= eventEnd && eventEnd <= windowEnd))
+ meetsWindowConstraints = false;
+ // add negative event only if object is an event and meets window constraints
+ else this.expirationManager.addEventObjectExpiration(handle, eventEnd+windowSize);
}
+
+ if (meetsWindowConstraints){
+
+ memory.betaMemory.getFactHandleMemory().add( handle );
- this.constraints.updateFromFactHandle( workingMemory,
- handle );
-
- // need to clone the tuples to avoid concurrent modification exceptions
- Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
- for ( int i = 0; i < tuples.length; i++ ) {
- ReteTuple tuple = (ReteTuple) tuples[i];
- if ( this.constraints.isAllowedCachedRight( tuple ) ) {
- if ( this.timeWindow.supportsReverse() || context.getType() == PropagationContext.ASSERTION ) {
- modifyTuple( true,
- tuple,
- handle,
- context,
- workingMemory );
- } else {
- // context is MODIFICATION and does not supports reverse
- this.retractTuple( tuple,
- context,
- workingMemory );
- this.assertTuple( tuple,
- context,
- workingMemory,
- true);
- }
- }
+ if ( ! this.tupleMemoryEnabled ) {
+ // do nothing here, as we know there are no left tuples at this stage in sequential mode.
+ return;
+ }
+
+ this.constraints.updateFromFactHandle( workingMemory,
+ handle );
+
+ // need to clone the tuples to avoid concurrent modification exceptions
+ Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
+ for ( int i = 0; i < tuples.length; i++ ) {
+ ReteTuple tuple = (ReteTuple) tuples[i];
+ if ( this.constraints.isAllowedCachedRight( tuple ) ) {
+ if ( this.timeWindow.supportsReverse() || context.getType() == PropagationContext.ASSERTION ) {
+ modifyTuple( true,
+ tuple,
+ handle,
+ context,
+ workingMemory );
+ } else {
+ // context is MODIFICATION and does not support reverse
+ this.retractTuple( tuple,
+ context,
+ workingMemory/*,
+ false*/);
+ this.assertTuple( tuple,
+ context,
+ workingMemory/*,
+ false*/);
+ }
+ }
+ }
+
+ this.constraints.resetFactHandle();
}
-
- this.constraints.resetFactHandle();
- }
- /*} finally {
- lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- }*/
+ } finally {
+ lock.unlock(); myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": LEAVING assertObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
+ }
}
@@ -415,16 +408,21 @@
*
* If an object is retract, call modify tuple for each
* tuple match.
+ *
+ * This method is never invoked by the ExpirationChecker (part
+ * of the original Drools implementation) and therefore doesn't
+ * need an extra parameter indicating whether it was invoked
+ * from "outside" or not
*/
public void retractObject(final InternalFactHandle handle,
final PropagationContext context,
final InternalWorkingMemory workingMemory) {
- /*try{
+ try{
myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER retractObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING retractObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- */
+
final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
if ( !memory.betaMemory.getFactHandleMemory().remove( handle ) ) {
return;
@@ -446,282 +444,42 @@
} else {
this.retractTuple( tuple,
context,
- workingMemory );
+ workingMemory/*,
+ false*/);
this.assertTuple( tuple,
context,
- workingMemory,
- true);
+ workingMemory/*,
+ false*/);
}
}
}
this.constraints.resetFactHandle();
- /*} finally {
+ } finally {
lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING retractObject("+handle.getObject().toString()+"); currentHoldCount: "+lock.getHoldCount());
- }*/
+ }
}
-
+
/**
- * @inheritDoc
- *
- * If a bunch of objects is retracted, call modify tuple for each
- * tuple match.
+ *
+ * This method is never invoked by the ExpirationChecker (part
+ * of the original Drools implementation) and therefore doesn't
+ * need an extra parameter indicating whether it was invoked
+ * from "outside" or not
*/
- public void retractEventObjects(Set<InternalFactHandle> handles,
- //final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
-
- /*try{
-
- myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER retractEventObjects(); currentHoldCount: "+lock.getHoldCount());
- lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING retractEventObjects(); currentHoldCount: "+lock.getHoldCount());
- */
- final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
-
- if ( this.timeWindow.supportsReverse() ) {
-
- Map<ReteTuple,Set<InternalFactHandle>> tuplesMap = new HashMap<ReteTuple,Set<InternalFactHandle>>();
-
- for (InternalFactHandle currHandle : handles){
-
- if ( !memory.betaMemory.getFactHandleMemory().remove( currHandle ) ) {
- return;
- }
-
- this.constraints.updateFromFactHandle( workingMemory,
- currHandle );
-
- // need to clone the tuples to avoid concurrent modification exceptions
- Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
- for ( int i = 0; i < tuples.length; i++ ) {
- ReteTuple tuple = (ReteTuple) tuples[i];
- if ( this.constraints.isAllowedCachedRight( tuple ) ){
- if (!tuplesMap.containsKey(tuple))
- tuplesMap.put(tuple, new HashSet<InternalFactHandle>());
- tuplesMap.get(tuple).add(currHandle);
- }
- }
-
- this.constraints.resetFactHandle();
-
- }
-
- for (ReteTuple tupleToModify : tuplesMap.keySet())
- this.modifyEventTuple( false,
- tupleToModify,
- tuplesMap.get(tupleToModify),
- DEFAULT_RETRACTION_CONTEXT, //context,
- workingMemory );
-
- } else {
-
- Set<ReteTuple> tuplesSet = new HashSet<ReteTuple>();
-
- for (InternalFactHandle currHandle : handles){
-
- if ( !memory.betaMemory.getFactHandleMemory().remove( currHandle ) ) {
- return;
- }
-
- this.constraints.updateFromFactHandle( workingMemory,
- currHandle );
-
- // need to clone the tuples to avoid concurrent modification exceptions
- Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
- for ( int i = 0; i < tuples.length; i++ ) {
- ReteTuple tuple = (ReteTuple) tuples[i];
- if ( this.constraints.isAllowedCachedRight( tuple ) )
- tuplesSet.add(tuple);
- }
-
- this.constraints.resetFactHandle();
-
- }
-
- for (ReteTuple tupleToModify : tuplesSet)
- {
- this.retractTuple( tupleToModify,
- DEFAULT_RETRACTION_CONTEXT, //context,
- workingMemory );
- this.assertTuple( tupleToModify,
- DEFAULT_RETRACTION_CONTEXT, //context,
- workingMemory,
- true);
- }
-
- }
-
- /*} finally {
- lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING retractObjects(); currentHoldCount: "+lock.getHoldCount());
- }*/
-
- }
-
- public void modifyEventTuple(final boolean isAssert,
- final ReteTuple leftTuple,
- Set<InternalFactHandle> handles,
- final PropagationContext context,
- final InternalWorkingMemory workingMemory) {
-
- /*try{
-
- myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
- lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
- */
- final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
- TimeWindowResult accresult = (TimeWindowResult) memory.betaMemory.getCreatedHandles().get( leftTuple );
-
- //myLogger.debug("accresult initialized");
-
- // if tuple was propagated
- if ( accresult.handle != null ) {
- this.sink.propagateRetractTuple( leftTuple,
- accresult.handle,
- context,
- workingMemory );
-
- // Destroying the acumulate result object
- workingMemory.getFactHandleFactory().destroyFactHandle( accresult.handle );
- accresult.handle = null;
- }
-
- //myLogger.debug("accresult.handle != null and its retraction propagated");
-
- if ( context.getType() == PropagationContext.ASSERTION && accresult.context == null ) {
- final Object accContext = this.timeWindow.createContext();
-
- this.timeWindow.init( memory.workingMemoryContext,
- accContext,
- leftTuple,
- workingMemory );
-
- accresult.context = accContext;
- }
-
- //myLogger.debug("if accContext == ASSERTION new accContext would have been created and timewindow would hve been initialzed");
-
- for (InternalFactHandle handle : handles){
-
- ReteTuple tuple = leftTuple;
- if ( this.unwrapRightObject ) {
- // if there is a subnetwork, handle must be unwrapped
- myLogger.debug ("SUBNETWORK_multiple");
- tuple = (ReteTuple) handle.getObject();
- handle = tuple.getLastHandle();
- }
-
- if ( context.getType() == PropagationContext.ASSERTION ) {
- /*
- // assertion
- if ( accresult.context == null ) {
- final Object accContext = this.timeWindow.createContext();
-
- this.timeWindow.init( memory.workingMemoryContext,
- accContext,
- leftTuple,
- workingMemory );
-
- accresult.context = accContext;
- }*/
-
- myLogger.debug ("ASSERTION_multiple");
- this.timeWindow.accumulate( memory.workingMemoryContext,
- accresult.context,
- tuple,
- handle,
- workingMemory );
- } else if ( context.getType() == PropagationContext.MODIFICATION ||
- context.getType() == PropagationContext.RULE_ADDITION ||
- context.getType() == PropagationContext.RULE_REMOVAL ) {
- // modification
- myLogger.debug ("MODIFICATION_multiple");
- if ( isAssert ) {
- this.timeWindow.accumulate( memory.workingMemoryContext,
- accresult.context,
- tuple,
- handle,
- workingMemory );
- } else {
- this.timeWindow.reverse( memory.workingMemoryContext,
- accresult.context,
- tuple,
- handle,
- workingMemory );
- }
- } else {
- // retraction
- myLogger.debug ("RETRACTION_multiple");
- this.timeWindow.reverse( memory.workingMemoryContext,
- accresult.context,
- tuple,
- handle,
- workingMemory );
- }
-
- }
-
- //myLogger.debug("modification finished and ready to retrieve the accResult");
-
- final Object result = this.timeWindow.getResult( memory.workingMemoryContext,
- accresult.context,
- leftTuple,
- workingMemory );
-
- if( result == null ) {
- throw new RuntimeDroolsException("TimeWindow must not return a null value.");
- }
-
- // First alpha node filters
- boolean isAllowed = true;
- final InternalFactHandle createdHandle = workingMemory.getFactHandleFactory().newFactHandle( result, false, workingMemory ); // so far, result is not an event
- for ( int i = 0, length = this.resultConstraints.length; i < length; i++ ) {
- if ( !this.resultConstraints[i].isAllowed( createdHandle,
- workingMemory ) ) {
- isAllowed = false;
- break;
- }
- }
- //myLogger.debug("alpha node filtering done");
- if ( isAllowed ) {
- this.resultBinder.updateFromTuple( workingMemory,
- leftTuple );
- if ( this.resultBinder.isAllowedCachedLeft( createdHandle ) ) {
- accresult.handle = createdHandle;
-
- //myLogger.debug("before propagating left tuple");
- this.sink.propagateAssertTuple( leftTuple,
- createdHandle,
- context,
- workingMemory );
- //myLogger.debug("after propagating left tuple");
- } else {
- workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
- }
-
- this.resultBinder.resetTuple();
- } else {
- workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
- }
- //myLogger.debug("resultbinder udated and propagated");
-
- /*} finally {
- lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
- }*/
- }
-
public void modifyTuple(final boolean isAssert,
final ReteTuple leftTuple,
InternalFactHandle handle,
final PropagationContext context,
final InternalWorkingMemory workingMemory) {
- /*try{
+ try{
myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER modifyTuple_singleHandle(); currentHoldCount: "+lock.getHoldCount());
lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING modifyTuple_singleHandle(); currentHoldCount: "+lock.getHoldCount());
-*/ final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+ final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
TimeWindowResult accresult = (TimeWindowResult) memory.betaMemory.getCreatedHandles().get( leftTuple );
// if tuple was propagated
@@ -739,14 +497,14 @@
ReteTuple tuple = leftTuple;
if ( this.unwrapRightObject ) {
// if there is a subnetwork, handle must be unwrapped
- myLogger.debug ("SUBNETWORK");
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": SUBNETWORK");
tuple = (ReteTuple) handle.getObject();
handle = tuple.getLastHandle();
}
if ( context.getType() == PropagationContext.ASSERTION ) {
// assertion
- myLogger.debug ("ASSERTION");
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": ASSERTION");
if ( accresult.context == null ) {
final Object accContext = this.timeWindow.createContext();
@@ -767,7 +525,7 @@
context.getType() == PropagationContext.RULE_ADDITION ||
context.getType() == PropagationContext.RULE_REMOVAL ) {
// modification
- myLogger.debug ("MODIFICATION");
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": MODIFICATION");
if ( isAssert ) {
this.timeWindow.accumulate( memory.workingMemoryContext,
accresult.context,
@@ -783,7 +541,7 @@
}
} else {
// retraction
- myLogger.debug ("RETRACTION");
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": RETRACTION");
this.timeWindow.reverse( memory.workingMemoryContext,
accresult.context,
tuple,
@@ -829,20 +587,21 @@
workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
}
- /*} finally {
+ } finally {
lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING modifyTuple_singleHandle(); currentHoldCount: "+lock.getHoldCount());
- }*/
+ }
}
public void updateSink(final TupleSink sink,
final PropagationContext context,
final InternalWorkingMemory workingMemory) {
- /*try{
+ myLogger.info ("Thread "+Thread.currentThread().getId() +": UPDATING SINK!!!");
+ try{
myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER updateSink(); currentHoldCount: "+lock.getHoldCount());
lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING updateSink(); currentHoldCount: "+lock.getHoldCount());
- */
+
final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
final Iterator it = memory.betaMemory.getCreatedHandles().iterator();
@@ -855,9 +614,9 @@
workingMemory );
}
- /*} finally {
+ } finally {
lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING updateSink(); currentHoldCount: "+lock.getHoldCount());
- }*/
+ }
}
/* (non-Javadoc)
@@ -902,70 +661,324 @@
memory.betaMemory = this.constraints.createBetaMemory( config );
//memory.betaMemory = new BetaMemory( new OrderedTupleHashTable(), new OrderedFactHashTable() );
memory.workingMemoryContext = this.timeWindow.createWorkingMemoryContext();
- //memory.orderedEventFactList = new ConcurrentLinkedQueue<EventFactHandle>();
- //memory.orderedEventTupleList = new ConcurrentLinkedQueue<ReteTuple>();
return memory;
}
// returns true if window bounds had to be updated, false otherwise
private synchronized boolean updateWindowBounds (long currentTime){
if (windowEnd < currentTime){
- myLogger.debug ("Window bounds updated");
windowEnd = currentTime;
windowStart = (windowEnd<windowSize)? 0 : (windowEnd-windowSize+1);
return true;
}
return false;
}
-
- public synchronized void updateNodeMemory ( final long currentTime, final InternalWorkingMemory workingMemory){
-
- // remove facts being outside window bounds from items to be considered for aggregation
-
- EventExpiration<?> temp = orderedExpirationList.peek();
- if (!orderedExpirationList.isEmpty() && temp.getExpires() <= currentTime){
+ /**
+  * Retracts every tuple in the given set from this node.
+  *
+  * Each tuple is passed to retractTuple together with
+  * DEFAULT_RETRACTION_CONTEXT; no caller-supplied propagation context
+  * is involved. Within this class the method is reached only through
+  * retractExpiredEvents, i.e. on the expiration path rather than the
+  * regular Rete propagation path.
+  *
+  * @param tuples        the expired tuples to retract
+  * @param workingMemory the working memory the retraction is applied to
+  */
+ private void retractEventTuples(Set<ReteTuple> tuples,
+                                 final InternalWorkingMemory workingMemory) {
+     for ( ReteTuple expiredTuple : tuples ) {
+         retractTuple( expiredTuple,
+                       DEFAULT_RETRACTION_CONTEXT,
+                       workingMemory );
+     }
+ }
+
+ /**
+ *
+ * This method is only invoked by the ExpirationChecker (not
+ * part of the original Drools implementation) and therefore
+ * doesn't need an extra parameter indicating whether it was
+ * invoked from "outside" or not
+ */
+ private void modifyEventTuple(/*final boolean isAssert,*/
+ final ReteTuple leftTuple,
+ final Set<InternalFactHandle> handles,
+ final PropagationContext context,
+ final InternalWorkingMemory workingMemory) {
+
+ /*try{
+
+ myLogger.debug ("Thread "+Thread.currentThread().getId() +": TRYING TO ENTER modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
+ lock.lock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
+ */
+ //myLogger.debug ("Thread "+Thread.currentThread().getId() +": ENTERING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
+
+ final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+ TimeWindowResult accresult = (TimeWindowResult) memory.betaMemory.getCreatedHandles().get( leftTuple );
- do {
- if (temp.isOfTypeTuple())
- eventTuplesToRetract.add((ReteTuple)temp.getEvent());
- else eventObjectsToRetract.add((InternalFactHandle)temp.getEvent());
- // remove expired event from queue
- orderedExpirationList.poll();
- // get head of queue = next expiration
- temp = orderedExpirationList.peek();
- } while (!orderedExpirationList.isEmpty() && temp.getExpires() <= currentTime);
-
- // update memory
- if (!eventObjectsToRetract.isEmpty()){
- retractEventObjects(eventObjectsToRetract, workingMemory);
- eventObjectsToRetract.clear();
+ //myLogger.debug("accresult initialized");
+
+ // if tuple was propagated
+ if ( accresult.handle != null ) {
+ this.sink.propagateRetractTuple( leftTuple,
+ accresult.handle,
+ context,
+ workingMemory );
+
+ // Destroying the acumulate result object
+ workingMemory.getFactHandleFactory().destroyFactHandle( accresult.handle );
+ accresult.handle = null;
}
- if (!eventTuplesToRetract.isEmpty()){
- retractEventTuples(eventTuplesToRetract, workingMemory);
- eventTuplesToRetract.clear();
+ //myLogger.debug("accresult.handle != null and its retraction propagated");
+
+ if ( context.getType() == PropagationContext.ASSERTION && accresult.context == null ) {
+ final Object accContext = this.timeWindow.createContext();
+
+ this.timeWindow.init( memory.workingMemoryContext,
+ accContext,
+ leftTuple,
+ workingMemory );
+
+ accresult.context = accContext;
+ }
+
+ //myLogger.debug("if accContext == ASSERTION new accContext would have been created and timewindow would hve been initialzed");
+
+ for (InternalFactHandle handle : handles){
+
+ ReteTuple tuple = leftTuple;
+ if ( this.unwrapRightObject ) {
+ // if there is a subnetwork, handle must be unwrapped
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": SUBNETWORK_multiple");
+ tuple = (ReteTuple) handle.getObject();
+ handle = tuple.getLastHandle();
+ }
+
+ if ( context.getType() == PropagationContext.ASSERTION ) {
+ /*
+ // assertion
+ if ( accresult.context == null ) {
+ final Object accContext = this.timeWindow.createContext();
+
+ this.timeWindow.init( memory.workingMemoryContext,
+ accContext,
+ leftTuple,
+ workingMemory );
+
+ accresult.context = accContext;
+ }*/
+
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": ASSERTION_multiple");
+ this.timeWindow.accumulate( memory.workingMemoryContext,
+ accresult.context,
+ tuple,
+ handle,
+ workingMemory );
+ } else if ( context.getType() == PropagationContext.MODIFICATION ||
+ context.getType() == PropagationContext.RULE_ADDITION ||
+ context.getType() == PropagationContext.RULE_REMOVAL ) {
+ // modification
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": MODIFICATION_multiple");
+ /*if ( isAssert ) {
+ this.timeWindow.accumulate( memory.workingMemoryContext,
+ accresult.context,
+ tuple,
+ handle,
+ workingMemory );
+ } else {*/
+ this.timeWindow.reverse( memory.workingMemoryContext,
+ accresult.context,
+ tuple,
+ handle,
+ workingMemory );
+ //}
+ } else {
+ // retraction
+ //myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId()+": RETRACTION_multiple");
+ this.timeWindow.reverse( memory.workingMemoryContext,
+ accresult.context,
+ tuple,
+ handle,
+ workingMemory );
+ }
+
}
- }
+
+ //myLogger.debug("modification finished and ready to retrieve the accResult");
+
+ final Object result = this.timeWindow.getResult( memory.workingMemoryContext,
+ accresult.context,
+ leftTuple,
+ workingMemory );
+
+ if( result == null ) {
+ throw new RuntimeDroolsException("TimeWindow must not return a null value.");
+ }
+
+ // First alpha node filters
+ boolean isAllowed = true;
+ final InternalFactHandle createdHandle = workingMemory.getFactHandleFactory().newFactHandle( result, false, workingMemory ); // so far, result is not an event
+ for ( int i = 0, length = this.resultConstraints.length; i < length; i++ ) {
+ if ( !this.resultConstraints[i].isAllowed( createdHandle,
+ workingMemory ) ) {
+ isAllowed = false;
+ break;
+ }
+ }
+ //myLogger.debug("alpha node filtering done");
+ if ( isAllowed ) {
+ this.resultBinder.updateFromTuple( workingMemory,
+ leftTuple );
+ if ( this.resultBinder.isAllowedCachedLeft( createdHandle ) ) {
+ accresult.handle = createdHandle;
+
+ //myLogger.debug("before propagating left tuple");
+ this.sink.propagateAssertTuple( leftTuple,
+ createdHandle,
+ context,
+ workingMemory );
+ //myLogger.debug("after propagating left tuple "+leftTuple.toString()+": accresult == "+accresult.toString() +", accresult.handle == "+accresult.handle);
+ } else {
+ workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
+ }
+
+ this.resultBinder.resetTuple();
+ } else {
+ workingMemory.getFactHandleFactory().destroyFactHandle( createdHandle );
+ }
+
+ //myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
- }
-
- public void addExpiration (EventExpiration<?> expiration){ // see remarks NodeUpdate
- //NodeUpdate reminder = new NodeUpdate(expirationTime, node);
- int index = Collections.binarySearch(orderedExpirationList, expiration, evexComparator);
- /*if (index < 0)
- orderedExpirationList.add(-index-1, expiration);
- */
- orderedExpirationList.add(Math.abs(-index-1), expiration);
- }
+ /*} finally {
+ lock.unlock(); myLogger.debug ("Thread "+Thread.currentThread().getId() +": LEAVING modifyTuple_setOfHandles(); currentHoldCount: "+lock.getHoldCount());
+ }*/
+ }
+ /**
+  * Retracts a batch of expired event handles from this node.
+  *
+  * Every handle is removed from the fact-handle memory and matched
+  * against a snapshot of the tuple memory. The matching left tuples are
+  * then refreshed once per tuple: through modifyEventTuple (with all
+  * handles that matched the tuple) when the time window supports
+  * reverse, otherwise by retracting and re-asserting the tuple.
+  * DEFAULT_RETRACTION_CONTEXT is used as propagation context.
+  *
+  * A handle that is no longer in the fact-handle memory is skipped on
+  * its own. Aborting the whole batch at that point would leave handles
+  * that were already removed without their tuples being refreshed.
+  *
+  * This method takes no lock itself; retractExpiredEvents calls it
+  * while holding the node lock.
+  *
+  * @param handles       the expired event handles to retract
+  * @param workingMemory the working memory holding this node's memory
+  */
+ private void retractEventObjects(Set<InternalFactHandle> handles,
+                                  final InternalWorkingMemory workingMemory) {
+
+     final TimeWindowMemory memory = (TimeWindowMemory) workingMemory.getNodeMemory( this );
+
+     // left tuple -> expired handles that matched it
+     final Map<ReteTuple,Set<InternalFactHandle>> matches = new HashMap<ReteTuple,Set<InternalFactHandle>>();
+
+     for ( InternalFactHandle currHandle : handles ) {
+
+         if ( !memory.betaMemory.getFactHandleMemory().remove( currHandle ) ) {
+             // not (or no longer) in memory: nothing to undo for this handle
+             continue;
+         }
+
+         this.constraints.updateFromFactHandle( workingMemory,
+                                                currHandle );
+
+         // need to clone the tuples to avoid concurrent modification exceptions
+         final Entry[] tuples = memory.betaMemory.getTupleMemory().toArray();
+         for ( int i = 0; i < tuples.length; i++ ) {
+             final ReteTuple tuple = (ReteTuple) tuples[i];
+             if ( this.constraints.isAllowedCachedRight( tuple ) ) {
+                 Set<InternalFactHandle> matched = matches.get( tuple );
+                 if ( matched == null ) {
+                     matched = new HashSet<InternalFactHandle>();
+                     matches.put( tuple, matched );
+                 }
+                 matched.add( currHandle );
+             }
+         }
+
+         this.constraints.resetFactHandle();
+     }
+
+     if ( this.timeWindow.supportsReverse() ) {
+         // one incremental update per tuple, covering all of its expired handles
+         for ( Map.Entry<ReteTuple,Set<InternalFactHandle>> match : matches.entrySet() ) {
+             this.modifyEventTuple( match.getKey(),
+                                    match.getValue(),
+                                    DEFAULT_RETRACTION_CONTEXT,
+                                    workingMemory );
+         }
+     } else {
+         // no reverse support: rebuild each affected tuple from scratch
+         for ( ReteTuple tupleToModify : matches.keySet() ) {
+             this.retractTuple( tupleToModify,
+                                DEFAULT_RETRACTION_CONTEXT,
+                                workingMemory );
+             this.assertTuple( tupleToModify,
+                               DEFAULT_RETRACTION_CONTEXT,
+                               workingMemory );
+         }
+     }
+ }
+
+ /**
+  * Deletes all expired objects and tuples while holding the node lock.
+  *
+  * The lock is the same one taken by the assert/retract methods of this
+  * node, so regular Rete propagation through this node is held off
+  * until the expired events have been retracted. This method is meant
+  * to be invoked by the ExpirationChecker (not part of the original
+  * Drools implementation).
+  *
+  * @param handles       expired event handles, retracted first
+  * @param tuples        expired tuples, retracted afterwards
+  * @param workingMemory the working memory the retractions are applied to
+  */
+ public void retractExpiredEvents(final Set<InternalFactHandle> handles,
+                                  final Set<ReteTuple> tuples,
+                                  final InternalWorkingMemory workingMemory) {
+     // Acquire outside the try block: if lock() itself failed, the finally
+     // clause must not call unlock() on a lock this thread does not hold.
+     lock.lock();
+     try {
+         myLogger.debug ("\nTimeWindowNode, Thread "+Thread.currentThread().getId() +": Start update");
+         this.retractEventObjects(handles, workingMemory);
+         this.retractEventTuples(tuples, workingMemory);
+     } finally {
+         lock.unlock();
+     }
+
+ }
+
public static class TimeWindowMemory {
private static final long serialVersionUID = 400L;
public Object workingMemoryContext;
public BetaMemory betaMemory;
- //public Queue<EventFactHandle> orderedEventFactList;
- //public Queue<ReteTuple> orderedEventTupleList;
}
private static class TimeWindowResult {
@@ -974,19 +987,4 @@
public Object context;
}
- private class EventExpirationComparator
- implements Comparator<EventExpiration<?>>{
-
- @Override
- public int compare(EventExpiration<?> evex1, EventExpiration<?> evex2) {
- long t1 = evex1.getExpires();
- long t2 = evex2.getExpires();
- if (t1 == t2)
- return 0;
- else if (t1 < t2)
- return -1;
- else return 1;
- }
-
- }
}
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpiration.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpiration.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpiration.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -1,26 +1,28 @@
package org.drools.temporal;
-import org.drools.spi.Tuple;
+import org.drools.reteoo.TimeWindowNode;
public class EventExpiration<T> {
private final T event;
- private final long expires;
+ private final TimeWindowNode node;
+ //private final EventExpirationPoint evExPoint;
+ //private final long expires;
//private PropagationContext context;
- private final boolean ofTypeTuple;
+ //private final boolean ofTypeTuple;
+
+ /*public EventExpiration(final T event, final TimeWindowNode node, final long expires, PropagationContext context) {
+ this (event, node, expires, (event instanceof Tuple));
+ }*/
- public EventExpiration(final T event, final boolean isTuple, final long expires/*, PropagationContext context*/) {
+ public EventExpiration(final T event/*, final boolean isTuple*/, final TimeWindowNode node/*, final long expires, PropagationContext context, EventExpirationPoint evExPoint*/) {
this.event = event;
- this.ofTypeTuple = isTuple;
- this.expires = expires;
+ //this.ofTypeTuple = isTuple;
+ this.node = node;
+ //this.expires = expires;
//this.context = context;
-
- //System.out.println (this.toString());
+ //this.evExPoint = evExPoint;
}
-
- public EventExpiration(final T event, final long expires/*, PropagationContext context*/) {
- this(event, (event instanceof Tuple), expires);
- }
/**
* @return the tuple
@@ -38,20 +40,34 @@
/**
* @return the expires
- */
+ *//*
public long getExpires() {
return expires;
- }
+ }*/
/**
* @return Returns true if item is of type Tuple, false if it is of type EventFactHandle
- */
+ *//*
public boolean isOfTypeTuple() {
return ofTypeTuple;
}
-
- public String toString(){
- return event.toString()+ " expires at "+AbstractSessionClock.millisecsToString(expires);
+
+ /**
+ * @return the node
+ */
+ public TimeWindowNode getNode() {
+ return node;
}
+ /**
+ * @return the evExPoint
+ *//*
+ public EventExpirationPoint getEventExpirationPoint() {
+ return evExPoint;
+ }*/
+
+ /*public String toString(){
+ return event.toString()+ " at node "+node.toString()+ " expires at "+AbstractObservableSessionClock.millisecsToString(expires);
+ }*/
+
}
Added: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpirationPoint.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpirationPoint.java (rev 0)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/EventExpirationPoint.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -0,0 +1,61 @@
+package org.drools.temporal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.drools.common.InternalFactHandle;
+import org.drools.reteoo.ReteTuple;
+
+/**
+ * A single expiration time together with the events that expire at it.
+ *
+ * Expiring fact handles and expiring tuples are kept in two separate
+ * sets. The sets are created empty and filled through the add methods.
+ */
+public class EventExpirationPoint {
+
+    /** Expiration time of this point. */
+    private final long expires;
+
+    /** Fact handles to retract; the reference is fixed, the contents are mutable. */
+    private final Set<InternalFactHandle> eventObjectsToRetract;
+
+    /** Tuples to retract; the reference is fixed, the contents are mutable. */
+    private final Set<ReteTuple> eventTuplesToRetract;
+
+    /**
+     * Creates an expiration point with no events registered.
+     *
+     * @param expires the expiration time of this point
+     */
+    public EventExpirationPoint(final long expires) {
+        this.expires = expires;
+        this.eventObjectsToRetract = new HashSet<InternalFactHandle>();
+        this.eventTuplesToRetract = new HashSet<ReteTuple>();
+    }
+
+    /**
+     * Adds a fact handle to the set of event objects to retract.
+     *
+     * @param expiration the expiring fact handle
+     */
+    public void addEventObjectExpiration (InternalFactHandle expiration){
+        eventObjectsToRetract.add(expiration);
+    }
+
+    /**
+     * Adds a tuple to the set of event tuples to retract.
+     *
+     * @param expiration the expiring tuple
+     */
+    public void addEventTupleExpiration (ReteTuple expiration){
+        eventTuplesToRetract.add(expiration);
+    }
+
+    /**
+     * @return the expires
+     */
+    public long getExpires() {
+        return expires;
+    }
+
+    /**
+     * @return the eventObjectsToRetract (the internal set itself, not a copy)
+     */
+    public Set<InternalFactHandle> getEventObjectsToRetract() {
+        return eventObjectsToRetract;
+    }
+
+    /**
+     * @return the eventTuplesToRetract (the internal set itself, not a copy)
+     */
+    public Set<ReteTuple> getEventTuplesToRetract() {
+        return eventTuplesToRetract;
+    }
+}
Added: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/ExpirationChecker.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/ExpirationChecker.java (rev 0)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/ExpirationChecker.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -0,0 +1,95 @@
+package org.drools.temporal;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+
+import org.apache.log4j.Logger;
+import org.drools.common.InternalWorkingMemory;
+
+public final class ExpirationChecker extends Thread{
+
+ private final Logger myLogger = Logger.getLogger(ExpirationChecker.class.getName());
+
+ private final InternalWorkingMemory workingMemory;
+ private LinkedList<TimeWindowNodeExpirationManager> orderedExpirationList;
+ private Long nextExpiration;
+
+ /** Singleton pattern: private class attribute, create single instance of the class. */
+ private static ExpirationChecker INSTANCE;
+
+ /** Singleton pattern: private constructor, no instantiation from outside */
+ private ExpirationChecker(final InternalWorkingMemory wm) {
+ //set ExpirationsCheckers priority to max so that expirations are detected asap
+ this.setPriority(Thread.MAX_PRIORITY);
+ this.workingMemory = wm;
+ //this.supervisedNodes = Collections.synchronizedSet(new HashSet<TimeWindowNodeExpirationManager>());
+ //this.expirationMap = Collections.synchronizedMap(new HashMap<TimeWindowNodeExpirationManager, NodeUpdate>());
+ this.orderedExpirationList = new LinkedList<TimeWindowNodeExpirationManager>();
+ this.nextExpiration = null;
+ // start thread (run-method)
+ this.start();
+ }
+
+ /** Singleton pattern: static method "createInstance()" creates instance with parameter, but only once. */
+ public static synchronized ExpirationChecker createInstance(final InternalWorkingMemory wm) {
+ if (INSTANCE == null) {
+ INSTANCE = new ExpirationChecker(wm);
+ } else {
+ throw new UnsupportedOperationException("Instance already created!");
+ }
+ return INSTANCE;
+ }
+
+ /** Singleton pattern: static method "getInstance()" returns single instance of the class. */
+ public static synchronized ExpirationChecker getInstance() {
+ if (INSTANCE != null) {
+ return INSTANCE;
+ } else {
+ throw new UnsupportedOperationException("Create instance with working memory as parameter first!");
+ }
+ }
+
+ // updates next expiration for given time window node
+ public void updateNextNodeExpiration(final TimeWindowNodeExpirationManager node){
+ // (temporarily) remove node from ordered list (since nextExpiration is not up to date anymore) if present
+ this.orderedExpirationList.remove(node);
+ // if new expiration != null, then find new position and insert node into ordered list (according to next expiration)
+ if (node.getNextExpiration() != null){
+ int index = Collections.binarySearch(this.orderedExpirationList, node, new NodeUpdateComparator());
+ if (index < 0)
+ this.orderedExpirationList.add(-index-1, node);
+ }
+ // set next expiration to expiration of first node in the ordered list (or null if empty)
+ this.nextExpiration = (this.orderedExpirationList.isEmpty())?null:this.orderedExpirationList.peek().getNextExpiration();
+ myLogger.debug ("ExpirationChecker, Thread "+Thread.currentThread().getId()+": changed next global expiration: "+((this.nextExpiration==null)?"undefined":AbstractSessionClock.millisecsToString(this.nextExpiration)));
+ }
+
+ @Override
+ public void run(){
+
+ while (true){
+ //SessionClock clock = GlobalSessionClockImpl.getInstance();
+ long currentTime = GlobalSessionClockImpl.getInstance().getCurrentTime();
+ //myLogger.debug ("\nExpirationChecker, Thread "+Thread.currentThread().getId()+": currentTime: "+AbstractSessionClock.millisecsToString(currentTime));
+
+ if (this.nextExpiration != null && this.nextExpiration <= currentTime)
+ this.orderedExpirationList.peek().determineExpiredEvents(currentTime, workingMemory);
+
+ // pause thread to allow other threads to execute (such as the event sender or the original Rete)
+ Thread.yield();
+ }
+
+ }
+
+ private class NodeUpdateComparator
+ implements Comparator<TimeWindowNodeExpirationManager>{
+
+ @Override
+ public int compare(TimeWindowNodeExpirationManager em1, TimeWindowNodeExpirationManager em2) {
+ return (int)(em1.getNextExpiration()-em2.getNextExpiration());
+ }
+
+ }
+
+}
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/GlobalSessionClockImpl.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/GlobalSessionClockImpl.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/GlobalSessionClockImpl.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -6,7 +6,7 @@
/** Singleton pattern: private class attribute, create single instance of the class. */
private static SessionClock INSTANCE;
-
+
/** Singleton pattern: private constructor, no instantiation from outside */
private GlobalSessionClockImpl() {
}
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionPseudoClock.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionPseudoClock.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionPseudoClock.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -28,9 +28,9 @@
*/
public class SessionPseudoClock extends
AbstractSessionClock {
-
+
private final Logger myLogger = Logger.getLogger(SessionPseudoClock.class.getName());
-
+
private long timer;
public SessionPseudoClock() {
@@ -68,4 +68,4 @@
}
-}
\ No newline at end of file
+}
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionSystemClock.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionSystemClock.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/SessionSystemClock.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -1,4 +1,3 @@
-
/*
* Copyright 2007 JBoss Inc
*
@@ -69,4 +68,4 @@
}
}
-}
+}
Added: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/TimeWindowNodeExpirationManager.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/TimeWindowNodeExpirationManager.java (rev 0)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/temporal/TimeWindowNodeExpirationManager.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -0,0 +1,95 @@
+package org.drools.temporal;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.drools.common.InternalFactHandle;
+import org.drools.common.InternalWorkingMemory;
+import org.drools.reteoo.ReteTuple;
+import org.drools.reteoo.TimeWindowNode;
+
+ /**
+  * Keeps the pending event expirations of one {@link TimeWindowNode}, ordered by expiration time.
+  * Events (fact handles or tuples) are registered under their expiration time; when
+  * {@link #determineExpiredEvents(long, InternalWorkingMemory)} is called, every expiration point
+  * that is due is removed and its events are handed to the parent node for retraction.
+  * Each change of the earliest expiration time is reported to the {@link ExpirationChecker}.
+  */
+ public class TimeWindowNodeExpirationManager {
+
+     // NOTE(review): the logger is registered under TimeWindowNode's name although all messages
+     // are prefixed with this class's name - confirm whether that is intended.
+     private final Logger myLogger = Logger.getLogger(TimeWindowNode.class.getName());
+
+     /** Node that receives the expired events for retraction. */
+     private final TimeWindowNode parent;
+     /** Expiration points keyed and sorted by expiration time. */
+     private final ConcurrentSkipListMap<Long, EventExpirationPoint> orderedExpirationMap;
+     /** Buffers reused by determineExpiredEvents; plain (unsynchronized) hash sets. */
+     private final Set<InternalFactHandle> eventObjectsToRetract;
+     private final Set<ReteTuple> eventTuplesToRetract;
+     /** Earliest pending expiration time, or null if nothing is pending. */
+     private Long nextExpiration;
+
+     /**
+      * @param parent the time window node whose events this manager expires
+      */
+     public TimeWindowNodeExpirationManager(final TimeWindowNode parent) {
+         this.parent = parent;
+         this.orderedExpirationMap = new ConcurrentSkipListMap<Long, EventExpirationPoint>();
+         this.eventObjectsToRetract = new HashSet<InternalFactHandle>();
+         this.eventTuplesToRetract = new HashSet<ReteTuple>();
+         this.nextExpiration = null;
+     }
+
+     /**
+      * Returns the expiration point registered for the given time, creating and registering
+      * one if none exists yet. A newly created point that is earlier than the current next
+      * expiration becomes the new next expiration.
+      *
+      * @param expirationTime expiration time the point is keyed by
+      * @return the existing or newly created expiration point
+      */
+     private EventExpirationPoint getEventExpirationPoint (long expirationTime){
+         EventExpirationPoint evExPoint = this.orderedExpirationMap.get(expirationTime);
+         if (evExPoint == null) {
+             // Register atomically: a separate containsKey/get/put sequence can observe the
+             // map changing between its steps.
+             final EventExpirationPoint created = new EventExpirationPoint(expirationTime);
+             final EventExpirationPoint existing = this.orderedExpirationMap.putIfAbsent(expirationTime, created);
+             if (existing == null) {
+                 evExPoint = created;
+                 // check whether new expiration is earliest; if so, update nextExpiration
+                 if (nextExpiration == null || expirationTime < nextExpiration) {
+                     this.setNextExpiration(expirationTime);
+                 }
+             } else {
+                 evExPoint = existing;
+             }
+         }
+         myLogger.debug("TimeWindowNodeExpirationManager, Thread "+Thread.currentThread().getId()+": number of expiration points: "+this.orderedExpirationMap.keySet().size()+", next local expiration: "+((this.nextExpiration==null)?"undefined":AbstractSessionClock.millisecsToString(this.nextExpiration)));
+         return evExPoint;
+     }
+
+     /**
+      * Registers a tuple for expiration at the given time.
+      *
+      * @param tuple tuple to expire
+      * @param expirationTime time at which the tuple expires
+      */
+     public void addEventTupleExpiration (ReteTuple tuple, long expirationTime){ // see remarks NodeUpdate
+         myLogger.debug ("\nTimeWindowNodeExpirationManager, Thread "+Thread.currentThread().getId()+": Added expiration "+AbstractSessionClock.millisecsToString(expirationTime)+" for tuple\n"+tuple.toString());
+         this.getEventExpirationPoint(expirationTime).addEventTupleExpiration(tuple);
+     }
+
+     /**
+      * Registers a fact handle for expiration at the given time.
+      *
+      * @param handle fact handle to expire
+      * @param expirationTime time at which the handle expires
+      */
+     public void addEventObjectExpiration (InternalFactHandle handle, long expirationTime){ // see remarks NodeUpdate
+         myLogger.debug ("\nTimeWindowNodeExpirationManager, Thread "+Thread.currentThread().getId()+": Added expiration "+AbstractSessionClock.millisecsToString(expirationTime)+" for handle\n"+handle.toString());
+         this.getEventExpirationPoint(expirationTime).addEventObjectExpiration(handle);
+     }
+
+     /**
+      * Removes every expiration point whose time is not later than currentTime, passes the
+      * collected events to the parent node for retraction, and then publishes the earliest
+      * remaining expiration time (or null if none remains).
+      *
+      * @param currentTime time against which expiration points are checked
+      * @param workingMemory working memory passed on to the parent node's retraction
+      */
+     public void determineExpiredEvents (long currentTime, final InternalWorkingMemory workingMemory){
+         // head of queue = earliest pending expiration point (null if the map is empty)
+         Map.Entry<Long, EventExpirationPoint> nextExpirationPoint = orderedExpirationMap.firstEntry();
+         // Only consume points that are actually due. Polling unconditionally fails with a
+         // NullPointerException on an empty map and retracts a head that has not expired yet.
+         while (nextExpirationPoint != null && nextExpirationPoint.getKey() <= currentTime) {
+             // remove expired expiration point from queue
+             final Map.Entry<Long, EventExpirationPoint> expired = orderedExpirationMap.pollFirstEntry();
+             if (expired != null) {
+                 // collect expired event objects for the corresponding time window node
+                 this.eventObjectsToRetract.addAll(expired.getValue().getEventObjectsToRetract());
+                 // collect expired event tuples for the corresponding time window node
+                 this.eventTuplesToRetract.addAll(expired.getValue().getEventTuplesToRetract());
+             }
+             // get head of queue = next expiration point
+             nextExpirationPoint = orderedExpirationMap.firstEntry();
+         }
+
+         // retract the expired events in the corresponding timeWindowNode
+         parent.retractExpiredEvents(this.eventObjectsToRetract, this.eventTuplesToRetract, workingMemory);
+         // delete the lists of expired events
+         this.eventObjectsToRetract.clear();
+         this.eventTuplesToRetract.clear();
+
+         // set new node expiration and notify ExpirationChecker
+         this.setNextExpiration((nextExpirationPoint==null)?null:nextExpirationPoint.getKey());
+     }
+
+     /**
+      * @return the earliest pending expiration time, or null if nothing is pending
+      */
+     public Long getNextExpiration() {
+         return nextExpiration;
+     }
+
+     /**
+      * Sets the next expiration to the new value and notifies the ExpirationChecker.
+      *
+      * @param nextExpiration new earliest expiration time, or null if nothing is pending
+      */
+     public synchronized void setNextExpiration(Long nextExpiration) {
+         this.nextExpiration = nextExpiration;
+         ExpirationChecker.getInstance().updateNextNodeExpiration(this);
+     }
+
+ }
Modified: labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/util/OrderedFactHashTable.java
===================================================================
--- labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/util/OrderedFactHashTable.java 2008-04-21 12:51:16 UTC (rev 19667)
+++ labs/jbossrules/branches/temporal_rete/drools-core-window-NTA/src/main/java/org/drools/util/OrderedFactHashTable.java 2008-04-21 13:00:10 UTC (rev 19668)
@@ -6,7 +6,6 @@
import org.drools.common.EventFactHandle;
import org.drools.common.InternalFactHandle;
-import org.drools.spi.PropagationContext;
public class OrderedFactHashTable extends FactHashTable {
More information about the jboss-svn-commits
mailing list