Hi

I am using Drools 5.1.0 M1 within Eclipse. I am trying out a sample rule that should fire when the minimum cpu value has stayed at or above 80 for 5s, using a sliding time window. Once 5s of events have accumulated, it works well over the running window, but the very first time it fires before 5s have elapsed.

It seems to ignore the fact that 5s hasn’t elapsed yet. I am using the engine in STREAM mode and using a pseudo clock to advance the time manually.

 

Is this the expected behavior? Thanks!

sb

 

Here is the rule:

 

package org.drools.examples

 

import org.drools.examples.CpuMetric.Cpu;

import org.drools.examples.CpuMetric.Alarm;

 

global org.apache.log4j.Logger logger

 

declare Cpu

      @role(event)

      @expires(5s)

end

 

rule "Above Cpu threshold of 80 for 5s"

      dialect "java"

      when

            not Alarm()

            $cpuMin : Number(intValue >= 80) from accumulate(

            $cpu : Cpu($v : value) over window:time(5s), min($v)

            )

      then

            logger.info("Cpu above 80 for 5 s, raising alarm. min cpu: "+$cpuMin);

            Alarm a = new Alarm();

            a.setReason("raised alarm as we hit cpu threshold");

            a.setTime(System.currentTimeMillis());

            insert(a);       

end

 

 

 

Here is the snippet of the class that declares the Cpu and Alarm classes and also inserts events into the rule engine. The other thing I noticed is that accumulate seems to ignore the field constraints in the <source pattern>. For example, if I change the rule above to

$cpuMin : Number(intValue >= 80) from accumulate(

$cpu : Cpu($v : value, srcIp == "<val1>" && destIp == "<val2>") over window:time(5s), min($v)

)

and then insert cpu events whose srcIp and destIp do not match, I would expect no alarm to be raised at all. Instead I do see one, and right at the beginning.
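One unverified guess at what might be happening here: if the min function starts from a very large initial value and hands that value back unchanged when no event matched, then the `intValue >= 80` check would pass on an empty set. The plain-Java snippet below only demonstrates that arithmetic. `EmptyMinSketch` and `minOrSentinel` are made-up names for illustration, not Drools APIs, and the assumption about the initial value is mine.

```java
class EmptyMinSketch {

    // Hypothetical helper: minimum of the given values, starting from
    // Double.MAX_VALUE (assumed sentinel, not confirmed against Drools).
    static Number minOrSentinel(int... values) {
        double min = Double.MAX_VALUE;
        for (int v : values) {
            min = Math.min(min, v);
        }
        return Double.valueOf(min);
    }

    public static void main(String[] args) {
        // No matching events: the sentinel is returned as-is.
        Number empty = minOrSentinel();
        // Narrowing a huge double to int saturates at Integer.MAX_VALUE,
        // so the threshold check passes even though nothing matched.
        System.out.println(empty.intValue() >= 80);

        // With real values the result is the ordinary minimum.
        System.out.println(minOrSentinel(70, 95).intValue());
    }
}
```

If that is what happens, requiring at least one matching event in the rule (for example with an additional count check) would be one way to rule it out.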

 

     

      . . ... (code before this..)

 

      //to use sliding windows, the engine has to run in stream mode

      //the default is cloud mode, which has no concept of time or

      //event ordering

      KnowledgeBaseConfiguration kbaseconfig = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();

      kbaseconfig.setOption(EventProcessingOption.STREAM);

     

      // add the packages to a knowledgebase (deploy the knowledge packages).

      KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(kbaseconfig);

     

      kbase.addKnowledgePackages(pkgs);

 

      KnowledgeSessionConfiguration conf = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();

      conf.setOption(ClockTypeOption.get("pseudo"));

     

      StatefulKnowledgeSession ksession = kbase.newStatefulKnowledgeSession(conf, null);

     

      //get clock to manually advance and test firing..

      SessionPseudoClock clock = ksession.getSessionClock();

     

      logger.info("Pseudo clock current time: "+clock.getCurrentTime());

      ksession.setGlobal("logger", logger);

     

     

      ksession.addEventListener(new DebugAgendaEventListener());

      ksession.addEventListener(new DebugWorkingMemoryEventListener());

 

      // setup the audit logging

      KnowledgeRuntimeLogger krlogger = KnowledgeRuntimeLoggerFactory

            .newFileLogger(ksession, "log/cpu");

 

      BufferedReader bf = new BufferedReader(new FileReader("/opt/cpumetricdata.txt"));

      String s;

      long time=0;

      int count=0;

      long lastime=0;

      int delta=0;

     

      //logger.info("Advancing 5s right at the start");

      //clock.advanceTime(5000, TimeUnit.MILLISECONDS);

      while((s = bf.readLine()) != null)

      {

         

          String[] vals = s.split(",");

          Cpu cpumetric = new Cpu();

          cpumetric.setValue(Integer.parseInt(vals[1]));

          //set in ms

          //for the first time, initialize time and lastime

          //to the value read in from the first line.

          time = Long.parseLong(vals[0]);

          if(count ==0)

          {

            lastime = time;

            logger.info("Initialized lastime to "+lastime);

          }

         

          cpumetric.setTime(time);

          cpumetric.setSrcIp("10.155.21.86");

          cpumetric.setDestIp("10.6.35.120");

          logger.info("Inserted cpu metric "+cpumetric);

          logger.info("Count: "+count+" Pseudo clock current time: "+clock.getCurrentTime());

          ksession.insert(cpumetric);

         

          ksession.fireAllRules();

          //advance based on the read in time in ms

          //do it only from the second insert onwards

          delta=(int) (time-lastime);

          if(count >=1)

          {

            clock.advanceTime(delta, TimeUnit.MILLISECONDS);

            logger.info("Pseudo clock advanced "+delta+ "ms");

          }

          count++;

          lastime=time;

      }

     

      System.out.println("Inserted facts, current time is "+new Date());

      krlogger.close();

      ksession.dispose();

      bf.close();

     

    }

 

   

    public static class Alarm

    {

      private String reason;

      private long time;

      private String type;

      /**

       * @return the reason

       */

      public String getReason()

      {

          return reason;

      }

      /**

       * @param reason the reason to set

       */

      public void setReason(String reason)

      {

          this.reason = reason;

      }

      /**

       * @return the time

       */

      public long getTime()

      {

          return time;

      }

      /**

       * @return the type

       */

      public String getType()

      {

          return type;

      }

      /**

       * @param type the type to set

       */

      public void setType(String type)

      {

          this.type = type;

      }

      /**

       * @param time the time to set

       */

      public void setTime(long time)

      {

          this.time = time;

      }

     

     

       

    }

    public static class Cpu implements Serializable

    {

 

      private long time;

      private int value;

      private String srcIp;

      private String destIp;

     

      /* (non-Javadoc)

       * @see java.lang.Object#toString()

       */

      @Override

      public String toString()

      {

          return "Cpu [time=" + time + ", value=" + value + "]";

      }

      /**

       * @return the time

       */

      public long getTime()

      {

          return time;

      }

      /**

       * @param time the time to set

       */

      public void setTime(long time)

      {

          this.time = time;

      }

      /**

       * @return the srcIp

       */

      public String getSrcIp()

      {

          return srcIp;

      }

      /**

       * @param srcIp the srcIp to set

       */

      public void setSrcIp(String srcIp)

      {

          this.srcIp = srcIp;

      }

      /**

       * @return the destIp

       */

      public String getDestIp()

      {

          return destIp;

      }

      /**

       * @param destIp the destIp to set

       */

      public void setDestIp(String destIp)

      {

          this.destIp = destIp;

      }

      /**

       * @return the value

       */

      public int getValue()

      {

          return value;

      }

      /**

       * @param value the value to set

       */

      public void setValue(int value)

      {

          this.value = value;

      }

     

    }

}

 

Data that drives the insertion (each line is: time in ms, cpu value):

1358,81

2359,86

3360,88

4361,80

5362,84

6363,80

7364,83

8365,99

9366,97

10367,99
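
To make the timing concrete, here is a plain-Java sketch (no Drools involved) that replays the data above through a 5s time window and records the minimum right after each insert. It only models how I understand a sliding time window, namely "everything from the last 5s, however little that is". The class and method names are made up for illustration and nothing here is taken from the Drools source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class WindowMinSketch {

    // Replays {timestampMs, value} samples in order and returns the window
    // minimum seen right after each insert. Assumes windowMs > 0.
    static List<Integer> slidingMin(long[][] samples, long windowMs) {
        Deque<long[]> window = new ArrayDeque<>();
        List<Integer> mins = new ArrayList<>();
        for (long[] sample : samples) {
            window.addLast(sample);
            // Evict events that are windowMs or more behind the newest one.
            while (sample[0] - window.peekFirst()[0] >= windowMs) {
                window.removeFirst();
            }
            int min = Integer.MAX_VALUE;
            for (long[] e : window) {
                min = Math.min(min, (int) e[1]);
            }
            mins.add(min);
        }
        return mins;
    }

    public static void main(String[] args) {
        long[][] data = {
            {1358, 81}, {2359, 86}, {3360, 88}, {4361, 80}, {5362, 84},
            {6363, 80}, {7364, 83}, {8365, 99}, {9366, 97}, {10367, 99}
        };
        System.out.println(slidingMin(data, 5000));
    }
}
```

Under this model the first entry is already 81, computed from a single event, so a `>= 80` check passes on the very first insert even though no 5s have gone by. That matches what I am seeing, which is why I am asking whether it is intended.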