//package tests; import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.util.Calendar; import java.util.Random; import java.util.Timer; import java.util.TimerTask; import org.drools.runtime.StatefulKnowledgeSession; public class EventStream { private float probabilityValue; private int symbol; private float price; private int volume; private float currValue; private StatefulKnowledgeSession ksession; public long counter = 1; public long old_counter = 1; private Random randGen1 = new Random(); // Default seed comes from system time. private Random randGen2 = new Random(); private Random randGen3 = new Random(); private Random randGen4 = new Random(); public double global_duration = 1; public EventStream(){} public void register_session(StatefulKnowledgeSession _sess){ksession = _sess;} public StatefulKnowledgeSession get_working_session() { return ksession;} /** * Generates synthetic event for a given duration time and probability for increasing stock price. * @param duration - Duration time in seconds (after which the method stops to generate events). If time==0, the method creates infinite event stream; * @param p - Probability for increasing stock price; * @throws FileNotFoundException */ void generateEventStream(double duration, double p) { global_duration = duration; Timer counterTimer = new Timer(); CounterRunner runner = new CounterRunner(); counterTimer.schedule(runner,0,1000); Timer timer = new Timer(); Stopper stopper = new Stopper(); timer.schedule(stopper,(long)duration * 1000); try { if(p < 0 || p > 1) throw new Exception(); } catch (Exception e) { System.err.println("Invalid parameter for probability p."); e.printStackTrace(); } try{ /*Increase, decrease or the same value of stocks are created by the following rule, for 0 =< p =< 1: if probabilityValue < p increase the stock value; if p < probabilityValue < (p+1)/2 decrease the stock value; if (p+1)/2 < probabilityValue < 1 the stock value remains the same;*/ double decr = (p+1)/2; //Query query=new Query("retractall(logging(on))"); //query.hasSolution(); while(stopper.done() == 0){ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("ExpA.P",true))); symbol = randGen1.nextInt(2) + 1; // values: 1,2 price = randGen2.nextInt(1000) + 1; // values: 1,2,...,1000 volume = randGen3.nextInt(1000) + 1; // values: 1,2,...,1000 probabilityValue = randGen4.nextFloat(); // values between 0 and 1 if(probabilityValue < p){ currValue = increase(price); }else if(probabilityValue > p && probabilityValue < decr){ currValue = decrease(price); }else if(probabilityValue > decr){ currValue = price; } Calendar ca = Calendar.getInstance(); //String datime = "datime(" + ca.get(Calendar.YEAR) + "," + ( ca.get(Calendar.MONTH) + 1 ) + "," + ca.get(Calendar.DATE) + "," + ca.get(Calendar.HOUR_OF_DAY) + "," + ca.get(Calendar.MINUTE) + "," + ca.get(Calendar.SECOND) + "," + counter + ")"; String datime = new String(""+counter); //bw.append("event(stock(" + symbol + "," + currValue + "," + volume + "),[" + datime + "," + datime + "])"+".\n"); bw.append("event(stock(" + symbol + "," + currValue + "),[" + datime + "," + datime + "])"+".\n"); //Drools Event temp = new Event(symbol,currValue); ksession.insert(temp); // //bw.append("event(systemClock(" + datime + "),[" + datime + "," + datime + "])"+".\n"); //query=new Query("event(stock(" + symbol + "," + currValue + "," + volume + "),[" + datime + "," + datime + "])"); //query.hasSolution(); counter++; bw.close(); } counterTimer.cancel(); } catch (Exception e) { System.err.println("Exception: "); e.printStackTrace(); } } private float increase(float currValue){ return (float)(currValue + 0.1); } private float decrease(float currValue){ return (float)(currValue - 0.1); } class CounterRunner extends TimerTask { public void run(){ //Print the number of events generated per second, typically > 200 000, System.out.println("\nNo. of events/second is: " + (counter-old_counter) + ".\n"); old_counter = counter; } } class Stopper extends TimerTask { int finished = 0; public int done(){return this.finished;} public void run(){ //Print the number of events generated in total System.out.flush(); System.out.println("\nNo. of events in total is: " + counter + "."); System.out.println("No. of total events/second is: " + (counter/global_duration)); System.out.flush(); // retrieve the results from Prolog //System.out.println("\n Retrieve the results from Prolog: "); //Query queryRule=new Query("findall(eventFired(E),eventFired(E),L)"); //Compound result = (Compound)(queryRule.oneSolution().get("L")); //System.out.println(result); System.out.flush(); this.finished = 1; } } }