[rules-users] drools-fusion: rule firing erroneously for the first time in a sliding window

Badrinath, Shyam sbadrinath at sonusnet.com
Mon Apr 12 13:26:24 EDT 2010


Hi Edson

Thanks for the clarification. Later on during a test, the sliding window
implementation does wait for 5s before activating the rule. So, in
theory, there shouldn't be any difference in behavior at the beginning
either, whether the clock is in pseudo or real mode.

Let's assume t(0) is the start. Then nothing should fire until at least
t(0+5), and the same should hold for any window between t(n) and t(n+5).
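
To make the two behaviors concrete, here is a minimal plain-Java sketch of a 5s sliding-window minimum, run against the sample data from the original post. It does not use Drools at all: the class and method names are hypothetical, the timestamps are taken relative to the first sample, and the eviction rule at the window boundary is an assumption rather than the engine's documented semantics.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not part of Drools: simulates min() over a sliding time window.
final class SlidingWindowDemo {

    /**
     * Returns the timestamp (ms) of the first event at which the window minimum
     * is >= threshold, or -1 if that never happens.
     * If waitForFullWindow is true, nothing may fire before windowMs has elapsed
     * since the first event (the behavior expected in this message).
     */
    static long firstFiring(long[] times, int[] values, long windowMs,
                            int threshold, boolean waitForFullWindow) {
        Deque<Integer> window = new ArrayDeque<>(); // indices of events still in the window
        long start = times[0];
        for (int i = 0; i < times.length; i++) {
            window.addLast(i);
            // ASSUMPTION: an event leaves the window once it is windowMs old or older.
            while (times[i] - times[window.peekFirst()] >= windowMs) {
                window.removeFirst();
            }
            int min = Integer.MAX_VALUE;
            for (int idx : window) {
                min = Math.min(min, values[idx]);
            }
            boolean warmedUp = !waitForFullWindow || times[i] - start >= windowMs;
            if (warmedUp && min >= threshold) {
                return times[i];
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Sample data from the original post, timestamps shifted so the first is 0.
        long[] t = {0, 1001, 2002, 3003, 4004, 5005, 6006, 7007, 8008, 9009};
        int[] v = {81, 86, 88, 80, 84, 80, 83, 99, 97, 99};
        System.out.println(firstFiring(t, v, 5000, 80, false)); // prints 0
        System.out.println(firstFiring(t, v, 5000, 80, true));  // prints 5005
    }
}
```

Without the warm-up check, the very first event (value 81) already satisfies min >= 80, which matches the early firing I am seeing. With the check, the first possible firing moves to the first event at or after t(0+5).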

Also, I noticed that the accumulate function seems to ignore the
constraints in the <src pattern>. For example, given the rule below, if
I insert Cpu events whose srcIp and destIp do not match, I would expect
the rule not to fire, since the window should only consider the Cpu
objects in working memory that match the constraints below. Am I missing
something?

$cpuMin : Number(intValue >= 80) from accumulate(
      $cpu : Cpu($v : value, srcIp == '<val1>' && destIp == '<val2>')
            over window:time(5s), min($v)
)
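
One possible explanation, which nothing in this thread confirms: accumulate may produce a result even when no event matches the source pattern, and if min() starts from a very large sentinel value, that "empty" result would still pass intValue >= 80. The plain-Java sketch below only illustrates that arithmetic. The class, the seed value Double.MAX_VALUE and the method names are assumptions for illustration, not the Drools implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not Drools code.
final class FilteredMinDemo {

    static final class Cpu {
        final int value;
        final String srcIp;
        final String destIp;

        Cpu(int value, String srcIp, String destIp) {
            this.value = value;
            this.srcIp = srcIp;
            this.destIp = destIp;
        }
    }

    /** Minimum value over the events that match both addresses. */
    static double filteredMin(List<Cpu> events, String srcIp, String destIp) {
        // ASSUMPTION: the minimum is seeded with a large sentinel value.
        double min = Double.MAX_VALUE;
        for (Cpu c : events) {
            if (srcIp.equals(c.srcIp) && destIp.equals(c.destIp)) {
                min = Math.min(min, c.value);
            }
        }
        return min;
    }

    /** Mirrors the Number(intValue >= threshold) constraint from the rule. */
    static boolean passesThreshold(double min, int threshold) {
        // Narrowing Double.MAX_VALUE to int saturates at Integer.MAX_VALUE.
        return Double.valueOf(min).intValue() >= threshold;
    }

    public static void main(String[] args) {
        List<Cpu> events = Arrays.asList(new Cpu(81, "10.155.21.86", "10.6.35.120"));
        double noMatch = filteredMin(events, "<val1>", "<val2>");
        System.out.println(passesThreshold(noMatch, 80)); // prints true
    }
}
```

If this is what happens, the constraints are not being ignored: the filter removes every event, and the rule fires on the untouched seed value. Adding a check that at least one matching event exists would tell the two cases apart.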

 

 

________________________________

From: rules-users-bounces at lists.jboss.org
[mailto:rules-users-bounces at lists.jboss.org] On Behalf Of Edson Tirelli
Sent: Monday, April 12, 2010 12:10 PM
To: Rules Users List
Subject: Re: [rules-users] drools-fusion: rule firing erroneously for
the first time in a sliding window

 


    Hi,

    Yes, the current implementation for sliding time windows does not
wait for a window to move before starting to fire rules. As soon as the
constraint is met, it will activate the rule. The question is: should
the sliding window delay before firing? Should it be configurable?

   I need to do some research on that. Comments are welcome.

   Edson

2010/4/12 Badrinath, Shyam <sbadrinath at sonusnet.com>

Hi

I am using Drools 5.1.0 M1 within Eclipse. I am trying out a sample
rule, which fires if the minimum cpu is over 80 for 5s using sliding
windows. I see that it works well over a running window of 5s, but for
the first time, it fires even before reaching 5s.

It seems to ignore the fact that 5s hasn't elapsed yet. I am using the
engine in STREAM mode and using a pseudo clock to advance the time
manually.

 

Is this the expected behavior? Thanks!

sb

 

Here is the rule:

 

package org.drools.examples

import org.drools.examples.CpuMetric.Cpu;
import org.drools.examples.CpuMetric.Alarm;

global org.apache.log4j.Logger logger

declare Cpu
      @role(event)
      @expires(5s)
end

rule "Above Cpu threshold of 80 for 5s"
      dialect "java"
      when
            not Alarm()
            $cpuMin : Number(intValue >= 80) from accumulate(
                  $cpu : Cpu($v : value) over window:time(5s), min($v)
            )
      then
            logger.info("Cpu above 80 for 5 s, raising alarm. min cpu: " + $cpuMin);
            Alarm a = new Alarm();
            a.setReason("raised alarm as we hit cpu threshold");
            a.setTime(System.currentTimeMillis());
            insert(a);
end

 

 

 

Here is the snippet of the class that declares the Cpu and Alarm classes
and also inserts events into the rule engine. The other thing I noticed
is that the accumulate function seems to ignore the constraints in the
<src pattern>. For example, in the rule above, if I give

$cpuMin : Number(intValue >= 80) from accumulate(
      $cpu : Cpu($v : value, srcIp == '<val1>' && destIp == '<val2>')
            over window:time(5s), min($v)
)

and insert Cpu events with no matching srcIp and destIp, I shouldn't
see any alarm raised, but I do, and right at the beginning.

 

      

      . . ... (code before this..)

      // To use sliding windows, the engine has to run in STREAM mode.
      // The default is CLOUD mode, where there is no concept of time
      // or event ordering.
      KnowledgeBaseConfiguration kbaseconfig =
            KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
      kbaseconfig.setOption(EventProcessingOption.STREAM);

      // add the packages to a knowledgebase (deploy the knowledge packages).
      KnowledgeBase kbase =
            KnowledgeBaseFactory.newKnowledgeBase(kbaseconfig);
      kbase.addKnowledgePackages(pkgs);

      KnowledgeSessionConfiguration conf =
            KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
      conf.setOption(ClockTypeOption.get("pseudo"));

      StatefulKnowledgeSession ksession =
            kbase.newStatefulKnowledgeSession(conf, null);

      // get the clock to manually advance time and test firing
      SessionPseudoClock clock = ksession.getSessionClock();

      logger.info("Pseudo clock current time: " + clock.getCurrentTime());
      ksession.setGlobal("logger", logger);

      ksession.addEventListener(new DebugAgendaEventListener());
      ksession.addEventListener(new DebugWorkingMemoryEventListener());

      // setup the audit logging
      KnowledgeRuntimeLogger krlogger = KnowledgeRuntimeLoggerFactory
            .newFileLogger(ksession, "log/cpu");

      BufferedReader bf = new BufferedReader(
            new FileReader("/opt/cpumetricdata.txt"));
      String s;
      long time = 0;
      int count = 0;
      long lastime = 0;
      int delta = 0;

      //logger.info("Advancing 5s right at the start");
      //clock.advanceTime(5000, TimeUnit.MILLISECONDS);
      while ((s = bf.readLine()) != null)
      {
          String[] vals = s.split(",");
          Cpu cpumetric = new Cpu();
          cpumetric.setValue(Integer.parseInt(vals[1]));

          // time is in ms
          // for the first line, initialize both time and lastime
          // to the value read in from that line.
          time = Long.parseLong(vals[0]);
          if (count == 0)
          {
            lastime = time;
            logger.info("Initialized lastime to " + lastime);
          }

          cpumetric.setTime(time);
          cpumetric.setSrcIp("10.155.21.86");
          cpumetric.setDestIp("10.6.35.120");
          logger.info("Inserted cpu metric " + cpumetric);
          logger.info("Count: " + count + " Pseudo clock current time: "
                + clock.getCurrentTime());
          ksession.insert(cpumetric);

          ksession.fireAllRules();

          // advance the clock based on the read-in time in ms;
          // do it only from the second insert onwards
          delta = (int) (time - lastime);
          if (count >= 1)
          {
            clock.advanceTime(delta, TimeUnit.MILLISECONDS);
            logger.info("Pseudo clock advanced " + delta + "ms");
          }
          count++;
          lastime = time;
      }

      System.out.println("Inserted facts, current time is " + new Date());
      krlogger.close();
      ksession.dispose();
      bf.close();

    }

 

    

    public static class Alarm
    {
      private String reason;
      private long time;
      private String type;

      /**
       * @return the reason
       */
      public String getReason()
      {
          return reason;
      }

      /**
       * @param reason the reason to set
       */
      public void setReason(String reason)
      {
          this.reason = reason;
      }

      /**
       * @return the time
       */
      public long getTime()
      {
          return time;
      }

      /**
       * @return the type
       */
      public String getType()
      {
          return type;
      }

      /**
       * @param type the type to set
       */
      public void setType(String type)
      {
          this.type = type;
      }

      /**
       * @param time the time to set
       */
      public void setTime(long time)
      {
          this.time = time;
      }
    }

    public static class Cpu implements Serializable
    {
      private long time;
      private int value;
      private String srcIp;
      private String destIp;

      /* (non-Javadoc)
       * @see java.lang.Object#toString()
       */
      @Override
      public String toString()
      {
          return "Cpu [time=" + time + ", value=" + value + "]";
      }

      /**
       * @return the time
       */
      public long getTime()
      {
          return time;
      }

      /**
       * @param time the time to set
       */
      public void setTime(long time)
      {
          this.time = time;
      }

      /**
       * @return the srcIp
       */
      public String getSrcIp()
      {
          return srcIp;
      }

      /**
       * @param srcIp the srcIp to set
       */
      public void setSrcIp(String srcIp)
      {
          this.srcIp = srcIp;
      }

      /**
       * @return the destIp
       */
      public String getDestIp()
      {
          return destIp;
      }

      /**
       * @param destIp the destIp to set
       */
      public void setDestIp(String destIp)
      {
          this.destIp = destIp;
      }

      /**
       * @return the value
       */
      public int getValue()
      {
          return value;
      }

      /**
       * @param value the value to set
       */
      public void setValue(int value)
      {
          this.value = value;
      }
    }

}

 

Data that drives the insertion:

1358,81
2359,86
3360,88
4361,80
5362,84
6363,80
7364,83
8365,99
9366,97
10367,99
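
As a cross-check on the test harness, the sketch below replays only the clock arithmetic of the insertion loop above against these timestamps, to show what the pseudo clock reads at the moment each event is inserted. It is plain Java with hypothetical names and assumes the pseudo clock starts at 0, which the post does not state explicitly.

```java
import java.util.Arrays;

// Hypothetical trace helper: mirrors only the clock arithmetic of the insertion loop.
final class ClockTraceDemo {

    /** Returns the pseudo-clock reading (ms) at the moment each sample is inserted. */
    static long[] insertionClockTimes(long[] sampleTimes) {
        long[] clockAtInsert = new long[sampleTimes.length];
        long clock = 0; // ASSUMPTION: the pseudo clock starts at 0
        long lastime = 0;
        for (int count = 0; count < sampleTimes.length; count++) {
            long time = sampleTimes[count];
            if (count == 0) {
                lastime = time;
            }
            clockAtInsert[count] = clock; // the insert happens before the clock is advanced
            long delta = time - lastime;
            if (count >= 1) {
                clock += delta;
            }
            lastime = time;
        }
        return clockAtInsert;
    }

    public static void main(String[] args) {
        long[] samples = {1358, 2359, 3360, 4361, 5362, 6363, 7364, 8365, 9366, 10367};
        System.out.println(Arrays.toString(insertionClockTimes(samples)));
        // prints [0, 0, 1001, 2002, 3003, 4004, 5005, 6006, 7007, 8008]
    }
}
```

Because the clock is advanced after each insert, the first two events are both inserted at clock time 0 and every later event lags its file timestamp by one step. Either way, the first event (value 81) enters the window at time 0, well before 5s have elapsed.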

 


_______________________________________________
rules-users mailing list
rules-users at lists.jboss.org
https://lists.jboss.org/mailman/listinfo/rules-users




-- 
 Edson Tirelli
 JBoss Drools Core Development
 JBoss by Red Hat @ www.jboss.com
