<html><head></head><body><div style="font-family: Verdana;font-size: 12.0px;"><div>
<div>Hello everyone,</div>

<div>&nbsp;</div>

<div>i&#39;m having some design problems with Drools Fusion and CEP. Maybe you can guide me into the right direction.<br/>
(Drools 5.6)</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>Problem:<br/>
I want to use Fusion CEP to handle a quite large amount of events (25-50 thousand per minute as a start). The events don&#39;t have to live very long so I plan to expire them very soon.<br/>
My rule is pretty simple:<br/>
There is only one event type (which has a string value field in it).<br/>
The rule should fire when there are X events with the same value Y in time window Z.<br/>
The effect of the rule should be something like an Alarm which will be extracted from outside the rule engine.</div>

<div>&nbsp;</div>

<div><br/>
I found some similar examples (and also studies the documentation) but the race condition (see down below) destroys all my plans.</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>ATM it looks like this:<br/>
declare MyEvent<br/>
&nbsp;&nbsp;&nbsp; @role(event)<br/>
&nbsp;&nbsp;&nbsp; @expires( 30s )<br/>
end</div>

<div>&nbsp;</div>

<div>declare Alarm<br/>
&nbsp;&nbsp;&nbsp; @role(event)<br/>
end</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>rule &quot;RULE 1&quot;<br/>
when<br/>
&nbsp; &#36;e : MyEvent( ) from entry-point entryone<br/>
//&nbsp; not Alarm(eventValue == &#36;e.getEventValue())<br/>
&nbsp; Number( &#36;count : intValue, &#36;count &gt; 30) from accumulate( &#36;i : (MyEvent ( eventValue == &#36;e.getEventValue() ) over window:time(15s) from entry-point entryone), count( &#36;i )&nbsp;&nbsp; )<br/>
then<br/>
//&nbsp; insert( new Alarm(&#36;e.getEventValue(), &#36;count));<br/>
&nbsp; System.out.println(&quot;rule 1. value=&quot;+&#36;e.getEventValue() +&quot; count=&quot; + &#36;count);<br/>
end</div>

<div>&nbsp;</div>

<div>-&gt; When there are more than 30 &quot;same&quot; events (same value) in the last 15 seconds.</div>

<div>MyEvent and Alarm are simple POJOs. Only fields, getter, setter, ... (value is a String, count is a Integer)</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>The rule like this will fire for EVERY event -&gt; when there are 30 similar events then the log message is printed 30 times.<br/>
I&#39;m unhappy with this situation because I actually only need the RHS triggered only once for each value. Secondly I thought that the performance would be better when firing only once.</div>

<div>So I introduced the Alarm object which is created after the rule triggers the first time for the given value. (the two commented lines)<br/>
When the rule triggers the first time for value X, I create an Alarm object for this value. Then the &quot;not Alarm(...)&quot; should prevent the execution.</div>

<div>This works for the first few thousand events. Then I receive a race condition exception. Occurs usually between event 2000 - 10000.</div>

<div>Exception in thread &quot;main&quot; org.drools.RuntimeDroolsException: Unexpected exception executing action org.drools.reteoo.PropagationQueuingNode&#36;PropagateAction@61f0b700<br/>
&nbsp;&nbsp; &nbsp;at org.drools.common.AbstractWorkingMemory.executeQueuedActions(AbstractWorkingMemory.java:1047)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.WindowNode.assertObject(WindowNode.java:182)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.CompositeObjectSinkAdapter.doPropagateAssertObject(CompositeObjectSinkAdapter.java:497)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.CompositeObjectSinkAdapter.propagateAssertObject(CompositeObjectSinkAdapter.java:382)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.ObjectTypeNode.assertObject(ObjectTypeNode.java:302)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.EntryPointNode.assertObject(EntryPointNode.java:254)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:366)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:327)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:130)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:58)<br/>
&nbsp;&nbsp; &nbsp;at ........myClass.main(...)<br/>
&nbsp;&nbsp; &nbsp;at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)<br/>
&nbsp;&nbsp; &nbsp;at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)<br/>
&nbsp;&nbsp; &nbsp;at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)<br/>
&nbsp;&nbsp; &nbsp;at java.lang.reflect.Method.invoke(Method.java:597)<br/>
&nbsp;&nbsp; &nbsp;at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)<br/>
Caused by: java.lang.NullPointerException<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.ReteooWorkingMemory&#36;EvaluateResultConstraints.execute(ReteooWorkingMemory.java:593)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.common.PropagationContextImpl.evaluateActionQueue(PropagationContextImpl.java:364)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.PropagationQueuingNode&#36;AssertAction.execute(PropagationQueuingNode.java:449)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.PropagationQueuingNode.propagateActions(PropagationQueuingNode.java:289)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.reteoo.PropagationQueuingNode&#36;PropagateAction.execute(PropagationQueuingNode.java:611)<br/>
&nbsp;&nbsp; &nbsp;at org.drools.common.AbstractWorkingMemory.executeQueuedActions(AbstractWorkingMemory.java:1045)<br/>
&nbsp;&nbsp; &nbsp;... 16 more</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>I&#39;m adding a few thousand events into the knowledge session. Then let the thread sleep for a few seconds for drools to catch up. I tried this with various combinations and always received the exception.<br/>
It works when trying for a few hundred events. (rule triggers as planed)</div>

