Hi
I am using Drools 5.1.0 M1 within Eclipse. I am trying out a
sample rule, which fires if the minimum cpu is over 80 for 5s using sliding
windows. I see that it works well over a running window of 5s, but for the
first time, it fires even before reaching 5s.
It seems to ignore the fact that 5s hasn’t elapsed
yet. I am using the engine in STREAM mode and using a pseudo clock to advance
the time manually.
Is this the expected behavior? Thanks!
sb
Here is the rule:
package org.drools.examples
import org.drools.examples.CpuMetric.Cpu;
import org.drools.examples.CpuMetric.Alarm;
global org.apache.log4j.Logger logger
declare Cpu
@role(event)
@expires(5s)
end
rule "Above Cpu threshold of 80 for 5s"
dialect
"java"
when
not
Alarm()
$cpuMin
: Number(intValue >= 80) from accumulate(
$cpu
: Cpu($v : value) over window:time(5s), min($v)
)
then
logger.info("Cpu
above 80 for 5 s, raising alarm. min cpu: "+$cpuMin);
Alarm
a = new Alarm();
a.setReason("raised
alarm as we hit cpu threshold");
a.setTime(System.currentTimeMillis());
insert(a);
end
Here is the snippet of the class that declares the Cpu and Alarm
class as well inserts events into the rule engine. The other thing I noticed is
the accumulate function seems to ignore the regex in the <src pattern>.
For example, in the rule above, if I give
$cpuMin : Number(intValue >= 80) from accumulate(
$cpu : Cpu($v : value, srcIp ==’<val1>’
&& destIp == ‘<val2>’) over window:time(5s), min($v)
And insert cpu events, with no matching srcIp and destIp, I
shouldn’t see any alarm raised, but I do and right at the beginning.
. . ... (code before
this..)
//to use sliding
windows, have to run the engine in stream mode
//default is cloud
mode..where there are no concept of time and
//event ordering
KnowledgeBaseConfiguration
kbaseconfig = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
kbaseconfig.setOption(EventProcessingOption.STREAM);
// add the packages
to a knowledgebase (deploy the knowledge packages).
KnowledgeBase kbase =
KnowledgeBaseFactory.newKnowledgeBase(kbaseconfig);
kbase.addKnowledgePackages(pkgs);
KnowledgeSessionConfiguration
conf = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
conf.setOption(ClockTypeOption.get("pseudo"));
StatefulKnowledgeSession
ksession = kbase.newStatefulKnowledgeSession(conf, null);
//get clock to
manually advance and test firing..
SessionPseudoClock
clock = ksession.getSessionClock();
logger.info("Pseudo
clock current time: "+clock.getCurrentTime());
ksession.setGlobal("logger",
logger);
ksession.addEventListener(new
DebugAgendaEventListener());
ksession.addEventListener(new
DebugWorkingMemoryEventListener());
// setup the audit
logging
KnowledgeRuntimeLogger
krlogger = KnowledgeRuntimeLoggerFactory
.newFileLogger(ksession,
"log/cpu");
BufferedReader bf =
new BufferedReader(new FileReader("/opt/cpumetricdata.txt"));
String s;
long time=0;
int count=0;
long lastime=0;
int delta=0;
//logger.info("Advancing
5s right at the start");
//clock.advanceTime(5000,
TimeUnit.MILLISECONDS);
while((s =
bf.readLine()) != null)
{
String[] vals = s.split(",");
Cpu cpumetric = new Cpu();
cpumetric.setValue(Integer.parseInt(vals[1]));
//set in ms
//for the first time, initialize time and lastime
//to the value read in from the first line.
time = Long.parseLong(vals[0]);
if(count ==0)
{
lastime
= time;
logger.info("Initialized
lastime to "+lastime);
}
cpumetric.setTime(time);
cpumetric.setSrcIp("10.155.21.86");
cpumetric.setDestIp("10.6.35.120");
logger.info("Inserted cpu metric "+cpumetric);
logger.info("Count: "+count+" Pseudo clock current time:
"+clock.getCurrentTime());
ksession.insert(cpumetric);
ksession.fireAllRules();
//advance based on the read in time in ms
//do it only from the second insert onwards
delta=(int) (time-lastime);
if(count >=1)
{
clock.advanceTime(delta,
TimeUnit.MILLISECONDS);
logger.info("Pseudo
clock advanced "+delta+ "ms");
}
count++;
lastime=time;
}
System.out.println("Inserted
facts, current time is "+new Date());
krlogger.close();
ksession.dispose();
bf.close();
}
public static class Alarm
{
private String
reason;
private long time;
private String type;
/**
* @return the
reason
*/
public String
getReason()
{
return reason;
}
/**
* @param reason
the reason to set
*/
public void
setReason(String reason)
{
this.reason = reason;
}
/**
* @return the
time
*/
public long getTime()
{
return time;
}
/**
* @return the
type
*/
public String
getType()
{
return type;
}
/**
* @param type
the type to set
*/
public void
setType(String type)
{
this.type = type;
}
/**
* @param time
the time to set
*/
public void
setTime(long time)
{
this.time = time;
}
}
public static class Cpu
implements Serializable
{
private long time;
private int value;
private String srcIp;
private String
destIp;
/* (non-Javadoc)
* @see
java.lang.Object#toString()
*/
@Override
public String
toString()
{
return "Cpu [time=" + time + ", value=" + value +
"]";
}
/**
* @return the
time
*/
public long getTime()
{
return time;
}
/**
* @param time
the time to set
*/
public void
setTime(long time)
{
this.time = time;
}
/**
* @return the
srcIp
*/
public String
getSrcIp()
{
return srcIp;
}
/**
* @param srcIp
the srcIp to set
*/
public void
setSrcIp(String srcIp)
{
this.srcIp = srcIp;
}
/**
* @return the
destIp
*/
public String
getDestIp()
{
return destIp;
}
/**
* @param destIp
the destIp to set
*/
public void
setDestIp(String destIp)
{
this.destIp = destIp;
}
/**
* @return the
value
*/
public int getValue()
{
return value;
}
/**
* @param value
the value to set
*/
public void
setValue(int value)
{
this.value = value;
}
}
}
Data that drives the insertion:
1358,81
2359,86
3360,88
4361,80
5362,84
6363,80
7364,83
8365,99
9366,97
10367,99