Hi
I am using Drools 5.1.0 M1 within Eclipse. I am trying out a sample
rule that should fire when the minimum cpu value stays at or above 80
for 5s, using a sliding window. Once 5s have passed, it works well over
the running 5s window, but the first time it fires before 5s have
elapsed.
It seems to ignore the fact that 5s hasn't elapsed yet. I am running the
engine in STREAM mode and using a pseudo clock to advance the time
manually.
Is this the expected behavior? Thanks!
sb
Here is the rule:
package org.drools.examples
import org.drools.examples.CpuMetric.Cpu;
import org.drools.examples.CpuMetric.Alarm;
global org.apache.log4j.Logger logger
declare Cpu
@role(event)
@expires(5s)
end
rule "Above Cpu threshold of 80 for 5s"
dialect "java"
when
not Alarm()
$cpuMin : Number(intValue >= 80) from accumulate(
$cpu : Cpu($v : value) over window:time(5s), min($v)
)
then
logger.info("Cpu above 80 for 5 s, raising alarm. min cpu: " + $cpuMin);
Alarm a = new Alarm();
a.setReason("raised alarm as we hit cpu threshold");
a.setTime(System.currentTimeMillis());
insert(a);
end
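To make the two possible readings of the rule concrete, here is a minimal plain-Java sketch (not Drools code). It assumes that window:time(5s) simply means "events whose timestamp lies within the last 5s". The class name WindowMinSketch, the firstAlarm method and the requireFullWindow flag are made up for illustration. It replays the data listed at the end of this message.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not part of Drools: models a 5s time window in plain Java.
final class WindowMinSketch {
    static final long WINDOW_MS = 5000;
    static final int THRESHOLD = 80;

    // {timestamp in ms, cpu value}, copied from the data at the end of the post
    static final long[][] DATA = {
        {1358, 81}, {2359, 86}, {3360, 88}, {4361, 80}, {5362, 84},
        {6363, 80}, {7364, 83}, {8365, 99}, {9366, 97}, {10367, 99}
    };

    /** Timestamp of the first event at which the alarm condition holds, or -1. */
    static long firstAlarm(long[][] events, boolean requireFullWindow) {
        Deque<long[]> window = new ArrayDeque<>();
        long start = events.length == 0 ? 0 : events[0][0];
        for (long[] e : events) {
            long now = e[0];
            window.addLast(e);
            // Drop events that are 5s old or older.
            while (now - window.peekFirst()[0] >= WINDOW_MS) {
                window.removeFirst();
            }
            long min = Long.MAX_VALUE;
            for (long[] w : window) {
                min = Math.min(min, w[1]);
            }
            // Assumption: "full window" means 5s have passed since the first event.
            boolean elapsed = now - start >= WINDOW_MS;
            if (min >= THRESHOLD && (!requireFullWindow || elapsed)) {
                return now;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("window need not be full: " + firstAlarm(DATA, false));
        System.out.println("window must be full:     " + firstAlarm(DATA, true));
    }
}
```

With requireFullWindow set to false, the condition already holds at the first event (timestamp 1358, min 81), which matches the observed behavior. With it set to true, the first hit is at timestamp 6363, which is the behavior I expected.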
The snippet of the class that declares the Cpu and Alarm classes and
inserts events into the rule engine follows below. One other thing I
noticed first: accumulate seems to ignore the constraints in the
<source pattern>. For example, if I change the accumulate in the rule
above to
$cpuMin : Number(intValue >= 80) from accumulate(
$cpu : Cpu($v : value, srcIp == '<val1>' && destIp == '<val2>')
over window:time(5s), min($v)
)
and insert cpu events with no matching srcIp and destIp, I shouldn't
see any alarm raised, but I do, and right at the beginning.
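As a point of comparison, this is roughly what I expect the constrained accumulate to compute, written as a plain-Java sketch (not Drools code). The names FilteredMinSketch, Sample, minMatching and shouldAlarm are hypothetical, and the 192.0.2.x addresses are placeholders. Whether the Drools min function behaves this way when nothing matches is exactly what is unclear here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Drools: filter first, then take the minimum.
final class FilteredMinSketch {

    static final class Sample {
        final String srcIp;
        final String destIp;
        final int value;

        Sample(String srcIp, String destIp, int value) {
            this.srcIp = srcIp;
            this.destIp = destIp;
            this.value = value;
        }
    }

    /** Minimum value among samples matching both IPs, or null when nothing matches. */
    static Integer minMatching(List<Sample> samples, String srcIp, String destIp) {
        Integer min = null;
        for (Sample s : samples) {
            if (srcIp.equals(s.srcIp) && destIp.equals(s.destIp)) {
                if (min == null || s.value < min) {
                    min = s.value;
                }
            }
        }
        return min;
    }

    /** No matching samples means no minimum, and therefore no alarm. */
    static boolean shouldAlarm(Integer min, int threshold) {
        return min != null && min >= threshold;
    }

    public static void main(String[] args) {
        List<Sample> samples = Arrays.asList(
            new Sample("10.155.21.86", "10.6.35.120", 81),
            new Sample("10.155.21.86", "10.6.35.120", 86));
        // Placeholder IPs that match none of the inserted samples.
        Integer min = minMatching(samples, "192.0.2.1", "192.0.2.2");
        System.out.println("min=" + min + " alarm=" + shouldAlarm(min, 80));
    }
}
```

In this sketch an empty match set yields no minimum at all, so the threshold check can never pass. A min implementation that starts from a sentinel value and returns it unchanged for an empty set would behave differently, which might be worth checking.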
... (code before this) ...
//to use sliding windows, the engine has to run in stream mode
//the default is cloud mode, where there is no concept of time or
//event ordering
KnowledgeBaseConfiguration kbaseconfig =
KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
kbaseconfig.setOption(EventProcessingOption.STREAM);
// add the packages to a knowledgebase (deploy the knowledge packages).
KnowledgeBase kbase =
KnowledgeBaseFactory.newKnowledgeBase(kbaseconfig);
kbase.addKnowledgePackages(pkgs);
KnowledgeSessionConfiguration conf =
KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
conf.setOption(ClockTypeOption.get("pseudo"));
StatefulKnowledgeSession ksession =
kbase.newStatefulKnowledgeSession(conf, null);
//get clock to manually advance and test firing..
SessionPseudoClock clock = ksession.getSessionClock();
logger.info("Pseudo clock current time: "+clock.getCurrentTime());
ksession.setGlobal("logger", logger);
ksession.addEventListener(new DebugAgendaEventListener());
ksession.addEventListener(new DebugWorkingMemoryEventListener());
// setup the audit logging
KnowledgeRuntimeLogger krlogger = KnowledgeRuntimeLoggerFactory
.newFileLogger(ksession, "log/cpu");
BufferedReader bf = new BufferedReader(new
FileReader("/opt/cpumetricdata.txt"));
String s;
long time=0;
int count=0;
long lastime=0;
int delta=0;
//logger.info("Advancing 5s right at the start");
//clock.advanceTime(5000, TimeUnit.MILLISECONDS);
while((s = bf.readLine()) != null)
{
String[] vals = s.split(",");
Cpu cpumetric = new Cpu();
cpumetric.setValue(Integer.parseInt(vals[1]));
//set in ms
//for the first time, initialize time and lastime
//to the value read in from the first line.
time = Long.parseLong(vals[0]);
if(count ==0)
{
lastime = time;
logger.info("Initialized lastime to "+lastime);
}
cpumetric.setTime(time);
cpumetric.setSrcIp("10.155.21.86");
cpumetric.setDestIp("10.6.35.120");
logger.info("Inserted cpu metric "+cpumetric);
logger.info("Count: " + count + " Pseudo clock current time: "
+ clock.getCurrentTime());
ksession.insert(cpumetric);
ksession.fireAllRules();
//advance based on the read in time in ms
//do it only from the second insert onwards
delta=(int) (time-lastime);
if(count >=1)
{
clock.advanceTime(delta, TimeUnit.MILLISECONDS);
logger.info("Pseudo clock advanced "+delta+ "ms");
}
count++;
lastime=time;
}
System.out.println("Inserted facts, current time is "+new Date());
krlogger.close();
ksession.dispose();
bf.close();
}
public static class Alarm
{
private String reason;
private long time;
private String type;
/**
* @return the reason
*/
public String getReason()
{
return reason;
}
/**
* @param reason the reason to set
*/
public void setReason(String reason)
{
this.reason = reason;
}
/**
* @return the time
*/
public long getTime()
{
return time;
}
/**
* @return the type
*/
public String getType()
{
return type;
}
/**
* @param type the type to set
*/
public void setType(String type)
{
this.type = type;
}
/**
* @param time the time to set
*/
public void setTime(long time)
{
this.time = time;
}
}
public static class Cpu implements Serializable
{
private long time;
private int value;
private String srcIp;
private String destIp;
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString()
{
return "Cpu [time=" + time + ", value=" + value +
"]";
}
/**
* @return the time
*/
public long getTime()
{
return time;
}
/**
* @param time the time to set
*/
public void setTime(long time)
{
this.time = time;
}
/**
* @return the srcIp
*/
public String getSrcIp()
{
return srcIp;
}
/**
* @param srcIp the srcIp to set
*/
public void setSrcIp(String srcIp)
{
this.srcIp = srcIp;
}
/**
* @return the destIp
*/
public String getDestIp()
{
return destIp;
}
/**
* @param destIp the destIp to set
*/
public void setDestIp(String destIp)
{
this.destIp = destIp;
}
/**
* @return the value
*/
public int getValue()
{
return value;
}
/**
* @param value the value to set
*/
public void setValue(int value)
{
this.value = value;
}
}
}
Data that drives the insertion (timestamp in ms, cpu value):
1358,81
2359,86
3360,88
4361,80
5362,84
6363,80
7364,83
8365,99
9366,97
10367,99
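For reference, the following plain-Java sketch traces what the pseudo clock would read at each insert, following the same steps as the loop above with the timestamps from this data. A plain long stands in for SessionPseudoClock, and the sketch assumes the pseudo clock starts at 0. ClockTraceSketch and clockAtInsert are made-up names.

```java
import java.util.Arrays;

// Hypothetical trace of the insert loop; a long replaces the Drools pseudo clock.
final class ClockTraceSketch {
    static final long[] TIMES = {
        1358, 2359, 3360, 4361, 5362, 6363, 7364, 8365, 9366, 10367
    };

    /** Simulated clock reading at the moment each event is inserted. */
    static long[] clockAtInsert(long[] times) {
        long[] readings = new long[times.length];
        long clock = 0;      // assumption: the pseudo clock starts at 0
        long lastTime = 0;
        for (int count = 0; count < times.length; count++) {
            long time = times[count];
            if (count == 0) {
                lastTime = time;
            }
            // In the real loop, insert() and fireAllRules() run at this point.
            readings[count] = clock;
            long delta = time - lastTime;
            if (count >= 1) {
                clock += delta;  // stands in for clock.advanceTime(delta, MILLISECONDS)
            }
            lastTime = time;
        }
        return readings;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(clockAtInsert(TIMES)));
    }
}
```

Under these assumptions the first two events are both inserted at clock time 0, and every later event is inserted one step (1001 ms) behind its own timestamp delta, because the clock is advanced only after fireAllRules() has run.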