The test (as you describe it) doesn't really reproduce what will
happen in reality. A burst of 5000 events at (almost) the same time,
followed by a lull of 5 seconds, isn't quite like a scenario of 25-50k
per minute, which works out to roughly 420 to 830 events per second.
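
To get a test load closer to the real thing, the insertions could be
spread evenly over time instead of being sent in bursts. What follows is
only a minimal sketch under assumptions: PacedFeeder is a hypothetical
helper (not a Drools class), it assumes Java 8 or later for Consumer, and
the sink stands in for whatever calls insert() on the entry point.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

// Hypothetical helper, not part of Drools: hands events to a sink at a
// steady rate instead of in bursts.
final class PacedFeeder {
    private final long intervalNanos;

    PacedFeeder(int eventsPerMinute) {
        if (eventsPerMinute <= 0) {
            throw new IllegalArgumentException("eventsPerMinute must be positive");
        }
        // 50000 per minute gives 1.2 ms between two events.
        this.intervalNanos = TimeUnit.MINUTES.toNanos(1) / eventsPerMinute;
    }

    long intervalNanos() {
        return intervalNanos;
    }

    // Passes each event to the sink, waiting so that consecutive events
    // are at least intervalNanos apart (the first one goes out at once).
    <T> void feed(Iterable<T> events, Consumer<T> sink) {
        long next = System.nanoTime();
        for (T event : events) {
            long wait;
            while ((wait = next - System.nanoTime()) > 0) {
                LockSupport.parkNanos(wait); // may wake early, hence the loop
            }
            sink.accept(event);
            next += intervalNanos;
        }
    }
}
```

In a test like yours, the sink would be the code that parses a line and
inserts the resulting event; the Thread.sleep(5000) after every 5000
lines would then no longer be needed.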
The Alarm tied to some eventValue is OK. But I'd try and accumulate
the matching events "manually" into a list kept in the Alarm fact, and
I'd even try and do the retraction by comparing the timestamp of the
oldest with the timestamp of a new arrival. You can easily handle the
situation when 30 is reached. (You may have to decide when this is
retriggered, i.e., the count falls below 30 and then exceeds 30 once
more.)
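
The manual bookkeeping could look roughly like the sketch below. It is
only an illustration under assumptions: WindowedAlarm, record() and the
retrigger policy are made up for this example, timestamps are assumed to
arrive in non-decreasing order, and old entries are dropped only when a
new event arrives.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical Alarm-like fact that keeps the timestamps of the matching
// events itself. All names and the retrigger policy are assumptions.
final class WindowedAlarm {
    private final String eventValue;
    private final int threshold;
    private final long windowMillis;
    private final Deque<Long> timestamps = new ArrayDeque<Long>();
    private boolean raised;

    WindowedAlarm(String eventValue, int threshold, long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        this.eventValue = eventValue;
        this.threshold = threshold;
        this.windowMillis = windowMillis;
    }

    // Records one matching event. Returns true only when the count moves
    // from "not above threshold" to "above threshold".
    boolean record(long timestampMillis) {
        timestamps.addLast(timestampMillis);
        // Compare the oldest entry with the new arrival and drop whatever
        // has fallen out of the window. The new entry itself always stays.
        while (timestampMillis - timestamps.peekFirst() >= windowMillis) {
            timestamps.removeFirst();
        }
        boolean over = timestamps.size() > threshold;
        boolean trigger = over && !raised;
        raised = over; // falling back to the threshold re-arms the alarm
        return trigger;
    }

    int count() {
        return timestamps.size();
    }

    String getEventValue() {
        return eventValue;
    }
}
```

With a threshold of 30 and a window of 15000 ms, record() returns true
for the 31st event inside the window and false for the ones after it,
until the count has dropped to 30 or below and then exceeds 30 again.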
A little consideration should also be given to the number of different
eventValues. If these values are from a stable set, I don't see any
problems.
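
If the values are not from a stable set, the per-value bookkeeping has
to be cleaned up now and then, or it grows without bound. A rough sketch
of that idea, again under assumptions: ValueRegistry and purgeIdle() are
hypothetical names, the class is not thread-safe, and timestamps per
value are assumed to be non-decreasing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical registry: one timestamp queue per eventValue, plus a purge
// for values that have gone quiet. Not thread-safe.
final class ValueRegistry {
    private final long windowMillis;
    private final Map<String, Deque<Long>> byValue =
            new HashMap<String, Deque<Long>>();

    ValueRegistry(long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        this.windowMillis = windowMillis;
    }

    // Records an event for the value and returns its count in the window.
    int record(String value, long timestampMillis) {
        Deque<Long> queue = byValue.get(value);
        if (queue == null) {
            queue = new ArrayDeque<Long>();
            byValue.put(value, queue);
        }
        queue.addLast(timestampMillis);
        while (timestampMillis - queue.peekFirst() >= windowMillis) {
            queue.removeFirst();
        }
        return queue.size();
    }

    // Forgets every value whose newest event is outside the window.
    // Returns the number of values removed.
    int purgeIdle(long nowMillis) {
        int removed = 0;
        Iterator<Deque<Long>> it = byValue.values().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().peekLast() >= windowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int trackedValues() {
        return byValue.size();
    }
}
```

How often purgeIdle() is called is a tuning decision; with a stable set
of values it may never be needed at all.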
-W
On 20/01/2014, Martin Kitzler <martin_kitzler(a)gmx.at> wrote:
Hello everyone,
I'm having some design problems with Drools Fusion and CEP. Maybe you can
point me in the right direction.
(Drools 5.6)
Problem:
I want to use Fusion CEP to handle a quite large amount of events (25-50
thousand per minute as a start). The events don't have to live very long so
I plan to expire them very soon.
My rule is pretty simple:
There is only one event type (which has a string value field in it).
The rule should fire when there are X events with the same value Y in time
window Z.
The effect of the rule should be something like an Alarm which will be
extracted from outside the rule engine.
I found some similar examples (and also studied the documentation), but the
race condition (see below) destroys all my plans.
ATM it looks like this:
declare MyEvent
    @role(event)
    @expires( 30s )
end

declare Alarm
    @role(event)
end

rule "RULE 1"
when
    $e : MyEvent( ) from entry-point entryone
    // not Alarm(eventValue == $e.getEventValue())
    Number( $count : intValue, $count > 30) from accumulate(
        $i : (MyEvent ( eventValue == $e.getEventValue() )
              over window:time(15s) from entry-point entryone),
        count( $i ) )
then
    // insert( new Alarm($e.getEventValue(), $count));
    System.out.println("rule 1. value=" + $e.getEventValue() + " count=" + $count);
end
-> When there are more than 30 "same" events (same value) in the last 15
seconds.
MyEvent and Alarm are simple POJOs with only fields, getters and setters
(value is a String, count is an Integer).
The rule like this will fire for EVERY event -> when there are 30 similar
events then the log message is printed 30 times.
I'm unhappy with this situation because I actually need the RHS to be
triggered only once for each value. I also thought that the performance
would be better when firing only once.
So I introduced the Alarm object which is created after the rule triggers
the first time for the given value. (the two commented lines)
When the rule triggers the first time for value X, I create an Alarm object
for this value. Then the "not Alarm(...)" should prevent the execution.
This works for the first few thousand events. Then I receive a race
condition exception, usually somewhere between events 2000 and 10000.
Exception in thread "main" org.drools.RuntimeDroolsException: Unexpected exception executing action org.drools.reteoo.PropagationQueuingNode$PropagateAction@61f0b700
    at org.drools.common.AbstractWorkingMemory.executeQueuedActions(AbstractWorkingMemory.java:1047)
    at org.drools.reteoo.WindowNode.assertObject(WindowNode.java:182)
    at org.drools.reteoo.CompositeObjectSinkAdapter.doPropagateAssertObject(CompositeObjectSinkAdapter.java:497)
    at org.drools.reteoo.CompositeObjectSinkAdapter.propagateAssertObject(CompositeObjectSinkAdapter.java:382)
    at org.drools.reteoo.ObjectTypeNode.assertObject(ObjectTypeNode.java:302)
    at org.drools.reteoo.EntryPointNode.assertObject(EntryPointNode.java:254)
    at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:366)
    at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:327)
    at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:130)
    at org.drools.common.NamedEntryPoint.insert(NamedEntryPoint.java:58)
    at ........myClass.main(...)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: java.lang.NullPointerException
    at org.drools.reteoo.ReteooWorkingMemory$EvaluateResultConstraints.execute(ReteooWorkingMemory.java:593)
    at org.drools.common.PropagationContextImpl.evaluateActionQueue(PropagationContextImpl.java:364)
    at org.drools.reteoo.PropagationQueuingNode$AssertAction.execute(PropagationQueuingNode.java:449)
    at org.drools.reteoo.PropagationQueuingNode.propagateActions(PropagationQueuingNode.java:289)
    at org.drools.reteoo.PropagationQueuingNode$PropagateAction.execute(PropagationQueuingNode.java:611)
    at org.drools.common.AbstractWorkingMemory.executeQueuedActions(AbstractWorkingMemory.java:1045)
    ... 16 more
I add a few thousand events to the knowledge session, then let the
thread sleep for a few seconds so Drools can catch up. I tried this with
various combinations and always received the exception.
It works when I try it with a few hundred events (the rule triggers as planned).
For the sake of completeness:
My Java class that starts the knowledge session and inserts the events:
KnowledgeBaseConfiguration config =
        KnowledgeBaseFactory.newKnowledgeBaseConfiguration();
config.setOption(EventProcessingOption.STREAM);
kbase = KnowledgeBaseFactory.newKnowledgeBase(config);
kbase.addKnowledgePackages(pkgs);
KnowledgeSessionConfiguration conf =
        KnowledgeBaseFactory.newKnowledgeSessionConfiguration();
ksession = kbase.newStatefulKnowledgeSession(conf, null);
entryPoint1 = ksession.getWorkingMemoryEntryPoint("entryone");
new Thread() {
    @Override
    public void run() {
        ksession.fireUntilHalt();
    }
}.start();
...
reader = new BufferedReader(new FileReader("50k.log"));
String line = reader.readLine();
long i = 0;
while (line != null) {
    MyEvent entry = parser.parse(line.trim());
    if ( entry != null ) {
        entryPoint1.insert(entry);
    }
    line = reader.readLine();
    if ( ++i % 5000 == 0 ) {
        System.out.println("Inserter: " + i + " lines inserted. - "
                + sdf.format(new Date()));
        Thread.sleep(5000);
    }
}
My main question:
Apart from the race condition problem, are there any other ways I could
solve my problem?
E.g.
* Do you think that I shouldn't care about "not Alarm(eventValue ==
$e.getEventValue())" and just live with the result that the program, that
uses the knowledge base, has to deal with lots of identical Alarm events?
(And has to perform the accumulate more often)
* Is there an easier way to count the events?
* Is Drools overburdened with the number of events?
* Is Drools Fusion/CEP not designed for my problem and other tools are more
useful instead? Which ones?
I would appreciate any ideas.
Sincerely,
Martin