Hi,
 
I am trying to understand how Drools can be used to monitor events over a period of time using sliding time windows. I have created a rule that raises an alarm when the average temperature read from a sensor is above 25 degrees (the threshold) over a period of 5 minutes. The rule uses Stream processing mode so that a continuous stream of events can be processed.
 
This is what my rule looks like:
 
//declare any global variables here
global java.lang.StringBuilder alertMessage
 
// add declaration to change the Fact into an Event
declare SensorReading
       @role(event)
end
 
/* Alert when average temperature is above 25
   over a time period of 5 minutes */
rule "TemperatureAlarm1"
 
    when
        //conditions
        $averageTemp : Number(doubleValue > 25.00) 
                           from accumulate(SensorReading($temp : temperature)
                                  over window:time(5m) from entry-point "Temperature Reading", average($temp))
    then
        //actions
        System.out.println("Fired rule: " + kcontext.getRule().getName());
        alertMessage.append("Threshold temperature breached!!"+
                           "\nTurn on the Air Conditioner"+
                           "\nAverage temperature over 5 minutes is above 25 (" + $averageTemp.intValue() + ")\n");
       
end
 
And below is the fact class (SensorReading) whose instances are inserted as events into the working memory:
 
public class SensorReading {
 
       private double temperature;
      
       public SensorReading(){}
      
       public SensorReading(double temp){
              this.temperature = temp;
       }
 
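       // getters and setters
       public double getTemperature() {
              return temperature;
       }

       public void setTemperature(double temperature) {
              this.temperature = temperature;
       }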
}
 
 
To test the rule, I am using the pseudo clock with Stream processing turned on. I insert three SensorReading objects into the working memory with temperature values 24, 26 and 28, advancing the clock by 1 minute after each insert, so that the average temperature is above the threshold and the rule fires. After the objects are inserted, I deliberately advance the pseudo clock by another 1 minute, so that the total time elapsed is 4 minutes. The rule works as expected with this setup and prints the average value as 26 on the console.
 
However, if I advance the clock by 2 minutes instead of 1 minute after the three readings have been inserted (so that the total time elapsed is 5 minutes), the rule still fires but the average changes to 27 ((26 + 28) / 2 = 27). It looks like the first temperature reading is being ignored by the rule, even though it was inserted exactly 5 minutes earlier, which I expected to still be inside the 5-minute window. Below is my test class:
 
public class TemperatureAlarmTest {
 
       static KnowledgeBase kbase;
       static StatefulKnowledgeSession ksession;
       static KnowledgeRuntimeLogger logger;
       static SessionPseudoClock clock;
 
       @BeforeClass
       public static void setupKsession() {
              try {
                     // load up the knowledge base
                     kbase = readKnowledgeBase();
                     ksession = readKnowldedeSession(kbase);
                     clock = ksession.getSessionClock();
 
                     logger = KnowledgeRuntimeLoggerFactory.newThreadedFileLogger(ksession, "log/Errors", 500);
 
              } catch (Throwable t) {
                     t.printStackTrace();
              }
       }
 
       /**
        * Create a new Stateful knowledge Session with a pseudo clock from the
        * knowledge base
        *
        * @param kbase
        * @return
        * @throws Exception
        */
       private static StatefulKnowledgeSession readKnowldedeSession(
                     KnowledgeBase kbase) throws Exception {
 
              // Knowledge Session Configuration
              KnowledgeSessionConfiguration config = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
              config.setOption(ClockTypeOption.get("pseudo"));
              return kbase.newStatefulKnowledgeSession(config, null);
 
       }
 
       @AfterClass
       public static void closeKsession() {
              try {
                     // close the logger and dispose of the session
                     logger.close();
                     ksession.dispose();
 
              } catch (Throwable t) {
                     t.printStackTrace();
              }
       }
      
       @Test
       public void TemperatureAlarm1_Test() {
 
              // Create Temperature list
              ArrayList<SensorReading> tempMetrics = new ArrayList<SensorReading>();
              double temp = 24.00;
 
              while (tempMetrics.size() < 3) {
                     tempMetrics.add(new SensorReading(temp));
                     temp += 2;
              }
              System.out.println("Size of tempMetrics List: "+tempMetrics.size()+"\n");
              System.out.println("First Temp reading: "+tempMetrics.get(0).getTemperature());
              System.out.println("Second Temp reading: "+tempMetrics.get(1).getTemperature());
              System.out.println("Third Temp reading: "+tempMetrics.get(2).getTemperature()+"\n");
             
              // Separate stream for inserts
              WorkingMemoryEntryPoint temperatureStream = ksession.getWorkingMemoryEntryPoint( "Temperature Reading" );
             
              // Create fact handle list
              ArrayList<FactHandle> factHandleList = new ArrayList<FactHandle>();
               
              // Insert objects into working memory while advancing the clock
              for (int i = 0; i < tempMetrics.size(); i++) {
                     factHandleList.add(temperatureStream.insert(tempMetrics.get(i)));
                     clock.advanceTime(1, TimeUnit.MINUTES);
                     System.out.println("Time advances by 1 minute");
              }
              System.out.println("Fact Count is: "+temperatureStream.getFactCount());
              System.out.println("Fact Entry Point is: "+temperatureStream.getEntryPointId());
              System.out.println("Size of FactHandleList: "+factHandleList.size()+"\n");
             
              clock.advanceTime(1, TimeUnit.MINUTES);         // changing this to 2 minutes changes the computed average
             
              StringBuilder stringBuilder = new StringBuilder();
              ksession.setGlobal("alertMessage", stringBuilder);
              ksession.fireAllRules();
 
              // Remove facts
              for (int i = 0; i < factHandleList.size(); i++) {
                     temperatureStream.retract(factHandleList.get(i));
              }
              System.out.println("After Removing facts");
              System.out.println("Fact Count is: "+temperatureStream.getFactCount());
             
              String result = stringBuilder.substring(0, 32);
              System.out.println("Alert Message is: \n" + stringBuilder.toString());
              assertEquals("Alert Message is: ", "Threshold temperature breached!!", result);
       }
      
       /**
        * Create the knowledge base with stream processing turned on.
        *
        * @return
        * @throws Exception
        */
       private static KnowledgeBase readKnowledgeBase() throws Exception {
              KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
              kbuilder.add(ResourceFactory.newClassPathResource("TemperatureAlarm1.drl"),ResourceType.DRL);
              hasErrors(kbuilder);
             
              // Stream processing turned on
              KnowledgeBaseConfiguration conf = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
              conf.setOption(EventProcessingOption.STREAM);
              KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(conf);
              kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
 
              return kbase;
       }
 
       /**
        * Report errors if any
        *
        * @param kbuilder
        * @throws Exception
        */
       private static void hasErrors(KnowledgeBuilder kbuilder) throws Exception {
              KnowledgeBuilderErrors errors = kbuilder.getErrors();
              if (errors.size() > 0) {
                     for (KnowledgeBuilderError error : errors) {
                           System.err.println(error);
                     }
                     throw new IllegalArgumentException("Could not parse knowledge.");
              }
 
       }
 
}
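
To make the two timelines concrete, here is a small standalone sketch in plain Java (no Drools involved) that recomputes the average from the age of each reading at the moment fireAllRules() is called. The class name WindowTimelineSketch, the method windowAverage and the boundaryInclusive flag are hypothetical names I made up for illustration. Whether an event whose age is exactly equal to the window length counts as inside the window is an assumption here, not something I have confirmed about Drools.

```java
import java.util.concurrent.TimeUnit;

public class WindowTimelineSketch {

    // Window length used by the rule: window:time(5m)
    static final long WINDOW_MS = TimeUnit.MINUTES.toMillis(5);

    /**
     * Averages the readings that are still inside the window at time nowMs.
     * boundaryInclusive is a hypothetical switch: when false, a reading whose
     * age equals the window length is treated as expired (assumption).
     */
    static double windowAverage(long[] timestampsMs, double[] temps,
                                long nowMs, boolean boundaryInclusive) {
        double sum = 0;
        int count = 0;
        for (int i = 0; i < temps.length; i++) {
            long age = nowMs - timestampsMs[i];
            boolean inWindow = boundaryInclusive ? age <= WINDOW_MS : age < WINDOW_MS;
            if (inWindow) {
                sum += temps[i];
                count++;
            }
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        long minute = TimeUnit.MINUTES.toMillis(1);

        // Same timeline as the test: insert at minutes 0, 1 and 2,
        // which leaves the pseudo clock at minute 3 after the loop.
        long[] timestamps = {0, minute, 2 * minute};
        double[] temps = {24, 26, 28};

        for (int extraMinutes = 1; extraMinutes <= 2; extraMinutes++) {
            long now = 3 * minute + extraMinutes * minute;
            System.out.println("clock at minute " + (now / minute)
                    + ": exclusive boundary = " + windowAverage(timestamps, temps, now, false)
                    + ", inclusive boundary = " + windowAverage(timestamps, temps, now, true));
        }
    }
}
```

With the exclusive boundary the sketch gives 26.0 with the clock at minute 4 and 27.0 with the clock at minute 5, which matches what I observe from the rule. With the inclusive boundary it gives 26.0 in both cases, which is what I had expected.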
 
 
Could anyone please help explain this change in the rule's behavior?
 
Regards,
Sushant