[rules-users] drools-fusion: rule firing erroneously for the first time in a sliding window
Edson Tirelli
ed.tirelli at gmail.com
Mon Apr 12 12:09:45 EDT 2010
Hi,
Yes, the current implementation for sliding time windows does not wait
for a window to move before starting to fire rules. As soon as the
constraint is met, it will activate the rule. The question is: should the
sliding window delay before firing? should it be configurable?
I need to do some research on that. Comments are welcome.
Edson
2010/4/12 Badrinath, Shyam <sbadrinath at sonusnet.com>
> Hi
>
> I am using Drools 5.1.0 M1 within Eclipse. I am trying out a sample rule,
> which fires if the minimum cpu is over 80 for 5s using sliding windows. I
> see that it works well over a running window of 5s, but for the first time,
> it fires even before reaching 5s.
>
> It seems to ignore the fact that 5s hasn’t elapsed yet. I am using the
> engine in STREAM mode and using a pseudo clock to advance the time manually.
>
>
>
> Is this the expected behavior? Thanks!
>
> sb
>
>
>
> Here is the rule:
>
>
>
> package org.drools.examples
>
>
>
> import org.drools.examples.CpuMetric.Cpu;
>
> import org.drools.examples.CpuMetric.Alarm;
>
>
>
> global org.apache.log4j.Logger logger
>
>
>
> declare Cpu
>
> @role(event)
>
> @expires(5s)
>
> end
>
>
>
> rule "Above Cpu threshold of 80 for 5s"
>
> dialect "java"
>
> when
>
> not Alarm()
>
> $cpuMin : Number(intValue >= 80) from accumulate(
>
> $cpu : Cpu($v : value) over window:time(5s), min($v)
>
> )
>
> then
>
> logger.info("Cpu above 80 for 5 s, raising alarm. min cpu:
> "+$cpuMin);
>
> Alarm a = new Alarm();
>
> a.setReason("raised alarm as we hit cpu threshold");
>
> a.setTime(System.currentTimeMillis());
>
> insert(a);
>
> end
>
>
>
>
>
>
>
> Here is the snippet of the class that declares the Cpu and Alarm class as
> well inserts events into the rule engine. The other thing I noticed is the
> accumulate function seems to ignore the regex in the <src pattern>. For
> example, in the rule above, if I give
>
> $cpuMin : Number(intValue >= 80) from accumulate(
>
> $cpu : Cpu($v : value, srcIp ==’<val1>’ && destIp == ‘<val2>’) over
> window:time(5s), min($v)
>
> And insert cpu events, with no matching srcIp and destIp, I shouldn’t see
> any alarm raised, but I do and right at the beginning.
>
>
>
>
>
> . . ... (code before this..)
>
>
>
> //to use sliding windows, have to run the engine in stream mode
>
> //default is cloud mode..where there are no concept of time and
>
> //event ordering
>
> KnowledgeBaseConfiguration kbaseconfig =
> KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
>
> kbaseconfig.setOption(EventProcessingOption.STREAM);
>
>
>
> // add the packages to a knowledgebase (deploy the knowledge
> packages).
>
> KnowledgeBase kbase =
> KnowledgeBaseFactory.newKnowledgeBase(kbaseconfig);
>
>
>
> kbase.addKnowledgePackages(pkgs);
>
>
>
> KnowledgeSessionConfiguration conf =
> KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
>
> conf.setOption(ClockTypeOption.get("pseudo"));
>
>
>
> StatefulKnowledgeSession ksession =
> kbase.newStatefulKnowledgeSession(conf, null);
>
>
>
> //get clock to manually advance and test firing..
>
> SessionPseudoClock clock = ksession.getSessionClock();
>
>
>
> logger.info("Pseudo clock current time: "+clock.getCurrentTime());
>
> ksession.setGlobal("logger", logger);
>
>
>
>
>
> ksession.addEventListener(new DebugAgendaEventListener());
>
> ksession.addEventListener(new DebugWorkingMemoryEventListener());
>
>
>
> // setup the audit logging
>
> KnowledgeRuntimeLogger krlogger = KnowledgeRuntimeLoggerFactory
>
> .newFileLogger(ksession, "log/cpu");
>
>
>
> BufferedReader bf = new BufferedReader(new
> FileReader("/opt/cpumetricdata.txt"));
>
> String s;
>
> long time=0;
>
> int count=0;
>
> long lastime=0;
>
> int delta=0;
>
>
>
> //logger.info("Advancing 5s right at the start");
>
> //clock.advanceTime(5000, TimeUnit.MILLISECONDS);
>
> while((s = bf.readLine()) != null)
>
> {
>
>
>
> String[] vals = s.split(",");
>
> Cpu cpumetric = new Cpu();
>
> cpumetric.setValue(Integer.parseInt(vals[1]));
>
> //set in ms
>
> //for the first time, initialize time and lastime
>
> //to the value read in from the first line.
>
> time = Long.parseLong(vals[0]);
>
> if(count ==0)
>
> {
>
> lastime = time;
>
> logger.info("Initialized lastime to "+lastime);
>
> }
>
>
>
> cpumetric.setTime(time);
>
> cpumetric.setSrcIp("10.155.21.86");
>
> cpumetric.setDestIp("10.6.35.120");
>
> logger.info("Inserted cpu metric "+cpumetric);
>
> logger.info("Count: "+count+" Pseudo clock current time:
> "+clock.getCurrentTime());
>
> ksession.insert(cpumetric);
>
>
>
> ksession.fireAllRules();
>
> //advance based on the read in time in ms
>
> //do it only from the second insert onwards
>
> delta=(int) (time-lastime);
>
> if(count >=1)
>
> {
>
> clock.advanceTime(delta, TimeUnit.MILLISECONDS);
>
> logger.info("Pseudo clock advanced "+delta+ "ms");
>
> }
>
> count++;
>
> lastime=time;
>
> }
>
>
>
> System.out.println("Inserted facts, current time is "+new Date());
>
> krlogger.close();
>
> ksession.dispose();
>
> bf.close();
>
>
>
> }
>
>
>
>
>
> public static class Alarm
>
> {
>
> private String reason;
>
> private long time;
>
> private String type;
>
> /**
>
> * @return the reason
>
> */
>
> public String getReason()
>
> {
>
> return reason;
>
> }
>
> /**
>
> * @param reason the reason to set
>
> */
>
> public void setReason(String reason)
>
> {
>
> this.reason = reason;
>
> }
>
> /**
>
> * @return the time
>
> */
>
> public long getTime()
>
> {
>
> return time;
>
> }
>
> /**
>
> * @return the type
>
> */
>
> public String getType()
>
> {
>
> return type;
>
> }
>
> /**
>
> * @param type the type to set
>
> */
>
> public void setType(String type)
>
> {
>
> this.type = type;
>
> }
>
> /**
>
> * @param time the time to set
>
> */
>
> public void setTime(long time)
>
> {
>
> this.time = time;
>
> }
>
>
>
>
>
>
>
> }
>
> public static class Cpu implements Serializable
>
> {
>
>
>
> private long time;
>
> private int value;
>
> private String srcIp;
>
> private String destIp;
>
>
>
> /* (non-Javadoc)
>
> * @see java.lang.Object#toString()
>
> */
>
> @Override
>
> public String toString()
>
> {
>
> return "Cpu [time=" + time + ", value=" + value + "]";
>
> }
>
> /**
>
> * @return the time
>
> */
>
> public long getTime()
>
> {
>
> return time;
>
> }
>
> /**
>
> * @param time the time to set
>
> */
>
> public void setTime(long time)
>
> {
>
> this.time = time;
>
> }
>
> /**
>
> * @return the srcIp
>
> */
>
> public String getSrcIp()
>
> {
>
> return srcIp;
>
> }
>
> /**
>
> * @param srcIp the srcIp to set
>
> */
>
> public void setSrcIp(String srcIp)
>
> {
>
> this.srcIp = srcIp;
>
> }
>
> /**
>
> * @return the destIp
>
> */
>
> public String getDestIp()
>
> {
>
> return destIp;
>
> }
>
> /**
>
> * @param destIp the destIp to set
>
> */
>
> public void setDestIp(String destIp)
>
> {
>
> this.destIp = destIp;
>
> }
>
> /**
>
> * @return the value
>
> */
>
> public int getValue()
>
> {
>
> return value;
>
> }
>
> /**
>
> * @param value the value to set
>
> */
>
> public void setValue(int value)
>
> {
>
> this.value = value;
>
> }
>
>
>
> }
>
> }
>
>
>
> Data that drives the insertion:
>
> 1358,81
>
> 2359,86
>
> 3360,88
>
> 4361,80
>
> 5362,84
>
> 6363,80
>
> 7364,83
>
> 8365,99
>
> 9366,97
>
> 10367,99
>
>
>
> _______________________________________________
> rules-users mailing list
> rules-users at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/rules-users
>
>
--
Edson Tirelli
JBoss Drools Core Development
JBoss by Red Hat @ www.jboss.com
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/rules-users/attachments/20100412/a225cf0e/attachment.html
More information about the rules-users
mailing list