Hi,
I am
trying to understand how Drools can be used to monitor events over a period of
time using Sliding Time Windows. I have created a rule to sound an alarm when
average temperature read from a sensor is above 25 degrees (threshold) over a
time period of 5 minutes. The rule makes of use of the Stream processing mode so
that continuous stream of events could be processed.
Below
is how my rule looks like:
//declare any global
variables here
global
java.lang.StringBuilder alertMessage
// add declaration to
change the Fact into an Event
declare SensorReading
@role(event)
end
/* Alert when average
temperature is above 25
over a time period of 5 minutes */
rule "TemperatureAlarm1"
when
//conditions
$averageTemp : Number(doubleValue >
25.00)
from accumulate(SensorReading($temp
: temperature)
over
window:time(5m) from entry-point "Temperature Reading", average($temp))
then
//actions
System.out.println("Fired rule:
" +
kcontext.getRule().getName());
alertMessage.append("Threshold
temperature breached!!"+
"\nTurn on the
Air Conditioner"+
"\nAverage
temperature over 5 minutes is above 25 (" + $averageTemp.intValue() + ")\n");
end
And below
is the snapshot of the fact (SensorReading) which is inserted as an event in
the working memory:
public class SensorReading {
private double temperature;
public SensorReading(){}
public SensorReading(double temp){
this.temperature = temp;
}
// getters and setters
}
In
order to test the rule, I am using Pseudo Clock with Stream processing turned
on. I am inserting three SensorReading objects in the working memory with temperature
values as (24, 26, 28) after every minute, so that the average of the
temperatures is above threshold and the rule is invoked. After the objects are
inserted in the working memory, I am deliberately advancing the Pseudo clock by
another 1 minute, so that the total time elapsed is 4 minutes. The rule works
as expected with the above test setup and prints the average value as 26 on the
console.
However,
if I advance the clock by 2 minutes instead of 1 minute after three sensor
reading objects have been inserted in the working memory (after every 1 minute
interval), the rule gets invoked but the average value gets changed to 27 (26 +
28 / 2 == 27). Looks like the first temperature reading is getting ignored by
the rule despite the fact that it falls well within the specified time range of
5 minutes. Below is the snapshot of my test class:
public class TemperatureAlarmTest
{
static KnowledgeBase kbase;
static StatefulKnowledgeSession ksession;
static KnowledgeRuntimeLogger logger;
static SessionPseudoClock clock;
@BeforeClass
public static void setupKsession() {
try {
// load up the
knowledge base
kbase = readKnowledgeBase();
ksession = readKnowldedeSession(kbase);
clock = ksession.getSessionClock();
logger =
KnowledgeRuntimeLoggerFactory.newThreadedFileLogger(ksession, "log/Errors", 500);
} catch (Throwable t) {
t.printStackTrace();
}
}
/**
*
Create a new Stateful knowledge Session with a pseudo clock from the
*
knowledge base
*
* @param kbase
* @return
* @throws Exception
*/
private static
StatefulKnowledgeSession readKnowldedeSession(
KnowledgeBase kbase) throws Exception {
// Knowledge Session Configuration
KnowledgeSessionConfiguration
config = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
config.setOption(ClockTypeOption.get("pseudo"));
return
kbase.newStatefulKnowledgeSession(config, null);
}
@AfterClass
public static void closeKsession() {
try {
// load up the
knowledge base
logger.close();
ksession.dispose();
} catch (Throwable t) {
t.printStackTrace();
}
}
@Test
public void
TemperatureAlarm1_Test() {
// Create Temperature list
ArrayList<SensorReading>
tempMetrics = new ArrayList<SensorReading>();
double temp = 24.00;
while (tempMetrics.size()
< 3) {
tempMetrics.add(new
SensorReading(temp));
temp += 2;
}
System.out.println("Size of
tempMetrics List: "+tempMetrics.size()+"\n");
System.out.println("First Temp
reading: "+tempMetrics.get(0).getTemperature());
System.out.println("Second Temp
reading: "+tempMetrics.get(1).getTemperature());
System.out.println("Third Temp
reading: "+tempMetrics.get(2).getTemperature()+"\n");
// Separate stream for inserts
WorkingMemoryEntryPoint
temperatureStream = ksession.getWorkingMemoryEntryPoint( "Temperature
Reading" );
// Create fact handle list
ArrayList<FactHandle>
factHandleList = new ArrayList<FactHandle>();
// Insert objects into working
memory while advancing the clock
for (int i = 0; i <
tempMetrics.size(); i++) {
factHandleList.add(temperatureStream.insert(tempMetrics.get(i)));
clock.advanceTime(1,
TimeUnit.MINUTES);
System.out.println("Time advances
by 1 minute");
}
System.out.println("Fact Count
is: "+temperatureStream.getFactCount());
System.out.println("Fact Entry
Point is: "+temperatureStream.getEntryPointId());
System.out.println("Size of
FactHandleList: "+factHandleList.size()+"\n");
clock.advanceTime(1,
TimeUnit.MINUTES); //change in
advanced time alters the rule behavior
StringBuilder stringBuilder = new StringBuilder();
ksession.setGlobal("alertMessage", stringBuilder);
ksession.fireAllRules();
// Remove facts
for (int i = 0; i <
factHandleList.size(); i++) {
temperatureStream.retract(factHandleList.get(i));
}
System.out.println("After
Removing facts");
System.out.println("Fact Count
is: "+temperatureStream.getFactCount());
String result =
stringBuilder.substring(0, 32);
System.out.println("Alert Message
is: \n" + stringBuilder.toString());
assertEquals("Alert Message
is: ",
"Threshold
temperature breached!!", result);
}
/**
*
Create the knowledge base with stream processing turned on.
*
* @return
* @throws Exception
*/
private static KnowledgeBase
readKnowledgeBase() throws Exception {
KnowledgeBuilder kbuilder =
KnowledgeBuilderFactory.newKnowledgeBuilder();
kbuilder.add(ResourceFactory.newClassPathResource("TemperatureAlarm1.drl"),ResourceType.DRL);
hasErrors(kbuilder);
// Stream processing turned on
KnowledgeBaseConfiguration conf =
KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
conf.setOption(EventProcessingOption.STREAM);
KnowledgeBase kbase =
KnowledgeBaseFactory.newKnowledgeBase(conf);
hasErrors(kbuilder);
kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
return kbase;
}
/**
*
Report errors if any
*
* @param kbuilder
* @throws Exception
*/
private static void
hasErrors(KnowledgeBuilder kbuilder) throws Exception {
KnowledgeBuilderErrors errors =
kbuilder.getErrors();
if (errors.size() > 0) {
for
(KnowledgeBuilderError error : errors) {
System.err.println(error);
}
throw new
IllegalArgumentException("Could not parse knowledge.");
}
}
}
Could
anyone please help explain this change in the behavior of the rule?
Regards,
Sushant