Hi,

I am trying to understand how Drools can be used to monitor events over a
period of time using sliding time windows. I have created a rule to sound an
alarm when the average temperature read from a sensor is above 25 degrees (the
threshold) over a time period of 5 minutes. The rule makes use of the stream
processing mode so that a continuous stream of events can be processed.

Below is what my rule looks like:

// declare any global variables here
global java.lang.StringBuilder alertMessage

// add declaration to change the Fact into an Event
declare SensorReading
    @role(event)
end

/* Alert when average temperature is above 25
   over a time period of 5 minutes */
rule "TemperatureAlarm1"
when
    // conditions
    $averageTemp : Number(doubleValue > 25.00)
        from accumulate(
            SensorReading($temp : temperature)
                over window:time(5m) from entry-point "Temperature Reading",
            average($temp))
then
    // actions
    System.out.println("Fired rule: " + kcontext.getRule().getName());
    alertMessage.append("Threshold temperature breached!!" +
                        "\nTurn on the Air Conditioner" +
                        "\nAverage temperature over 5 minutes is above 25 (" +
                        $averageTemp.intValue() + ")\n");
end

And below is a snapshot of the fact class (SensorReading), instances of which
are inserted as events into the working memory:

public class SensorReading {
    private double temperature;

    public SensorReading() {}

    public SensorReading(double temp) {
        this.temperature = temp;
    }

    // getters and setters
}

In order to test the rule, I am using the pseudo clock with stream processing
turned on. I insert three SensorReading objects into the working memory with
temperature values of 24, 26 and 28, advancing the clock by 1 minute after each
insert, so that the average of the temperatures is above the threshold and the
rule is invoked. After the objects are inserted, I deliberately advance the
pseudo clock by another 1 minute, so that the total time elapsed is 4 minutes.
The rule works as expected with this test setup and prints the average value
as 26 on the console.

However, if I advance the clock by 2 minutes instead of 1 minute after the
three sensor readings have been inserted (at 1-minute intervals), the rule is
still invoked but the average value changes to 27 ((26 + 28) / 2 = 27). It
looks like the first temperature reading is being ignored by the rule, despite
the fact that it falls well within the specified time range of 5 minutes. Below
is a snapshot of my test class:

public class TemperatureAlarmTest {

    static KnowledgeBase kbase;
    static StatefulKnowledgeSession ksession;
    static KnowledgeRuntimeLogger logger;
    static SessionPseudoClock clock;

    @BeforeClass
    public static void setupKsession() {
        try {
            // load up the knowledge base
            kbase = readKnowledgeBase();
            ksession = readKnowldedeSession(kbase);
            clock = ksession.getSessionClock();
            logger = KnowledgeRuntimeLoggerFactory.newThreadedFileLogger(
                    ksession, "log/Errors", 500);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    /**
     * Create a new Stateful knowledge Session with a pseudo clock from the
     * knowledge base
     *
     * @param kbase
     * @return
     * @throws Exception
     */
    private static StatefulKnowledgeSession readKnowldedeSession(
            KnowledgeBase kbase) throws Exception {
        // Knowledge Session Configuration
        KnowledgeSessionConfiguration config =
                KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
        config.setOption(ClockTypeOption.get("pseudo"));
        return kbase.newStatefulKnowledgeSession(config, null);
    }

    @AfterClass
    public static void closeKsession() {
        try {
            // close the logger and dispose of the session
            logger.close();
            ksession.dispose();
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }

    @Test
    public void TemperatureAlarm1_Test() {
        // Create Temperature list
        ArrayList<SensorReading> tempMetrics = new ArrayList<SensorReading>();
        double temp = 24.00;
        while (tempMetrics.size() < 3) {
            tempMetrics.add(new SensorReading(temp));
            temp += 2;
        }
        System.out.println("Size of tempMetrics List: " + tempMetrics.size() + "\n");
        System.out.println("First Temp reading: " + tempMetrics.get(0).getTemperature());
        System.out.println("Second Temp reading: " + tempMetrics.get(1).getTemperature());
        System.out.println("Third Temp reading: " + tempMetrics.get(2).getTemperature() + "\n");

        // Separate stream for inserts
        WorkingMemoryEntryPoint temperatureStream =
                ksession.getWorkingMemoryEntryPoint("Temperature Reading");

        // Create fact handle list
        ArrayList<FactHandle> factHandleList = new ArrayList<FactHandle>();

        // Insert objects into working memory while advancing the clock
        for (int i = 0; i < tempMetrics.size(); i++) {
            factHandleList.add(temperatureStream.insert(tempMetrics.get(i)));
            clock.advanceTime(1, TimeUnit.MINUTES);
            System.out.println("Time advances by 1 minute");
        }
        System.out.println("Fact Count is: " + temperatureStream.getFactCount());
        System.out.println("Fact Entry Point is: " + temperatureStream.getEntryPointId());
        System.out.println("Size of FactHandleList: " + factHandleList.size() + "\n");

        // change in advanced time alters the rule behavior
        clock.advanceTime(1, TimeUnit.MINUTES);

        StringBuilder stringBuilder = new StringBuilder();
        ksession.setGlobal("alertMessage", stringBuilder);
        ksession.fireAllRules();

        // Remove facts
        for (int i = 0; i < factHandleList.size(); i++) {
            temperatureStream.retract(factHandleList.get(i));
        }
        System.out.println("After Removing facts");
        System.out.println("Fact Count is: " + temperatureStream.getFactCount());

        String result = stringBuilder.substring(0, 32);
        System.out.println("Alert Message is: \n" + stringBuilder.toString());
        assertEquals("Alert Message is: ", "Threshold temperature breached!!", result);
    }

    /**
     * Create the knowledge base with stream processing turned on.
     *
     * @return
     * @throws Exception
     */
    private static KnowledgeBase readKnowledgeBase() throws Exception {
        KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
        kbuilder.add(ResourceFactory.newClassPathResource("TemperatureAlarm1.drl"),
                ResourceType.DRL);
        hasErrors(kbuilder);

        // Stream processing turned on
        KnowledgeBaseConfiguration conf =
                KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
        conf.setOption(EventProcessingOption.STREAM);
        KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(conf);
        hasErrors(kbuilder);
        kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
        return kbase;
    }

    /**
     * Report errors if any
     *
     * @param kbuilder
     * @throws Exception
     */
    private static void hasErrors(KnowledgeBuilder kbuilder) throws Exception {
        KnowledgeBuilderErrors errors = kbuilder.getErrors();
        if (errors.size() > 0) {
            for (KnowledgeBuilderError error : errors) {
                System.err.println(error);
            }
            throw new IllegalArgumentException("Could not parse knowledge.");
        }
    }
}
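
To make the timeline of the test concrete, here is a plain-Java sketch (no
Drools involved) of the timestamps as I understand them: the readings are
inserted at minutes 0, 1 and 2, and the rule fires at minute 4 in the first
run and at minute 5 in the second. The class name WindowTimeline and the
helpers inWindow and averageAt are hypothetical names made up for this
illustration. The boundary rule used here (a reading counts only while its age
is strictly less than the window length) is purely an assumption on my part; I
have not confirmed that this is how window:time(5m) decides what to keep.

```java
public class WindowTimeline {

    // Window length in minutes, mirroring window:time(5m) in the rule.
    static final long WINDOW_MINUTES = 5;

    // ASSUMPTION: a reading is inside the window only while its age is
    // strictly less than the window length. This is a guess, not documented
    // Drools behavior.
    static boolean inWindow(long eventMinute, long nowMinute) {
        return nowMinute - eventMinute < WINDOW_MINUTES;
    }

    // Average of the readings that are still inside the window at nowMinute.
    static double averageAt(long nowMinute, long[] timestamps, double[] temps) {
        double sum = 0;
        int count = 0;
        for (int i = 0; i < timestamps.length; i++) {
            if (inWindow(timestamps[i], nowMinute)) {
                sum += temps[i];
                count++;
            }
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        long[] timestamps = {0, 1, 2};   // minute at which each reading is inserted
        double[] temps = {24, 26, 28};   // temperature values from the test

        // Clock advanced 3 x 1 minute during inserts, then 1 more minute.
        System.out.println("t=4: " + averageAt(4, timestamps, temps)); // t=4: 26.0

        // Clock advanced 3 x 1 minute during inserts, then 2 more minutes.
        System.out.println("t=5: " + averageAt(5, timestamps, temps)); // t=5: 27.0
    }
}
```

Under that assumed boundary rule the sketch gives the same two averages I see
from the rule (26 and 27), because at minute 5 the first reading is exactly 5
minutes old. Whether Drools really treats the window edge this way is what I
would like to have confirmed.
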

Could anyone please help explain this change in the behavior of the rule?

Regards,
Sushant