[jboss-svn-commits] JBL Code SVN: r6681 - labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sat Oct 7 17:15:41 EDT 2006


Author: jokum
Date: 2006-10-07 17:15:39 -0400 (Sat, 07 Oct 2006)
New Revision: 6681

Modified:
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
   labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
Log:
Implementation HttpListener

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-07 20:53:52 UTC (rev 6680)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java	2006-10-07 21:15:39 UTC (rev 6681)
@@ -23,6 +23,9 @@
 package org.jboss.soa.esb.listeners;
 
 import java.util.Arrays;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
@@ -35,199 +38,194 @@
 import org.jboss.soa.esb.message.format.MessageFactory;
 
 /**
- * Base abstract listener implementation.
+ * Base abstract message listener implementation. This is a 'passive' message
+ * listener to be used when the underlying channel implementation implements the
+ * blocking receive itself. For example if the concrete listener class is
+ * implemented using the JBoss Remoting channel implementations (For example
+ * HttpListener).
+ * 
  * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @author <a href="mailto:johan.kumps at telenet.be">Johan Kumps</a>
  * @since Version 4.0
  */
-public abstract class AbstractListener implements Runnable {   
-    
-    // You can override these values at constructor time of your
-    // derived class after calling super(GpListener,DomElement)
-    protected int m_iSleepForThreads = 3000; // default sleep if no threads available
-    protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
+public abstract class AbstractListener implements Runnable {
 
-    protected int 			m_iQthr = 0, m_iMaxThr;
+	// You can override these values at constructor time of your
+	// derived class after calling super(GpListener,DomElement)
 
-    protected ThreadGroup 	m_oThrGrp = null;
-    protected Logger 		logger;
-    protected GpListener 	m_oDad;
-    protected DomElement 	listenerConfig;
-    protected String[] 		m_oActions;
-    protected ActionDefinitionFactory m_oActionDefinitionFactory;
-    protected MessageFactory m_oMsgFactory;
+	protected int m_iUpperThreadLimit = 10; // just in case - override if you
 
-    protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
-        
-        logger 		= Logger.getLogger(this.getClass());
-        m_oDad 		= p_oDad;
-        listenerConfig = p_oParms.cloneObj();
-        m_oActionDefinitionFactory = actionDefinitionFactory;
-        m_oMsgFactory = MessageFactory.getInstance();
-        m_oThrGrp 	= new ThreadGroup(listenerConfig.getName());
+	// wish
 
-        String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
-        m_oActions 	= sAtt.split(",");
-        
-        if(m_oActions.length == 0) {
-            throw new ConfigurationException("Listener 'actions' list must be specified.");
-        }
+	protected Logger logger;
 
-        sAtt		= GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
-        int iMax 	= Integer.parseInt(sAtt);
-        m_iMaxThr 	= Math.min(iMax, m_iUpperThreadLimit);
-    } // __________________________________
+	protected GpListener m_oDad;
 
-    /**
-     * Implement run method for this Runnable <p/> Will continue to run until
-     * controlling class (ref in m_oDad) indicates no more looping allowed for
-     * all child classes <p/> This condition will not prevent child processes to
-     * finish normally
-     */
-    public void run() {
-        while (m_oDad.continueLooping()) {
-            Object[] processList = receive();
-            if (null==processList)
-            	try { Thread.sleep(500); }
-            	catch(InterruptedException e) {/*  ok  do nothing  */}
-            else
-              for (Object currentObj : processList) {
-                if (m_iQthr >= m_iMaxThr) {
-                    logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
-                    try {
-                        Thread.sleep(m_iSleepForThreads);
-                    } catch (InterruptedException e) {
-                        return;
-                    }
-                    break;
-                }
+	protected DomElement listenerConfig;
 
-                // Spawn a thread and push the message message through the pipeline...
-                ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
-                new Thread(runner).start();
-            }
-        }
-        
-        close();
-    }
-    
-    /**
-     * Receive message from underlying channel implementation.
-     * <p/>
-     * Implementations must perform a blocking receive.
-     * @return An array of Objects received on the channel.
-     */
-    protected abstract Object[] receive();
+	protected String[] m_oActions;
 
-    /**
-     * Called on the listener implementation when pipeline processing error has occured.
-     * @param initialMsg The message that was initialy supplied to the pipeline.
-     * @param processor The processor raised the error.
-     * @param error The error.
-     */
-    protected abstract void processingError(Object initialMsg, ActionProcessor processor, Throwable error);
+	protected ActionDefinitionFactory m_oActionDefinitionFactory;
 
-    /**
-     * Called on the listener implementation when pipeline processing of a message is complete.
-     * @param initialMsg The message that was initialy supplied to the pipeline.
-     */
-    protected abstract void processingComplete(Object initialMsg);
-    
-    /**
-     * Close the listener implemenation.
-     * <p/>
-     * Allows the listener to perform relevant close/cleanup tasks.
-     */
-    protected abstract void close();
-    
-    /**
-     * Increment the active thread count.
-     */
-    private void incThreads() {
-        m_iQthr++;
-    }
+	protected MessageFactory m_oMsgFactory;
 
-    /**
-     * Decrement the active thread count.
-     */
-    private void decThreads() {
-        m_iQthr--;
-    }
-    
-    /**
-     * Action Processing Pipeline.
-     * <p/>
-     * Runs the actions in a listeners "actions" config on a message payload message received
-     * by the listener implementation.
-     * 
-     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
-     * @since Version 4.0
-     */
-    private class ActionProcessingPipeline implements Runnable {
-        
-        private Object initialObject;
-             
-        /**
-         * Private constructor.
-         * @param pMessage The inital processing target message.
-         */
-        private ActionProcessingPipeline(Object obj) {
-            initialObject = obj;
-        }
+	protected ExecutorService pipelineExecutorPool = null;
 
-        /* (non-Javadoc)
-         * @see java.lang.Runnable#run()
-         */
-        public void run() {
-            String currentAction = null;
-            ActionProcessor currentProcessor = null;
-            
-            // Increment the active thread count for the listener on starting...
-            incThreads();
-            
-            try {
-                Message message = m_oMsgFactory.getMessage();
-                ActionUtils.putCurrentObject(message,initialObject);
+	protected AbstractListener(GpListener p_oDad, DomElement p_oParms,
+			ActionDefinitionFactory actionDefinitionFactory) throws Exception {
 
-                // Run the message through each ActionProcessor...
-                for(String action : m_oActions) {
-                    ActionDefinition actionDefinition;
+		logger = Logger.getLogger(this.getClass());
+		m_oDad = p_oDad;
+		listenerConfig = p_oParms.cloneObj();
+		m_oActionDefinitionFactory = actionDefinitionFactory;
+		m_oMsgFactory = MessageFactory.getInstance();
 
-                    currentAction = action.trim();
-                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
-                    if(actionDefinition == null) {
-                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
-                    }
-                    
-                    // The processing result of each action feeds into the processing of the next action...
-                    currentProcessor = actionDefinition.getProcessor();
-                    try {
-                    	ActionUtils.copyCurrentToPrevious(message);
-                        message = currentProcessor.process(message);
-                    } catch (Exception e) {
-                        GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
-                        throw e;
-                    }
-                    
-                    if(message == null && action != m_oActions[m_oActions.length - 1]) {
-                        logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
-                        break;
-                    }
-                    // Notify on all processors.  May want to do this differently in the future i.e. more selectively ...
-                    GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
-                    
-                    // Setup the message for processing by the next processor...
-                    message.getBody().remove(ActionUtils.BEFORE_ACTION);
-                }
-            } catch(Throwable thrown) {
-                processingError(initialObject, currentProcessor, thrown);
-                logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] threw an exception.", thrown);
-            }
-            
-            processingComplete(initialObject);
-            
-            // Decrement the active thread count for the listener on completion...
-            decThreads();
-        }
-    }
-    
+		String sAtt = GpListener.obtainAtt(listenerConfig,
+				GpListener.PARM_ACTIONS, "");
+		m_oActions = sAtt.split(",");
+
+		if (m_oActions.length == 0) {
+			throw new ConfigurationException(
+					"Listener 'actions' list must be specified.");
+		}
+
+		sAtt = GpListener.obtainAtt(listenerConfig,
+				GpListener.PARM_MAX_THREADS, "1");
+		int iMax = Integer.parseInt(sAtt);
+		int m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+
+		this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
+	} // __________________________________
+
+	/**
+	 * Implements the run method for this Runnable. <p/> This is a no-op for
+	 * passive listeners: as described in the class comment, the underlying
+	 * channel implementation performs the blocking receive itself, so there
+	 * is no receive loop to drive from here.
+	 */
+
+	public void run() {
+		// No need to do something in the passive listeners
+	}
+
+	/**
+	 * Called on the listener implementation when a pipeline processing error
+	 * has occurred.
+	 * 
+	 * @param initialObject
+	 *            The object reference that was initially supplied to the
+	 *            pipeline.
+	 * @param processor
+	 *            The processor raised the error.
+	 * @param error
+	 *            The error.
+	 */
+	protected abstract void processingError(Object initialObject,
+			ActionProcessor processor, Throwable error);
+
+	/**
+	 * Called on the listener implementation when pipeline processing of a
+	 * message is complete.
+	 * 
+	 * @param initialObject
+	 *            The object reference that was initially supplied to the
+	 *            pipeline.
+	 */
+	protected abstract void processingComplete(Object initialObject);
+
+	/**
+	 * Action Processing Pipeline. <p/> Runs the actions in a listeners
+	 * "actions" config on a message payload message received by the listener
+	 * implementation.
+	 * 
+	 * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+	 * @since Version 4.0
+	 */
+	protected class ActionProcessingPipeline implements Callable {
+
+		private Object initialObject;
+
+		/**
+		 * Protected constructor.
+		 * 
+		 * @param obj
+		 *            The initial processing target object.
+		 */
+		protected ActionProcessingPipeline(Object obj) {
+			initialObject = obj;
+		}
+
+		/*
+		 * (non-Javadoc)
+		 * 
+		 * @see java.util.concurrent.Callable#call()
+		 */
+		public Object call() {
+			String currentAction = null;
+			ActionProcessor currentProcessor = null;
+			Message message = m_oMsgFactory.getMessage();
+			try {
+
+				ActionUtils.putCurrentObject(message, initialObject);
+
+				// Run the message through each ActionProcessor...
+				for (String action : m_oActions) {
+					ActionDefinition actionDefinition;
+
+					currentAction = action.trim();
+					actionDefinition = m_oActionDefinitionFactory
+							.getInstance(currentAction);
+					if (actionDefinition == null) {
+						throw new java.lang.IllegalStateException(
+								"Bad Listener Configuration.  No 'Actions/Action' definition for action ["
+										+ currentAction + "].");
+					}
+
+					// The processing result of each action feeds into the
+					// processing of the next action...
+					currentProcessor = actionDefinition.getProcessor();
+					try {
+						ActionUtils.copyCurrentToPrevious(message);
+						message = currentProcessor.process(message);
+					} catch (Exception e) {
+						GpListener.notifyError(listenerConfig, e,
+								currentProcessor.getErrorNotification(message));
+						throw e;
+					}
+
+					if (message == null
+							&& action != m_oActions[m_oActions.length - 1]) {
+						logger
+								.warn("Premature termination of action processing pipeline ["
+										+ Arrays.asList(m_oActions)
+										+ "].  ActionProcessor ["
+										+ currentProcessor.getClass().getName()
+										+ "] returned a null message result on processing of action ["
+										+ currentAction + "].");
+						break;
+					}
+					// Notify on all processors. May want to do this differently
+					// in the future i.e. more selectively ...
+					GpListener.notifyOK(listenerConfig, currentProcessor
+							.getOkNotification(message));
+
+					// Setup the message for processing by the next processor...
+				}
+			} catch (Throwable thrown) {
+				processingError(initialObject, currentProcessor, thrown);
+				logger.error(
+						"Premature termination of action processing pipeline ["
+								+ Arrays.asList(m_oActions) + "].  Action ["
+								+ currentAction + "] threw an exception.",
+						thrown);
+			}
+
+			processingComplete(initialObject);
+
+			// return the result of the last action in the pipeline
+			return ActionUtils.currentFromMessage(message);
+
+		}
+	}
+
 } // ____________________________________________________________________________

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-10-07 20:53:52 UTC (rev 6680)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-10-07 21:15:39 UTC (rev 6681)
@@ -36,7 +36,7 @@
  * 
  * @author Esteban
  */
