[jboss-svn-commits] JBL Code SVN: r34659 - labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Aug 12 02:33:02 EDT 2010
Author: diegoll
Date: 2010-08-12 02:33:02 -0400 (Thu, 12 Aug 2010)
New Revision: 34659
Added:
labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler/TimerSchedulerHornetQService.java
Log:
[JBRULES-2616] added scheduler service
Added: labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler/TimerSchedulerHornetQService.java
===================================================================
--- labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler/TimerSchedulerHornetQService.java (rev 0)
+++ labs/jbossrules/branches/5_1_20100802_esteban_diega/drools-process/drools-persistent-timer/drools-timer-scheduler/src/main/java/org/drools/timer/scheduler/TimerSchedulerHornetQService.java 2010-08-12 06:33:02 UTC (rev 34659)
@@ -0,0 +1,92 @@
+package org.drools.timer.scheduler;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.drools.timer.message.TimerMessage;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TimerSchedulerHornetQService extends TimerTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(TimerSchedulerHornetQService.class);
+
+ private ClientConsumer consumer;
+ private TimerScheduler timerScheduler;
+ private AtomicBoolean running = new AtomicBoolean(false);
+
+ public void run() {
+ running.set(true);
+ try {
+ while (running.get()) {
+ ClientMessage clientMessage = getConsumer().receive();
+ if (clientMessage != null) {
+ TimerMessage message = (TimerMessage) readMessage(clientMessage);
+ timerScheduler.process(message);
+ }
+ }
+ } catch (HornetQException e) {
+ switch (e.getCode()) {
+ case HornetQException.OBJECT_CLOSED:
+ logger.info(e.getMessage(), e);
+ break;
+ default:
+ logger.error(e.getMessage(), e);
+ break;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Server Exception with class " + getClass());
+ }finally{
+ running.set(false);
+ }
+ }
+
+ private Object readMessage(ClientMessage msgReceived) throws IOException {
+ int bodySize = msgReceived.getBodySize();
+ byte[] message = new byte[bodySize];
+ msgReceived.getBodyBuffer().readBytes(message);
+ ByteArrayInputStream bais = new ByteArrayInputStream(message);
+ try {
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ return ois.readObject();
+ } catch (IOException e) {
+ throw new IOException("Error reading message");
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Error creating message");
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ boolean cancelResult = super.cancel();
+ running.set(false);
+ return cancelResult;
+ }
+
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ public void setConsumer(ClientConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ public ClientConsumer getConsumer() {
+ return consumer;
+ }
+
+ public void setTimerScheduler(TimerScheduler timerScheduler) {
+ this.timerScheduler = timerScheduler;
+ }
+
+ public TimerScheduler getTimerScheduler() {
+ return timerScheduler;
+ }
+
+}
More information about the jboss-svn-commits
mailing list