On first reading, both observations could be bugs. But it is difficult to
reproduce the effect.
On 05/06/2014, Sushant Goyal <sushantgoyal25@yahoo.co.in> wrote:
> Thanks again for your response Wolfgang.
>
> I found one of the explanations on how Sliding Windows operate in Drools
> while searching for an answer:
> "Events are expired from the time window when the clock advances"
>
> So, in my case, if I insert events at T0 (1st event), T1 (2nd event) and
> T2 (3rd event), and then advance the clock to T3, T4 and T5, the 1st event
> inserted at T0 is expired from the window. The rule then calculates the
> average of the last two events (2nd and 3rd) that fall within the time
> window. The average calculated by the accumulate function comes to
> (26 + 28) / 2 == 27, satisfying the condition that the average temperature
> be above the threshold value of 25.
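
To make the arithmetic concrete, here is a small plain-Java model of the two
runs. It is only a sketch and does not use Drools: the class WindowAverage
and its average() method are made up for illustration, and it assumes the
window excludes an event whose age is exactly the window length, which is
consistent with the averages reported in this thread.

```java
import java.util.OptionalDouble;
import java.util.stream.IntStream;

/** Toy model (not Drools): average of the readings still inside the window. */
final class WindowAverage {

    // Readings from the test: inserted at minutes 0, 1 and 2.
    static final double[] TEMPS = {24.0, 26.0, 28.0};
    static final long[] INSERT_MINUTE = {0, 1, 2};

    /** Averages readings whose age is in [0, windowMinutes) at nowMinute. */
    static OptionalDouble average(long nowMinute, long windowMinutes) {
        return IntStream.range(0, TEMPS.length)
                .filter(i -> nowMinute - INSERT_MINUTE[i] >= 0
                        && nowMinute - INSERT_MINUTE[i] < windowMinutes)
                .mapToDouble(i -> TEMPS[i])
                .average();
    }

    public static void main(String[] args) {
        // Clock at minute 4: all three readings count, (24 + 26 + 28) / 3
        System.out.println(average(4, 5).getAsDouble());
        // Clock at minute 5: the first reading is exactly 5 minutes old and drops out
        System.out.println(average(5, 5).getAsDouble());
    }
}
```

With the clock at minute 4 the model averages all three readings; at minute 5
only the second and third remain.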
>
> Now let's say I do the below (advancing the clock by 5
> minutes after the 3rd insert):
>
> T0 (insert 1st event)
> T1 (insert 2nd event)
> T2 (insert 3rd event)
> T3
> T4
> T5
> T6
> T7
> Fire all rules
>
> The average calculated by the rule is zero (0), since the three inserted
> events fall outside the time window of 5 minutes (as expected). But my rule
> fires even though it requires the average temperature to be greater than
> the threshold:
>
> $averageTemp : Number(intValue > 25)
>     from accumulate(SensorReading($temp : temperature)
>         over window:time(5m) from entry-point "Temperature Reading",
>         average($temp))
>
> As per the drools-fusion documentation, over window:time(X) is used to
> consider events that happened in the last X units. So, is there something
> that needs to be changed in my rule implementation?
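
For comparison, this is how I would expect a threshold test to treat an empty
window. Again this is a plain-Java sketch under my own assumptions, not a
description of what Drools does internally: EmptyWindowGuard and breaches()
are hypothetical names, and the "no readings means no alarm" policy is the
assumption being illustrated.

```java
import java.util.OptionalDouble;
import java.util.stream.DoubleStream;

/** Toy model (not Drools): an empty window yields no average at all. */
final class EmptyWindowGuard {

    /** True only when there is an average and it exceeds the threshold. */
    static boolean breaches(double[] readingsInWindow, double threshold) {
        OptionalDouble avg = DoubleStream.of(readingsInWindow).average();
        return avg.isPresent() && avg.getAsDouble() > threshold;
    }

    public static void main(String[] args) {
        // Two readings left in the window: average 27 is above 25
        System.out.println(breaches(new double[] {26.0, 28.0}, 25.0));
        // All readings expired: no average, so no alarm
        System.out.println(breaches(new double[] {}, 25.0));
    }
}
```

Under this model a rule firing with an average of 0 against a threshold of 25
would be unexpected either way, since 0 is not greater than 25.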
>
> One more question: does over window:time(X) really consider only events
> that occurred in the last X units? I tried going further by doing the below:
>
> T0 (insert 1st event - Sensor reading is 24)
> T1 (insert 2nd event - Sensor reading is 26)
> T2 (insert 3rd event - Sensor reading is 28)
> T3
> T4
> T5
> T6
> T7
> (insert event - Sensor reading is 30)
> Fire All rules
>
> With the above test setup, my rule gets invoked twice, printing the average
> of temperatures as (30) and (30). But shouldn't using over window:time(5m)
> result in the rule being triggered only once?
>
> Thanks in advance!
>
>
>
> On Thursday, 5 June 2014 12:31 PM, Wolfgang Laun <wolfgang.laun@gmail.com>
> wrote:
>
>
>
> Computations involving intervals where events arrive in real time
> should not depend on differences of one unit of the smallest unit of
> time for specifying durations (here: milliseconds).
>
> -W
>
>
>
>
> On 04/06/2014, Sushant Goyal <sushantgoyal25@yahoo.co.in> wrote:
>> Thanks for your response.
>> I was under the assumption that the window time frame begins the moment
>> the first event is inserted into the working memory. Or am I still missing
>> something here (when you say that the first event is exactly at the
>> beginning of the window)? If I traverse backwards as below, then the 1st
>> event definitely lies outside the window.
>>
>> insert 1st event
>> delay 1 minute ^ (4 + 1 = 5)
>> insert 2nd event
>> delay 1 minute ^ (3 + 1 = 4)
>> insert 3rd event
>> delay 1 minute ^ (2 + 1 = 3)
>> delay 2 minutes ^
>> fire all rules
>>
>> However, if I start from the 1st event, the 1st event could only lie
>> outside the 5-minute time frame if a certain amount of time is consumed
>> while inserting it. I tried to check programmatically whether any time
>> elapses during the insertion by printing the clock's current time using
>> the clock.getCurrentTime() method before and after inserting the 1st
>> event; however, I could not notice any difference.
>>
>> But if I change the time to be advanced to 1 minute and 59 seconds
>> (instead of 2 minutes) towards the end, after the three events have been
>> inserted, the rule seems to work fine, considering all three events when
>> calculating the average.
>>
>> I know I am definitely missing something here, as I can see the
>> difference in the behavior of the rule by merely reducing the total delay
>> by 1 second (4 min 59 sec instead of 5 minutes) as stated above. But I am
>> not able to figure it out.
>>
>> Could you please provide more insight?
>>
>> Thanks!
>>
>>
>> On Wednesday, 4 June 2014 9:01 PM, Wolfgang Laun <wolfgang.laun@gmail.com>
>> wrote:
>>
>>
>>
>> Your code does:
>> insert 1st event
>> delay 1 minute
>> insert 2nd event
>> delay 1 minute
>> insert 3rd event
>> delay 1 minute
>> delay (1 or) 2 minutes
>> fire all rules
>>
>> In the second case, 5 minutes have elapsed since the 1st insert. I
>> don't think that this means "that [the 1st insert] falls well within
>> the specified time range" - it is exactly at the beginning of the
>> window, and I'd expect the window to be an interval open at one end -
>> otherwise events smack on the point in time "separating" two intervals
>> would be in both windows.
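
The boundary case can be spelled out with a small membership check. This is a
sketch of the "open at one end" reading only, written in plain Java; the
HalfOpenWindow class and its contains() method are hypothetical names, and
which end of the window is open is an assumption chosen to match the results
reported in this thread.

```java
import java.util.concurrent.TimeUnit;

/** Toy model (not Drools): a sliding window that is open at its old end. */
final class HalfOpenWindow {

    /** True when an event stamped eventMillis is inside the window at nowMillis. */
    static boolean contains(long eventMillis, long nowMillis, long lengthMillis) {
        long age = nowMillis - eventMillis;
        return age >= 0 && age < lengthMillis;
    }

    public static void main(String[] args) {
        long fiveMinutes = TimeUnit.MINUTES.toMillis(5);
        long firstInsert = 0L;

        // 4 min 59 s after the first insert: the event is still inside
        System.out.println(contains(firstInsert, fiveMinutes - 1000, fiveMinutes));
        // Exactly 5 min after the first insert: the event sits on the boundary
        System.out.println(contains(firstInsert, fiveMinutes, fiveMinutes));
    }
}
```

With an interval like this, an event exactly one window length old belongs to
no window that starts later, so it cannot be counted twice.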
>>
>> -W
>>
>>
>> On 04/06/2014, Sushant Goyal <sushantgoyal25@yahoo.co.in> wrote:
>>> Hi,
>>>
>>> I am trying to understand how Drools can be used to monitor events over a
>>> period of time using Sliding Time Windows. I have created a rule to sound
>>> an alarm when the average temperature read from a sensor is above 25
>>> degrees (threshold) over a time period of 5 minutes. The rule makes use of
>>> the Stream processing mode so that a continuous stream of events can be
>>> processed.
>>>
>>> Below is what my rule looks like:
>>>
>>> // declare any global variables here
>>> global java.lang.StringBuilder alertMessage
>>>
>>> // add declaration to change the Fact into an Event
>>> declare SensorReading
>>>     @role(event)
>>> end
>>>
>>> /* Alert when average temperature is above 25
>>>    over a time period of 5 minutes */
>>> rule "TemperatureAlarm1"
>>>
>>> when
>>>     // conditions
>>>     $averageTemp : Number(doubleValue > 25.00)
>>>         from accumulate(SensorReading($temp : temperature)
>>>             over window:time(5m) from entry-point "Temperature Reading",
>>>             average($temp))
>>> then
>>>     // actions
>>>     System.out.println("Fired rule: " + kcontext.getRule().getName());
>>>     alertMessage.append("Threshold temperature breached!!" +
>>>         "\nTurn on the Air Conditioner" +
>>>         "\nAverage temperature over 5 minutes is above 25 (" +
>>>         $averageTemp.intValue() + ")\n");
>>> end
>>>
>>> And below is a snapshot of the fact (SensorReading) which is inserted as
>>> an event in the working memory:
>>>
>>> public class SensorReading {
>>>
>>>     private double temperature;
>>>
>>>     public SensorReading() {}
>>>
>>>     public SensorReading(double temp) {
>>>         this.temperature = temp;
>>>     }
>>>
>>>     // getters and setters
>>> }
>>>
>>>
>>> In order to test the rule, I am using the Pseudo Clock with Stream
>>> processing turned on. I am inserting three SensorReading objects into the
>>> working memory with temperature values (24, 26, 28), one every minute, so
>>> that the average of the temperatures is above the threshold and the rule
>>> is invoked. After the objects are inserted into the working memory, I am
>>> deliberately advancing the Pseudo Clock by another 1 minute, so that the
>>> total time elapsed is 4 minutes. The rule works as expected with the above
>>> test setup and prints the average value as 26 on the console.
>>>
>>> However, if I advance the clock by 2 minutes instead of 1 minute after the
>>> three sensor reading objects have been inserted into the working memory
>>> (at 1-minute intervals), the rule gets invoked but the average value
>>> changes to 27 ((26 + 28) / 2 == 27). It looks like the first temperature
>>> reading is being ignored by the rule despite the fact that it falls well
>>> within the specified time range of 5 minutes. Below is a snapshot of my
>>> test class:
>>>
>>> public class TemperatureAlarmTest {
>>>
>>>     static KnowledgeBase kbase;
>>>     static StatefulKnowledgeSession ksession;
>>>     static KnowledgeRuntimeLogger logger;
>>>     static SessionPseudoClock clock;
>>>
>>>     @BeforeClass
>>>     public static void setupKsession() {
>>>         try {
>>>             // load up the knowledge base
>>>             kbase = readKnowledgeBase();
>>>             ksession = readKnowldedeSession(kbase);
>>>             clock = ksession.getSessionClock();
>>>
>>>             logger = KnowledgeRuntimeLoggerFactory.newThreadedFileLogger(
>>>                     ksession, "log/Errors", 500);
>>>
>>>         } catch (Throwable t) {
>>>             t.printStackTrace();
>>>         }
>>>     }
>>>
>>>     /**
>>>      * Create a new Stateful knowledge Session with a pseudo clock from the
>>>      * knowledge base
>>>      *
>>>      * @param kbase
>>>      * @return
>>>      * @throws Exception
>>>      */
>>>     private static StatefulKnowledgeSession readKnowldedeSession(
>>>             KnowledgeBase kbase) throws Exception {
>>>
>>>         // Knowledge Session Configuration
>>>         KnowledgeSessionConfiguration config =
>>>                 KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
>>>         config.setOption(ClockTypeOption.get("pseudo"));
>>>         return kbase.newStatefulKnowledgeSession(config, null);
>>>
>>>     }
>>>
>>>     @AfterClass
>>>     public static void closeKsession() {
>>>         try {
>>>             // close the logger and dispose of the session
>>>             logger.close();
>>>             ksession.dispose();
>>>
>>>         } catch (Throwable t) {
>>>             t.printStackTrace();
>>>         }
>>>     }
>>>
>>>     @Test
>>>     public void TemperatureAlarm1_Test() {
>>>
>>>         // Create Temperature list
>>>         ArrayList<SensorReading> tempMetrics = new ArrayList<SensorReading>();
>>>         double temp = 24.00;
>>>
>>>         while (tempMetrics.size() < 3) {
>>>             tempMetrics.add(new SensorReading(temp));
>>>             temp += 2;
>>>         }
>>>         System.out.println("Size of tempMetrics List: " + tempMetrics.size() + "\n");
>>>         System.out.println("First Temp reading: " + tempMetrics.get(0).getTemperature());
>>>         System.out.println("Second Temp reading: " + tempMetrics.get(1).getTemperature());
>>>         System.out.println("Third Temp reading: " + tempMetrics.get(2).getTemperature() + "\n");
>>>
>>>         // Separate stream for inserts
>>>         WorkingMemoryEntryPoint temperatureStream =
>>>                 ksession.getWorkingMemoryEntryPoint("Temperature Reading");
>>>
>>>         // Create fact handle list
>>>         ArrayList<FactHandle> factHandleList = new ArrayList<FactHandle>();
>>>
>>>         // Insert objects into working memory while advancing the clock
>>>         for (int i = 0; i < tempMetrics.size(); i++) {
>>>             factHandleList.add(temperatureStream.insert(tempMetrics.get(i)));
>>>             clock.advanceTime(1, TimeUnit.MINUTES);
>>>             System.out.println("Time advances by 1 minute");
>>>         }
>>>         System.out.println("Fact Count is: " + temperatureStream.getFactCount());
>>>         System.out.println("Fact Entry Point is: " + temperatureStream.getEntryPointId());
>>>         System.out.println("Size of FactHandleList: " + factHandleList.size() + "\n");
>>>
>>>         // change in advanced time alters the rule behavior
>>>         clock.advanceTime(1, TimeUnit.MINUTES);
>>>
>>>         StringBuilder stringBuilder = new StringBuilder();
>>>         ksession.setGlobal("alertMessage", stringBuilder);
>>>         ksession.fireAllRules();
>>>
>>>         // Remove facts
>>>         for (int i = 0; i < factHandleList.size(); i++) {
>>>             temperatureStream.retract(factHandleList.get(i));
>>>         }
>>>         System.out.println("After Removing facts");
>>>         System.out.println("Fact Count is: " + temperatureStream.getFactCount());
>>>
>>>         String result = stringBuilder.substring(0, 32);
>>>         System.out.println("Alert Message is: \n" + stringBuilder.toString());
>>>         assertEquals("Alert Message is: ", "Threshold temperature breached!!", result);
>>>     }
>>>
>>>     /**
>>>      * Create the knowledge base with stream processing turned on.
>>>      *
>>>      * @return
>>>      * @throws Exception
>>>      */
>>>     private static KnowledgeBase readKnowledgeBase() throws Exception {
>>>         KnowledgeBuilder kbuilder =
>>>                 KnowledgeBuilderFactory.newKnowledgeBuilder();
>>>         kbuilder.add(ResourceFactory.newClassPathResource("TemperatureAlarm1.drl"),
>>>                 ResourceType.DRL);
>>>         hasErrors(kbuilder);
>>>
>>>         // Stream processing turned on
>>>         KnowledgeBaseConfiguration conf =
>>>                 KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
>>>         conf.setOption(EventProcessingOption.STREAM);
>>>         KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase(conf);
>>>         hasErrors(kbuilder);
>>>
>>>         kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
>>>
>>>         return kbase;
>>>     }
>>>
>>>     /**
>>>      * Report errors if any
>>>      *
>>>      * @param kbuilder
>>>      * @throws Exception
>>>      */
>>>     private static void hasErrors(KnowledgeBuilder kbuilder)
>>>             throws Exception {
>>>         KnowledgeBuilderErrors errors = kbuilder.getErrors();
>>>         if (errors.size() > 0) {
>>>             for (KnowledgeBuilderError error : errors) {
>>>                 System.err.println(error);
>>>             }
>>>             throw new IllegalArgumentException("Could not parse knowledge.");
>>>         }
>>>
>>>     }
>>>
>>> }
>>>
>>>
>>> Could anyone please help explain this change in the behavior of the rule?
>>>
>>> Regards,
>>> Sushant