-public abstract class AbstractPoller extends AbstractListener {
+public abstract class AbstractPoller extends AbstractActiveListener {
 
     // You can override these values at constructor time of your
     // derived class after calling super(GpListener,DomElement)

Modified: labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-10-07 20:53:52 UTC (rev 6680)
+++ labs/jbossesb/workspace/jokum/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-10-07 21:15:39 UTC (rev 6681)
@@ -39,7 +39,7 @@
 import org.jboss.soa.esb.helpers.AppServerContext;
 import org.jboss.soa.esb.helpers.DomElement;
 
-public class JmsQueueListener extends AbstractListener {
+public class JmsQueueListener extends AbstractActiveListener {
 
     public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
     public static final String LISTEN_JNDI_TYPE = "listenJndiType";
@@ -53,6 +53,9 @@
     protected Queue m_oQueue;
     protected String m_sSelector;
 
+    //TODO refactor to config setting
+    protected long reconnectionInterval = 2000;
+
     protected MessageConsumer jmsMessageReceiver;
 
 
@@ -120,7 +123,7 @@
                     catch (Exception e) {
                         logger.error("Reconnecting to Queue", e);
                         try {
-                            Thread.sleep(m_iSleepForThreads);
+                            Thread.sleep(this.reconnectionInterval);
                         } catch (InterruptedException e1) { // Just return
                             logger.error("Unexpected thread interupt exception.", e);
                             return null;




More information about the jboss-svn-commits mailing list