[jboss-svn-commits] JBL Code SVN: r34659 - labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Aug 12 02:33:02 EDT 2010


Author: diegoll
Date: 2010-08-12 02:33:02 -0400 (Thu, 12 Aug 2010)
New Revision: 34659

Added:
   labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler/TimerSchedulerHornetQService.java
Log:
[JBRULES-2616] added scheduler service

Added: labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler/TimerSchedulerHornetQService.java
===================================================================
--- labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler/TimerSchedulerHornetQService.java	                        (rev 0)
+++ labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler/TimerSchedulerHornetQService.java	2010-08-12 06:33:02 UTC (rev 34659)
@@ -0,0 +1,92 @@
+package org.drools.timer.scheduler;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.drools.timer.message.TimerMessage;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TimerSchedulerHornetQService extends TimerTask {
+
+	private static final Logger logger = LoggerFactory.getLogger(TimerSchedulerHornetQService.class);
+
+	private ClientConsumer consumer;
+	private TimerScheduler timerScheduler;
+	private AtomicBoolean running = new AtomicBoolean(false);
+
+    public void run() {
+        running.set(true);
+        try {
+            while (running.get()) {
+                ClientMessage clientMessage = getConsumer().receive();
+                if (clientMessage != null) {
+                    TimerMessage message = (TimerMessage) readMessage(clientMessage);
+                    timerScheduler.process(message);
+                }
+            }
+        } catch (HornetQException e) {
+            switch (e.getCode()) {
+            case HornetQException.OBJECT_CLOSED:
+                logger.info(e.getMessage(), e);
+                break;
+            default:
+                logger.error(e.getMessage(), e);
+                break;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Server Exception with class " + getClass());
+        }finally{
+            running.set(false);
+        }
+    }
+	
+	private Object readMessage(ClientMessage msgReceived) throws IOException {
+		int bodySize = msgReceived.getBodySize();
+		byte[] message = new byte[bodySize];
+		msgReceived.getBodyBuffer().readBytes(message);
+		ByteArrayInputStream bais = new ByteArrayInputStream(message);
+		try {
+			ObjectInputStream ois = new ObjectInputStream(bais);
+			return ois.readObject();
+		} catch (IOException e) {
+			throw new IOException("Error reading message");
+		} catch (ClassNotFoundException e) {
+			throw new IOException("Error creating message");
+		}
+	}
+
+    @Override
+    public boolean cancel() {
+        boolean cancelResult = super.cancel();
+        running.set(false);
+        return cancelResult;
+    }
+
+    public boolean isRunning() {
+        return running.get();
+    }
+	
+	public void setConsumer(ClientConsumer consumer) {
+		this.consumer = consumer;
+	}
+
+	public ClientConsumer getConsumer() {
+		return consumer;
+	}
+
+	public void setTimerScheduler(TimerScheduler timerScheduler) {
+		this.timerScheduler = timerScheduler;
+	}
+
+	public TimerScheduler getTimerScheduler() {
+		return timerScheduler;
+	}
+
+}



More information about the jboss-svn-commits mailing list