Hi all,
A few weeks ago I posted a question about my use case to the mailing list. The
correspondence is attached.
Here is the use case I implemented:
Port scan event: the basic event is a connection log. For each combination
of source_ip and destination_ip, detect a port scan event
if, within 5 seconds, there were more than 2 connection logs with
different ports.
The event stays open for 10 seconds, and an update is
sent for every new port detected. Each update contains the count of the
connection logs that make up the event and their IDs ("marker").
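To make the intended condition concrete outside of Drools, here is a minimal plain-Java sketch of the detection step only. ConnLog and PortScanSketch are hypothetical names used for illustration (they are not the classes used by the rules below), and treating the 5-second window as inclusive on both ends is an assumption.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a connection log; not the Log class used by the rules.
final class ConnLog {
    final String src;
    final String dst;
    final int port;
    final long timestampMs;

    ConnLog(String src, String dst, int port, long timestampMs) {
        this.src = src;
        this.dst = dst;
        this.port = port;
        this.timestampMs = timestampMs;
    }
}

final class PortScanSketch {
    // Window length from the use case: 5 seconds.
    static final long WINDOW_MS = 5_000;

    // Distinct ports seen for the same src/dst pair as `first`, from the
    // time of `first` up to WINDOW_MS later (assumed inclusive on both ends).
    static Set<Integer> portsInWindow(ConnLog first, List<ConnLog> logs) {
        Set<Integer> ports = new HashSet<>();
        for (ConnLog log : logs) {
            long delta = log.timestampMs - first.timestampMs;
            boolean sameGroup = log.src.equals(first.src) && log.dst.equals(first.dst);
            if (sameGroup && delta >= 0 && delta <= WINDOW_MS) {
                ports.add(log.port);
            }
        }
        return ports;
    }

    // "More than 2 connection logs with different ports" within the window.
    static boolean isPortScan(ConnLog first, List<ConnLog> logs) {
        return portsInWindow(first, logs).size() > 2;
    }
}
```

If `first` is itself in `logs`, its own port is counted, which matches my intent that the triggering log is part of the scan.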
The DRL file:
package test;
import correlation.impl.drools.Log
import correlation.impl.drools.CorrelatedEvent
global correlation.server.EventsHandler externalEventsHandler;
declare Log
@role( event)
end
declare CorrelatedEvent
@role( event)
@timestamp( getTimestamp().getTime() )
@expires( 10s )
@duration( getDuration() )
end
// This rule creates a "Port Scan" event if none exists for these group-by values
rule "Create Port Scan Event"
dialect "java"
no-loop
when
// match each log, then collect the logs that occur within 5 seconds after it
$log : Log() from entry-point "Log stream"
accumulate( Log( this after[0s,5s] $log, fieldsMap.get("src") ==
$log.fieldsMap.get("src") , fieldsMap.get("dst") ==
$log.fieldsMap.get("dst"), $port : fieldsMap.get("port")) from
entry-point "Log stream";
$portSet : collectSet($port);
$portSet.size > 2 )
accumulate( Log( this after[0s,5s] $log, fieldsMap.get("src") ==
$log.fieldsMap.get("src") , fieldsMap.get("dst") ==
$log.fieldsMap.get("dst"), $marker : fieldsMap.get("marker")) from
entry-point "Log stream";
$markerSet : collectSet($marker))
not CorrelatedEvent(getName() == "portScan" , fieldsMap.get("src")
== $log.fieldsMap.get("src") , fieldsMap.get("dst") ==
$log.fieldsMap.get("dst"))
then
System.out.println(drools.getRule().getName());
CorrelatedEvent $ce = new CorrelatedEvent();
$ce.setName("portScan");
$ce.setEventsHandler(externalEventsHandler);
$ce.setDurationInSec(10);
$ce.fieldsMap.put("src", $log.fieldsMap.get("src"));
$ce.fieldsMap.put("dst", $log.fieldsMap.get("dst"));
$ce.endUpdate($markerSet);
insert($ce);
end
rule "Create Port Scan Event - update"
dialect "java"
no-loop
when
$ce: CorrelatedEvent(getName() == "portScan")
accumulate( Log(fieldsMap.get("src") == $ce.fieldsMap.get("src") ,
fieldsMap.get("dst") == $ce.fieldsMap.get("dst") , $port :
fieldsMap.get("port") , (this meets $ce || this during $ce || this metby $ce))
from entry-point "Log stream";
$portSet : collectSet($port);
$portSet.size > 0 )
accumulate( Log(fieldsMap.get("src") == $ce.fieldsMap.get("src") ,
fieldsMap.get("dst") == $ce.fieldsMap.get("dst") , $marker :
fieldsMap.get("marker") , (this meets $ce || this during $ce || this metby $ce))
from entry-point "Log stream";
$markerSet : collectSet($marker))
then
System.out.println(drools.getRule().getName());
modify( $ce ) {endUpdate($markerSet)}
end
My questions:
1) If I have only one stream of data, can I omit the entry point and insert
logs directly into the session? Or are entry points mandatory in Drools Fusion?
2) When I tested it with matching data, the rule "Create Port Scan Event -
update" never fired. When I replaced "(this meets $ce || this during $ce ||
this metby $ce)" with "this after $ce.getStartTime() , this before
$ce.getEndTime()", everything worked fine.
Why?
3) I tried to use sliding windows in the rule "Create Port Scan Event", and an
exception was thrown at runtime. I decided to use "this after[0s,5s] $log"
instead. Is that correct?
4) Is my basic implementation correct?
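For question 2, this is how I understand the two constraints for a log with no duration (a single point in time t) against an event interval [start, end]. It is a plain-Java sketch of the textbook interval relations, not Drools code. IntervalSketch and its methods are hypothetical names, and whether the engine evaluates the operators exactly this way is an assumption on my part.

```java
// Hypothetical helper, not part of Drools: interval relations for a point
// event at time t against an interval [start, end], all in milliseconds.
final class IntervalSketch {
    // The point "meets" the interval when it coincides with the interval's start.
    static boolean meets(long t, long start, long end) {
        return t == start;
    }

    // The point is "during" the interval when it lies strictly inside it.
    static boolean during(long t, long start, long end) {
        return start < t && t < end;
    }

    // The point is "met by" the interval when it coincides with the interval's end.
    static boolean metBy(long t, long start, long end) {
        return t == end;
    }

    // The original constraint: meets || during || metby.
    static boolean meetsDuringOrMetBy(long t, long start, long end) {
        return meets(t, start, end) || during(t, start, end) || metBy(t, start, end);
    }

    // The replacement constraint: strictly after the start and strictly before the end.
    static boolean afterStartBeforeEnd(long t, long start, long end) {
        return t > start && t < end;
    }
}
```

With a 10-second interval, a log arriving one second after the start satisfies both forms in this sketch. If the engine took the value returned by getDuration() (10) as milliseconds rather than seconds, the interval would end after 10 ms and the same log would no longer satisfy the meets/during/metby form. I have not verified whether that is what happens.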
Thank you all very much.
Log class:
import java.util.HashMap;

public class Log {
public HashMap<String, Object> fieldsMap = new HashMap<>();
}
CorrelatedEvent class:
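import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import correlation.server.EventsHandler;

public class CorrelatedEvent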
{
public Map<String, Object> fieldsMap;
private String name;
private Set<String> markersSet;
private long logsCount;
private Calendar startTime;
private Calendar endTime;
private int duration;
private EventsHandler eventsHandler;
public CorrelatedEvent()
{
startTime = Calendar.getInstance();
endTime = Calendar.getInstance();
endTime.setTime(startTime.getTime());
fieldsMap = new HashMap<>();
markersSet = new HashSet<>();
logsCount = 0;
}
public Date getTimestamp()
{
return startTime.getTime();
}
public Date getStartTime()
{
return startTime.getTime();
}
public Date getEndTime()
{
return endTime.getTime();
}
public void setDurationInSec(int duration)
{
this.duration = duration;
endTime.setTime(startTime.getTime());
endTime.add(Calendar.SECOND, duration);
}
public int getDuration()
{
return duration;
}
public String getName()
{
return name;
}
public void setName(String name)
{
this.name = name;
fieldsMap.put("name", name);
}
public void setEventsHandler(EventsHandler eventsHandler)
{
this.eventsHandler = eventsHandler;
}
public void endUpdate(Set<String> markersSet)
{
this.markersSet.addAll(markersSet);
if (this.markersSet.size() > logsCount) {
logsCount = this.markersSet.size();
if (eventsHandler == null)
return;
Map<String, Object> clonedFieldsMap = getClonedFieldsMap();
// need a function that converts a set of markers to a "\n" separated list
clonedFieldsMap.put("markers", this.markersSet.toString());
clonedFieldsMap.put("count", logsCount);
eventsHandler.handleEvent(clonedFieldsMap);
}
}
private Map<String, Object> getClonedFieldsMap()
{
Map<String, Object> clonedFieldsMap = new HashMap<>();
clonedFieldsMap.putAll(fieldsMap);
return clonedFieldsMap;
}
}
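The comment in endUpdate mentions a function that converts the set of markers to a "\n" separated list. A minimal sketch of such a helper follows. MarkerFormatSketch and toNewlineSeparated are hypothetical names, and sorting the markers so the output order is stable is my own assumption, not a requirement of the use case.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; not part of the classes above.
final class MarkerFormatSketch {
    // Joins the markers with "\n". The markers are copied into a TreeSet
    // first so the order is deterministic (assumes no null markers).
    static String toNewlineSeparated(Set<String> markers) {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String marker : new TreeSet<>(markers)) {
            if (!first) {
                sb.append('\n');
            }
            sb.append(marker);
            first = false;
        }
        return sb.toString();
    }
}
```

In endUpdate, the call this.markersSet.toString() could then be replaced with MarkerFormatSketch.toNewlineSeparated(this.markersSet).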