<div>&nbsp;</div>

<div>&nbsp;</div>

<div>for the sake of completeness:<br/>
My Java class that starts the knowledge session and inserts the events:<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; KnowledgeBaseConfiguration config = KnowledgeBaseFactory.newKnowledgeBaseConfiguration();<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; config.setOption(EventProcessingOption.STREAM);<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; kbase = KnowledgeBaseFactory.newKnowledgeBase(config);<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; kbase.addKnowledgePackages(pkgs);</div>

<div>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; KnowledgeSessionConfiguration conf = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ksession = kbase.newStatefulKnowledgeSession(conf, null);<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; entryPoint1 = ksession.getWorkingMemoryEntryPoint(&quot;entryone&quot;);<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; new Thread() {<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; @Override<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; public void run() {<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ksession.fireUntilHalt();<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }.start();</div>

<div><br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;...<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;reader = new BufferedReader(new FileReader(&quot;50k.log&quot;));<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;String line = reader.readLine();<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;long i = 0;<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;while (line != null) {<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;MyEvent entry = parser.parse(line.trim());<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;if ( entry != null ) {<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;entryPoint1.insert(entry);<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;}<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;line = reader.readLine();</div>

<div>&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;if ( ++i % 5000 == 0 ) {<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;System.out.println(&quot;Inserter: &quot; + i + &quot; lines inserted. - &quot; + sdf.format(new Date()));<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;Thread.sleep(5000);<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;}<br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;}</div>

<div><br/>
&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;</div>

<div>My main question:&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;<br/>
Apart from the race condition problem, are there any other ways I could solve my problem?</div>

<div>&nbsp;</div>

<div>E.g.<br/>
* Do you think that I shouldn&#39;t care about &quot;not Alarm(eventValue == &#36;e.getEventValue())&quot; and just live with the result that the program, that uses the knowledge base, has to deal with lots of identical Alarm events? (And has to perform the accumulate more often)<br/>
* Is there an easier way to count the events?<br/>
* Is Drools overburdened with the number of events?<br/>
* Is Drools Fusion/CEP not designed for my problem and other tools are more useful instead? Which ones?</div>

<div>&nbsp;</div>

<div>I would appreciate any ideas.</div>

<div>&nbsp;</div>

<div>Sincerely,<br/>
Martin</div>

<div>&nbsp;</div>

<div>&nbsp;</div>
</div></div></body></html>