First of all, thanks for the support.
Mario, I confirm that the case you provided also works fine on my side. Still, have you tried my first example (applying the cron timer directly to the counting rule)? Can you confirm that it does not work, since that was my first concern?
As I said in the previous post, using the CronTrigger pattern greatly improves stability (I managed to run it at 500 eps), but it does not fully solve the problem.
Adding a few more rules is enough to bring the instability back.
Here's the complete test case.
Basically, a SynthEvent is created with the meta list populated randomly with up to 4 values picked from a list of 10 strings ("one" to "ten").
For each meta value an Entity object is created (for enumeration purposes).
Then, rules generate metrics for both events (every 10s) and meta (every 60s).
The test is run at 300 events per second +/- 20% (our real use case is closer to a few hundred events per second than a few tens).
First of all, the full SynthEvent class:
package it.intext.unity.test;

import java.util.Date;
import java.util.List;

public class SynthEvent {

    private long id;
    private Date timestamp;
    private List<String> meta;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    public List<String> getMeta() {
        return meta;
    }

    public void setMeta(List<String> meta) {
        this.meta = meta;
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("SynthEvent [id=");
        builder.append(id);
        builder.append(", timestamp=");
        builder.append(timestamp);
        builder.append(", meta=");
        builder.append(meta);
        builder.append("]");
        return builder.toString();
    }
}
Here's the feeding part of the test class:
private void process(final KieSession session) {
    // Run the rule engine on its own thread until halt() is called.
    new Thread() {
        @Override
        public void run() {
            session.fireUntilHalt();
        }
    }.start();
}

private void feed(final KieSession session) {
    // Insert synthetic events into the "synth" entry point at a throttled rate.
    new Thread() {
        @Override
        public void run() {
            try {
                int counter = 0;
                while (true) {
                    counter++;
                    session.getEntryPoint("synth").insert(createEvent());
                    Thread.sleep(getSleepRate());
                    if ((counter % 1000) == 0) {
                        logger.debug("Total events: {}", counter);
                    }
                }
            } catch (InterruptedException e) {
                logger.warn("Feeder thread interrupted", e);
            }
        }
    }.start();
}
static final int eventThroughput = 300;
static final double variance = 0.2;

// Pause between two inserts, in milliseconds. The cast truncates the result.
protected long getSleepRate() {
    return (long) (1000.0 / (eventThroughput * ((1.0 - variance) + Math.random() * variance * 2)));
}

protected SynthEvent createEvent() {
    SynthEvent ret = new SynthEvent();
    // Events created within the same millisecond share the same id.
    ret.setId(System.currentTimeMillis());
    ret.setTimestamp(new Date());
    ret.setMeta(createMeta());
    return ret;
}

static final List<String> metas = Arrays.asList("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten");
static final int maxMeta = 4;
static final int metaSize = metas.size();

protected List<String> createMeta() {
    List<String> ret = new ArrayList<String>();
    // As written, this yields between 0 and maxMeta - 1 items; duplicates are possible.
    int nofItems = (int) (Math.random() * maxMeta);
    for (int i = 0; i < nofItems; i++) {
        ret.add(metas.get((int) (Math.random() * metaSize)));
    }
    return ret;
}
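A side note on the actual rate: because getSleepRate() truncates to whole milliseconds, the feeder can only sleep for a few discrete values. The sketch below reproduces the same formula with the random draw passed in as a parameter, so the boundaries can be checked. The class name SleepRateCheck and the parameter u are mine, for illustration only; the formula and constants are the ones from the test class.

```java
class SleepRateCheck {
    static final int EVENT_THROUGHPUT = 300;
    static final double VARIANCE = 0.2;

    // Same formula as getSleepRate(), with u standing in for Math.random() (0.0 <= u < 1.0).
    static long sleepMillis(double u) {
        return (long) (1000.0 / (EVENT_THROUGHPUT * ((1.0 - VARIANCE) + u * VARIANCE * 2)));
    }

    public static void main(String[] args) {
        // Print the sleep value at the slow end, the middle, and the fast end of the range.
        for (double u : new double[] {0.0, 0.5, 0.99}) {
            long sleep = sleepMillis(u);
            System.out.println("u=" + u + " -> sleep " + sleep + " ms");
        }
    }
}
```

With these constants the sleep comes out as 4, 3 or 2 ms, so the effective rate is likely somewhat above the nominal 300 eps. That seems consistent with the logs, where 1000 events are inserted roughly every 3 seconds.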
And here's the drl, using the CronTrigger pattern:
package it.intext.unity.test

import it.intext.unity.test.SynthEvent;
import java.util.Date;

global org.slf4j.Logger logger;

declare SynthEvent
    @role( event )
    @timestamp( timestamp )
end

declare EventCounter
    @role( event )
    @timestamp( timestamp )
    id : long
    key : String
    timestamp : Date
end

declare CronTrigger
    @role( event )
    @timestamp( timestamp )
    interval : String
    timestamp : Date
end

declare MetricRequest
    metric : String
end

declare Subscription
    key : String
    interval : String
end

declare Entity
    key : String
end
// Setup rules

rule "Create subscriptions"
when
then
    insert( new Subscription("epm", "10s") );
end

rule "Create meta subscriptions 60s"
when
    Entity( $key : key )
then
    insert( new Subscription($key, "60s") );
end

// Cron management

rule "Cron trigger 10s"
    timer ( cron: 0/10 * * * * ? )
when
then
    entryPoints["triggers"].insert( new CronTrigger( "10s", new Date() ) );
end

rule "Cron trigger 60s"
    timer ( cron: 0/60 * * * * ? )
when
then
    entryPoints["triggers"].insert( new CronTrigger( "60s", new Date() ) );
end

rule "Subscription"
when
    CronTrigger( $interval := interval ) from entry-point "triggers"
    Subscription( $interval := interval, $key : key )
then
    entryPoints["requests"].insert( new MetricRequest($key) );
end
// Business rules

rule "Create counter"
when
    $e : SynthEvent() from entry-point "synth"
then
    entryPoints["counters"].insert( new EventCounter( $e.getId(), "event", $e.getTimestamp() ) );
end

rule "Create meta counter"
when
    $e : SynthEvent() from entry-point "synth"
    $meta : String( ) from $e.getMeta()
then
    entryPoints["counters"].insert( new EventCounter( $e.getId(), $meta, $e.getTimestamp() ) );
end

rule "Create meta"
when
    $e : SynthEvent() from entry-point "synth"
    $meta : String( ) from $e.getMeta()
    not( Entity( key == $meta ) )
then
    insert( new Entity($meta) );
end

rule "Count epm"
when
    $req : MetricRequest( metric == "epm" ) from entry-point "requests"
    Number( $count : intValue ) from accumulate(
        EventCounter( key == "event" ) over window:time( 60s ) from entry-point "counters", count(1) )
then
    logger.debug("epm = {}", $count );
    retract( $req );
end

rule "Count meta"
when
    $req : MetricRequest( $key := metric ) from entry-point "requests"
    Number( $count : intValue ) from accumulate(
        EventCounter( $key := key ) over window:time( 60s ) from entry-point "counters", count(1) )
then
    logger.debug("{} = {}", $key, $count );
    retract( $req );
end
Here's the log of a quick-failing run:
[DEBUG] 2014-03-25 16:28:04.769 (StreamTester.java:run:68) Total events: 1000
[DEBUG] 2014-03-25 16:28:07.774 (StreamTester.java:run:68) Total events: 2000
[DEBUG] 2014-03-25 16:28:10.100 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 2769
[DEBUG] 2014-03-25 16:28:10.723 (StreamTester.java:run:68) Total events: 3000
[DEBUG] 2014-03-25 16:28:13.681 (StreamTester.java:run:68) Total events: 4000
[DEBUG] 2014-03-25 16:28:16.668 (StreamTester.java:run:68) Total events: 5000
[DEBUG] 2014-03-25 16:28:19.541 (StreamTester.java:run:68) Total events: 6000
[DEBUG] 2014-03-25 16:28:20.054 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 6164
[DEBUG] 2014-03-25 16:28:22.421 (StreamTester.java:run:68) Total events: 7000
[DEBUG] 2014-03-25 16:28:25.336 (StreamTester.java:run:68) Total events: 8000
[DEBUG] 2014-03-25 16:28:28.217 (StreamTester.java:run:68) Total events: 9000
[DEBUG] 2014-03-25 16:28:30.030 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 9627
[DEBUG] 2014-03-25 16:28:31.088 (StreamTester.java:run:68) Total events: 10000
[DEBUG] 2014-03-25 16:28:33.968 (StreamTester.java:run:68) Total events: 11000
[DEBUG] 2014-03-25 16:28:36.904 (StreamTester.java:run:68) Total events: 12000
[DEBUG] 2014-03-25 16:28:39.782 (StreamTester.java:run:68) Total events: 13000
[DEBUG] 2014-03-25 16:28:40.024 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 13077
[DEBUG] 2014-03-25 16:28:42.736 (StreamTester.java:run:68) Total events: 14000
[DEBUG] 2014-03-25 16:28:45.615 (StreamTester.java:run:68) Total events: 15000
[DEBUG] 2014-03-25 16:28:48.459 (StreamTester.java:run:68) Total events: 16000
[DEBUG] 2014-03-25 16:28:50.030 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 16538
[DEBUG] 2014-03-25 16:28:51.412 (StreamTester.java:run:68) Total events: 17000
[DEBUG] 2014-03-25 16:28:54.293 (StreamTester.java:run:68) Total events: 18000
[DEBUG] 2014-03-25 16:28:57.549 (StreamTester.java:run:68) Total events: 19000
[DEBUG] 2014-03-25 16:29:00.021 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 19860
[DEBUG] 2014-03-25 16:29:00.211 (Rule_Count_meta1429890510.java:defaultConsequence:14) two = 2972
[DEBUG] 2014-03-25 16:29:00.214 (Rule_Count_meta1429890510.java:defaultConsequence:14) eight = 3099
[DEBUG] 2014-03-25 16:29:00.215 (Rule_Count_meta1429890510.java:defaultConsequence:14) ten = 3008
[DEBUG] 2014-03-25 16:29:00.215 (Rule_Count_meta1429890510.java:defaultConsequence:14) three = 2982
[DEBUG] 2014-03-25 16:29:00.216 (Rule_Count_meta1429890510.java:defaultConsequence:14) six = 3006
[DEBUG] 2014-03-25 16:29:00.216 (Rule_Count_meta1429890510.java:defaultConsequence:14) five = 2959
[DEBUG] 2014-03-25 16:29:00.217 (Rule_Count_meta1429890510.java:defaultConsequence:14) one = 2934
[DEBUG] 2014-03-25 16:29:00.217 (Rule_Count_meta1429890510.java:defaultConsequence:14) seven = 3013
[DEBUG] 2014-03-25 16:29:00.218 (Rule_Count_meta1429890510.java:defaultConsequence:14) four = 2973
[DEBUG] 2014-03-25 16:29:00.218 (Rule_Count_meta1429890510.java:defaultConsequence:14) nine = 3012
[DEBUG] 2014-03-25 16:29:00.510 (StreamTester.java:run:68) Total events: 20000
[DEBUG] 2014-03-25 16:29:03.671 (StreamTester.java:run:68) Total events: 21000
[DEBUG] 2014-03-25 16:29:06.798 (StreamTester.java:run:68) Total events: 22000
[DEBUG] 2014-03-25 16:29:09.969 (StreamTester.java:run:68) Total events: 23000
[DEBUG] 2014-03-25 16:29:10.045 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 20242
[DEBUG] 2014-03-25 16:29:13.362 (StreamTester.java:run:68) Total events: 24000
[DEBUG] 2014-03-25 16:29:16.352 (StreamTester.java:run:68) Total events: 25000
[DEBUG] 2014-03-25 16:29:19.400 (StreamTester.java:run:68) Total events: 26000
[DEBUG] 2014-03-25 16:29:20.849 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 20029
[DEBUG] 2014-03-25 16:29:22.795 (StreamTester.java:run:68) Total events: 27000
[DEBUG] 2014-03-25 16:29:25.785 (StreamTester.java:run:68) Total events: 28000
[DEBUG] 2014-03-25 16:29:28.855 (StreamTester.java:run:68) Total events: 29000
[DEBUG] 2014-03-25 16:29:30.018 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 18647
[DEBUG] 2014-03-25 16:29:31.858 (StreamTester.java:run:68) Total events: 30000
[DEBUG] 2014-03-25 16:29:34.983 (StreamTester.java:run:68) Total events: 31000
[DEBUG] 2014-03-25 16:29:37.947 (StreamTester.java:run:68) Total events: 32000
[DEBUG] 2014-03-25 16:29:40.011 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 15201
[DEBUG] 2014-03-25 16:29:40.928 (StreamTester.java:run:68) Total events: 33000
[DEBUG] 2014-03-25 16:29:43.920 (StreamTester.java:run:68) Total events: 34000
[DEBUG] 2014-03-25 16:29:46.911 (StreamTester.java:run:68) Total events: 35000
[DEBUG] 2014-03-25 16:29:50.011 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 11740
[DEBUG] 2014-03-25 16:29:50.023 (StreamTester.java:run:68) Total events: 36000
[DEBUG] 2014-03-25 16:29:52.998 (StreamTester.java:run:68) Total events: 37000
[DEBUG] 2014-03-25 16:29:55.925 (StreamTester.java:run:68) Total events: 38000
[DEBUG] 2014-03-25 16:29:58.845 (StreamTester.java:run:68) Total events: 39000
[DEBUG] 2014-03-25 16:30:00.009 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 8417
[DEBUG] 2014-03-25 16:30:00.038 (Rule_Count_meta1429890510.java:defaultConsequence:14) two = 2606
[DEBUG] 2014-03-25 16:30:00.039 (Rule_Count_meta1429890510.java:defaultConsequence:14) eight = 2534
[DEBUG] 2014-03-25 16:30:00.039 (Rule_Count_meta1429890510.java:defaultConsequence:14) ten = 2763
[DEBUG] 2014-03-25 16:30:00.040 (Rule_Count_meta1429890510.java:defaultConsequence:14) three = 2623
[DEBUG] 2014-03-25 16:30:00.040 (Rule_Count_meta1429890510.java:defaultConsequence:14) six = 2641
[DEBUG] 2014-03-25 16:30:00.040 (Rule_Count_meta1429890510.java:defaultConsequence:14) five = 2579
[DEBUG] 2014-03-25 16:30:00.040 (Rule_Count_meta1429890510.java:defaultConsequence:14) one = 2690
[DEBUG] 2014-03-25 16:30:00.041 (Rule_Count_meta1429890510.java:defaultConsequence:14) seven = 2698
[DEBUG] 2014-03-25 16:30:00.041 (Rule_Count_meta1429890510.java:defaultConsequence:14) four = 2658
[DEBUG] 2014-03-25 16:30:00.041 (Rule_Count_meta1429890510.java:defaultConsequence:14) nine = 2669
[DEBUG] 2014-03-25 16:30:01.735 (StreamTester.java:run:68) Total events: 40000
[DEBUG] 2014-03-25 16:30:04.682 (StreamTester.java:run:68) Total events: 41000
[DEBUG] 2014-03-25 16:30:07.604 (StreamTester.java:run:68) Total events: 42000
[DEBUG] 2014-03-25 16:30:10.011 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 5266
[DEBUG] 2014-03-25 16:30:10.622 (StreamTester.java:run:68) Total events: 43000
[DEBUG] 2014-03-25 16:30:13.493 (StreamTester.java:run:68) Total events: 44000
[DEBUG] 2014-03-25 16:30:16.383 (StreamTester.java:run:68) Total events: 45000
[DEBUG] 2014-03-25 16:30:19.263 (StreamTester.java:run:68) Total events: 46000
[DEBUG] 2014-03-25 16:30:20.012 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 2078
[DEBUG] 2014-03-25 16:30:22.146 (StreamTester.java:run:68) Total events: 47000
[DEBUG] 2014-03-25 16:30:25.036 (StreamTester.java:run:68) Total events: 48000
[DEBUG] 2014-03-25 16:30:27.911 (StreamTester.java:run:68) Total events: 49000
[DEBUG] 2014-03-25 16:30:30.003 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 0
[DEBUG] 2014-03-25 16:30:30.800 (StreamTester.java:run:68) Total events: 50000
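As a rough sanity check on the per-meta counts in the log above, here is a minimal sketch of what the generator should produce on average. It assumes that createMeta() picks a uniform number of items between 0 and maxMeta - 1 (which is what the cast does), that every list element, duplicates included, produces one EventCounter, and that the epm and meta windows cover the same 60 seconds. The class and method names are hypothetical.

```java
class MetaCountCheck {
    // Expected number of EventCounter insertions per meta key for a given number of events.
    static double expectedPerKey(long events, int maxMeta, int metaSize) {
        // nofItems = (int) (Math.random() * maxMeta) is uniform over 0..maxMeta-1,
        // so its mean is (maxMeta - 1) / 2.
        double meanItemsPerEvent = (maxMeta - 1) / 2.0;
        // Each item is drawn uniformly from metaSize keys.
        return events * meanItemsPerEvent / metaSize;
    }

    public static void main(String[] args) {
        // epm values taken from the log lines at 16:29:00 and 16:30:00
        System.out.println("16:29:00 expected per key: " + expectedPerKey(19860, 4, 10));
        System.out.println("16:30:00 expected per key: " + expectedPerKey(8417, 4, 10));
    }
}
```

For epm = 19860 this gives about 2979 per key, in line with the logged values (2934 to 3099). For epm = 8417 it gives about 1263 per key, while the log shows 2534 to 2763, so under these assumptions the event counters and the meta counters do not seem to decay together.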
At 250 eps it took about 56K events (3'30") to hang (short log below):
[DEBUG] 2014-03-25 16:37:20.013 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 16084
[DEBUG] 2014-03-25 16:37:23.222 (StreamTester.java:run:68) Total events: 52000
[DEBUG] 2014-03-25 16:37:26.751 (StreamTester.java:run:68) Total events: 53000
[DEBUG] 2014-03-25 16:37:30.069 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 16085
[DEBUG] 2014-03-25 16:37:30.510 (StreamTester.java:run:68) Total events: 54000
[DEBUG] 2014-03-25 16:37:34.099 (StreamTester.java:run:68) Total events: 55000
[DEBUG] 2014-03-25 16:37:37.701 (StreamTester.java:run:68) Total events: 56000
[DEBUG] 2014-03-25 16:37:40.014 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 15381
[DEBUG] 2014-03-25 16:37:41.381 (StreamTester.java:run:68) Total events: 57000
[DEBUG] 2014-03-25 16:37:44.977 (StreamTester.java:run:68) Total events: 58000
[DEBUG] 2014-03-25 16:37:48.632 (StreamTester.java:run:68) Total events: 59000
[DEBUG] 2014-03-25 16:37:50.011 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 12677
[DEBUG] 2014-03-25 16:37:52.343 (StreamTester.java:run:68) Total events: 60000
[DEBUG] 2014-03-25 16:37:55.958 (StreamTester.java:run:68) Total events: 61000
[DEBUG] 2014-03-25 16:37:59.546 (StreamTester.java:run:68) Total events: 62000
[DEBUG] 2014-03-25 16:38:00.009 (Rule_Count_epm701941061.java:defaultConsequence:14) epm = 10033
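To locate the point where the count starts falling in longer logs, something like the following sketch can help. It is only an illustration: the class EpmDropFinder, the method firstDrop and the 5% tolerance are arbitrary choices of mine, and it assumes the "epm = N" line format shown in the logs above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class EpmDropFinder {
    private static final Pattern EPM = Pattern.compile("epm = (\\d+)");

    // Returns the first line whose epm value is more than `tolerance` (e.g. 0.05 = 5%)
    // below the previous epm value, or null if no such line exists.
    static String firstDrop(List<String> lines, double tolerance) {
        long previous = -1;
        for (String line : lines) {
            Matcher m = EPM.matcher(line);
            if (!m.find()) {
                continue; // not an epm line (e.g. "Total events")
            }
            long current = Long.parseLong(m.group(1));
            if (previous >= 0 && current < previous * (1.0 - tolerance)) {
                return line;
            }
            previous = current;
        }
        return null;
    }

    public static void main(String[] args) {
        // Shortened lines from the 250 eps run
        List<String> lines = Arrays.asList(
            "16:37:20.013 epm = 16084",
            "16:37:30.069 epm = 16085",
            "16:37:40.014 epm = 15381",
            "16:37:50.011 epm = 12677",
            "16:38:00.009 epm = 10033");
        System.out.println(firstDrop(lines, 0.05));
    }
}
```

The tolerance is there to skip the small fluctuations caused by the random throttling; a stricter or looser value may be needed depending on the run.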
At 200 eps the system hung after 185K events (more than 16 minutes).
More realistic examples (e.g. counting how many distinct tweets get at least one retweet [far less than 10%, BTW]) hang in less than 10 minutes even at 20 events per second or less.
Hope this helps. Please ask me for more details at any time.
I would definitely be glad to solve this issue and move on with our Drools Fusion roadmap ;)
Thanks in advance,
Vieri