[jboss-svn-commits] JBL Code SVN: r6625 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Oct 5 17:28:35 EDT 2006


Author: jokum
Date: 2006-10-05 17:28:32 -0400 (Thu, 05 Oct 2006)
New Revision: 6625

Added:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java
Modified:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
Log:
Introduction active and passive listeners
+ JBESB-165

Copied: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java (from rev 6623, labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java)
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-05 20:07:30 UTC (rev 6623)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractActiveListener.java	2006-10-05 21:28:32 UTC (rev 6625)
@@ -0,0 +1,72 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Base abstract message listener implementation.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractActiveListener extends AbstractListener {
+        
+	protected AbstractActiveListener(GpListener p_oDad, DomElement p_oParms,
+			ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+		super(p_oDad, p_oParms, actionDefinitionFactory);
+    } // __________________________________
+
+    /**
+     * Implement run method for this Runnable <p/> Will continue to run until
+     * controlling class (ref in m_oDad) indicates no more looping allowed for
+     * all child classes <p/> This condition will not prevent child processes to
+     * finish normally
+     */
+    public void run() {
+        while (m_oDad.continueLooping()) {
+            Object[] processList = receive();
+            
+			for (Object message : processList) {
+				ActionProcessingPipeline runner = new ActionProcessingPipeline(
+						message);
+				this.pipelineExecutorPool.submit(runner);
+			}
+        }
+	}
+    
+    /**
+     * Receive message from underlying channel implementation.
+     * <p/>
+     * Implementations must perform a blocking receive.
+     * @return An array of Objects received on the channel.
+     */
+    protected abstract Object[] receive();
+    
+	/**
+	 * Close the listener implemenation. <p/> Allows the listener to perform
+	 * relevant close/cleanup tasks.
+	 */
+	protected abstract void close();
+
+} // ____________________________________________________________________________

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-10-05 20:36:01 UTC (rev 6624)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-10-05 21:28:32 UTC (rev 6625)
@@ -36,7 +36,7 @@
  * 
  * @author Esteban
  */
-public abstract class AbstractPoller extends AbstractListener {
+public abstract class AbstractPoller extends AbstractActiveListener {
 
     // You can override these values at constructor time of your
     // derived class after calling super(GpListener,DomElement)

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-10-05 20:36:01 UTC (rev 6624)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-10-05 21:28:32 UTC (rev 6625)
@@ -39,7 +39,7 @@
 import org.jboss.soa.esb.helpers.AppServerContext;
 import org.jboss.soa.esb.helpers.DomElement;
 
-public class JmsQueueListener extends AbstractListener {
+public class JmsQueueListener extends AbstractActiveListener {
 
     public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
     public static final String LISTEN_JNDI_TYPE = "listenJndiType";
@@ -52,6 +52,9 @@
     protected QueueSession m_oQsess;
     protected Queue m_oQueue;
     protected String m_sSelector;
+    
+    //TODO refactor to config setting
+    protected long reconnectionInterval = 2000;
 
     protected MessageConsumer jmsMessageReceiver;
 
@@ -120,7 +123,7 @@
                     catch (Exception e) {
                         logger.error("Reconnecting to Queue", e);
                         try {
-                            Thread.sleep(m_iSleepForThreads);
+                            Thread.sleep(this.reconnectionInterval);
                         } catch (InterruptedException e1) { // Just return
                             logger.error("Unexpected thread interupt exception.", e);
                             return null;




More information about the jboss-svn-commits mailing list