Hi Edson
Thanks for the clarification. The sliding
window implementation does wait for 5s, later on during a test, before
activating the rule. So, theoretically, there shouldn’t be any difference
in behavior, even at the beginning, whether the clock is in pseudo or real
mode.
Lets assume t(0) is the start, then
nothing should fire atleast until t(0+5) and it should be the same for the
window between t(n) and t(n+5).
Also, I noticed that the accumulate function seems to ignore the
regex in the <src pattern>. For example, given the rule below, if I insert
Cpu events with no matching srcIp and destIp, then I would expect no rule to
fire, since the window would look at only the Cpu objects in working memory
that match the criteria below. Am I missing something?
$cpuMin
: Number(intValue >= 80) from accumulate(
$cpu
: Cpu($v : value, srcIp ==’<val1>’ && destIp ==
‘<val2>’) over window:time(5s), min($v)
From: rules-users-bounces@lists.jboss.org
[mailto:rules-users-bounces@lists.jboss.org] On
Behalf Of Edson Tirelli
Sent: Monday, April 12, 2010 12:10
PM
To: Rules Users List
Subject: Re: [rules-users]
drools-fusion: rule firing erroneously for thefirst time in a sliding window
Hi,
Yes, the current implementation for sliding time windows
does not wait for a window to move before starting to fire rules. As soon as
the constraint is met, it will activate the rule. The question is: should the
sliding window delay before firing? should it be configurable?
I need to do some research on that. Comments are welcome.
Edson
2010/4/12 Badrinath, Shyam <sbadrinath@sonusnet.com>
Hi
I am using
Drools 5.1.0 M1 within Eclipse. I am trying out a sample rule, which fires if
the minimum cpu is over 80 for 5s using sliding windows. I see that it works
well over a running window of 5s, but for the first time, it fires even before
reaching 5s.
It seems to
ignore the fact that 5s hasn’t elapsed yet. I am using the engine in
STREAM mode and using a pseudo clock to advance the time manually.
Is this the
expected behavior? Thanks!
sb
Here is the
rule:
package
org.drools.examples
import
org.drools.examples.CpuMetric.Cpu;
import
org.drools.examples.CpuMetric.Alarm;
global
org.apache.log4j.Logger logger
declare
Cpu
@role(event)
@expires(5s)
end
rule
"Above Cpu threshold of 80 for 5s"
dialect "java"
when
not Alarm()
$cpuMin : Number(intValue >= 80) from accumulate(
$cpu : Cpu($v : value) over window:time(5s), min($v)
)
then
logger.info("Cpu above 80
for 5 s, raising alarm. min cpu: "+$cpuMin);
Alarm a = new Alarm();
a.setReason("raised alarm as we hit cpu threshold");
a.setTime(System.currentTimeMillis());
insert(a);
end
Here is the
snippet of the class that declares the Cpu and Alarm class as well inserts
events into the rule engine. The other thing I noticed is the accumulate
function seems to ignore the regex in the <src pattern>. For example, in
the rule above, if I give
$cpuMin
: Number(intValue >= 80) from accumulate(
$cpu
: Cpu($v : value, srcIp ==’<val1>’ && destIp ==
‘<val2>’) over window:time(5s), min($v)
And insert
cpu events, with no matching srcIp and destIp, I shouldn’t see any alarm
raised, but I do and right at the beginning.
. . ... (code before this..)
//to use sliding windows, have to run the engine in stream mode
//default is cloud mode..where there are no concept of time and
//event ordering
KnowledgeBaseConfiguration kbaseconfig =
KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
kbaseconfig.setOption(EventProcessingOption.STREAM);
// add the packages to a knowledgebase (deploy the knowledge packages).
KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(kbaseconfig);
kbase.addKnowledgePackages(pkgs);
KnowledgeSessionConfiguration conf =
KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
conf.setOption(ClockTypeOption.get("pseudo"));
StatefulKnowledgeSession ksession = kbase.newStatefulKnowledgeSession(conf,
null);
//get clock to manually advance and test firing..
SessionPseudoClock clock = ksession.getSessionClock();
logger.info("Pseudo clock
current time: "+clock.getCurrentTime());
ksession.setGlobal("logger", logger);
ksession.addEventListener(new DebugAgendaEventListener());
ksession.addEventListener(new DebugWorkingMemoryEventListener());
// setup the audit logging
KnowledgeRuntimeLogger krlogger = KnowledgeRuntimeLoggerFactory
.newFileLogger(ksession, "log/cpu");
BufferedReader bf = new BufferedReader(new FileReader("/opt/cpumetricdata.txt"));
String s;
long time=0;
int count=0;
long lastime=0;
int delta=0;
//logger.info("Advancing
5s right at the start");
//clock.advanceTime(5000, TimeUnit.MILLISECONDS);
while((s = bf.readLine()) != null)
{
String[] vals = s.split(",");
Cpu cpumetric = new Cpu();
cpumetric.setValue(Integer.parseInt(vals[1]));
//set in ms
//for the first time, initialize time and lastime
//to the value read in from the first line.
time = Long.parseLong(vals[0]);
if(count ==0)
{
lastime = time;
logger.info("Initialized
lastime to "+lastime);
}
cpumetric.setTime(time);
cpumetric.setSrcIp("10.155.21.86");
cpumetric.setDestIp("10.6.35.120");
logger.info("Inserted
cpu metric "+cpumetric);
logger.info("Count:
"+count+" Pseudo clock current time: "+clock.getCurrentTime());
ksession.insert(cpumetric);
ksession.fireAllRules();
//advance based on the read in time in ms
//do it only from the second insert onwards
delta=(int) (time-lastime);
if(count >=1)
{
clock.advanceTime(delta, TimeUnit.MILLISECONDS);
logger.info("Pseudo clock
advanced "+delta+ "ms");
}
count++;
lastime=time;
}
System.out.println("Inserted facts, current time is "+new Date());
krlogger.close();
ksession.dispose();
bf.close();
}
public static class Alarm
{
private String reason;
private long time;
private String type;
/**
* @return the reason
*/
public String getReason()
{
return reason;
}
/**
* @param reason the reason to set
*/
public void setReason(String reason)
{
this.reason = reason;
}
/**
* @return the time
*/
public long getTime()
{
return time;
}
/**
* @return the type
*/
public String getType()
{
return type;
}
/**
* @param type the type to set
*/
public void setType(String type)
{
this.type = type;
}
/**
* @param time the time to set
*/
public void setTime(long time)
{
this.time = time;
}
}
public static class Cpu implements Serializable
{
private long time;
private int value;
private String srcIp;
private String destIp;
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@Override
public String toString()
{
return "Cpu [time=" + time + ", value="
+ value + "]";
}
/**
* @return the time
*/
public long getTime()
{
return time;
}
/**
* @param time the time to set
*/
public void setTime(long time)
{
this.time = time;
}
/**
* @return the srcIp
*/
public String getSrcIp()
{
return srcIp;
}
/**
* @param srcIp the srcIp to set
*/
public void setSrcIp(String srcIp)
{
this.srcIp = srcIp;
}
/**
* @return the destIp
*/
public String getDestIp()
{
return destIp;
}
/**
* @param destIp the destIp to set
*/
public void setDestIp(String destIp)
{
this.destIp = destIp;
}
/**
* @return the value
*/
public int getValue()
{
return value;
}
/**
* @param value the value to set
*/
public void setValue(int value)
{
this.value = value;
}
}
}
Data
that drives the insertion:
1358,81
2359,86
3360,88
4361,80
5362,84
6363,80
7364,83
8365,99
9366,97
10367,99
_______________________________________________
rules-users mailing list
rules-users@lists.jboss.org
https://lists.jboss.org/mailman/listinfo/rules-users
--
Edson Tirelli
JBoss Drools Core Development
JBoss by Red Hat @ www.jboss.com