[jboss-svn-commits] JBL Code SVN: r7344 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Nov 2 14:52:12 EST 2006


Author: jokum
Date: 2006-11-02 14:52:11 -0500 (Thu, 02 Nov 2006)
New Revision: 7344

Removed:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
Log:
AbstractListenerUnitTest was failing after refactoring to thread pool

Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 19:51:22 UTC (rev 7343)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java	2006-11-02 19:52:11 UTC (rev 7344)
@@ -1,283 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.soa.esb.listeners.old;
-
-import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.log4j.Logger;
-import org.jboss.soa.esb.actions.ActionDefinition;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessingException;
-import org.jboss.soa.esb.actions.ActionProcessor;
-import org.jboss.soa.esb.actions.ActionUtils;
-import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.format.MessageFactory;
-
-/**
- * Base abstract listener: polls receive() and hands each received object to a thread pool that runs the action pipeline.
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @since Version 4.0
- */
-public abstract class AbstractListener implements Runnable {   
-
-    /**
-	 * Name of the Message attachment carrying the comma-separated list of actions to be applied to the
-	 * incoming message.  This allows the configured processing pipeline to be overridden by the Message
-	 * producer. 
-	 */
-	public static final String MESSAGE_PROCESSING_ACTIONS_LIST = "MESSAGE_PROCESSING_ACTIONS_LIST";
-	
-    // Defaults.  A derived class may reassign these after calling super(...), but note that
-    // m_iUpperThreadLimit has already been applied by this constructor when sizing the thread pool.
-    protected int m_iSleepForThreads = 3000; // default sleep if no threads available (not read in this class)
-    protected int m_iUpperThreadLimit = 10; // cap applied to the configured maxThreads value
-
-    protected int 			m_iQthr = 0, m_iMaxThr; // m_iQthr: count run() waits on; m_iMaxThr: thread pool size
-
-    protected ThreadGroup 	m_oThrGrp = null;
-    protected Logger 		logger;
-    protected GpListener 	m_oDad;
-    protected ConfigTree 	listenerConfig;
-    protected String[] 		m_oActions;
-    protected ActionDefinitionFactory m_oActionDefinitionFactory;
-    protected MessageFactory m_oMsgFactory;
-
-	/* The name of the attribute in the configuration containing the number of threads in the pool (the constructor reads GpListener.PARM_MAX_THREADS, not this constant) */
-	public static final String PARM_MAX_THREADS = "maxThreads";
-
-	/* The thread pool executing the action processing pipeline*/
-	protected ExecutorService pipelineExecutorPool = null;
-
-    protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
-        
-        logger 		= Logger.getLogger(this.getClass());
-        m_oDad 		= p_oDad;
-        listenerConfig = p_oParms.cloneObj();
-        m_oActionDefinitionFactory = actionDefinitionFactory;
-        m_oMsgFactory = MessageFactory.getInstance();
-        m_oThrGrp 	= new ThreadGroup(listenerConfig.getName());
-
-        String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
-        if(!sAtt.trim().equals("")) {
-        	m_oActions 	= sAtt.split(",");
-        }    
-        
-        //The number of threads configured, capped at m_iUpperThreadLimit
-		String maxThreads = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
-		if (maxThreads != null) {
-			int iMax = Integer.parseInt(maxThreads);
-			this.m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-			if (this.logger.isInfoEnabled()){
-				this.logger.info("Action processing pipeline will be handled by " + m_iMaxThr + " threads.");
-			}			
-		} else {
-			this.logger.warn("Attribute maxThreads has not been set. Action pipeline will be processed by only one thread");
-		}
-		this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr); // rejects a pool size <= 0 with IllegalArgumentException
-        
-    } // __________________________________
-
-    /**
-     * Implement run method for this Runnable <p/> Loops while the controlling
-     * class (ref in m_oDad) allows it: calls receive() and submits one
-     * ActionProcessingPipeline per received object, sleeping 500 ms when receive()
-     * returns null <p/> Afterwards waits while m_iQthr > 0 and then calls close()
-     */
-    public void run() {
-        while (m_oDad.continueLooping()) {
-            Object[] processList = receive();
-            if (null==processList) {
-            	try { Thread.sleep(500); }
-            	catch(InterruptedException e) {/*  ok  do nothing  */}
-            } else {
-              for (Object currentObj : processList) {              
-                // Submit the received object to the pool; a pooled thread pushes it through the pipeline...
-                ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
-                this.pipelineExecutorPool.submit(runner);
-              }
-            }
-        }
-        
-        // Wait for all the processing pipelines to complete before closing the listener and exiting.  NOTE(review): nothing in this class modifies m_iQthr, so this loop is skipped unless a subclass maintains the counter.
-        while(m_iQthr > 0) {
-        	logger.info("Waiting for all processing pipelines to complete.");
-        	try {
-				Thread.sleep(200);
-			} catch (InterruptedException e) {
-	        	logger.warn("Thread interrupted while waiting for all processing pipelines to complete.", e);
-			}
-        }
-
-    	logger.info("All processing pipelines complete. Closing listener now.");
-        
-        close();
-    }
-    
-    /**
-     * Receive message from underlying channel implementation.
-     * <p/>
-     * Implementations must perform a blocking receive; a null return makes run() sleep 500 ms and retry.
-     * @return An array of Objects received on the channel.
-     */
-    protected abstract Object[] receive();
-
-    /**
-     * Called on the listener implementation when a pipeline processing error has occurred.
-     * @param initialMsg The message that was initially supplied to the pipeline.
-     * @param processor The processor that raised the error.  Can be null where the error was raised before
-     * pipeline processing of the message.
-     * @param error The error.  Can be null.
-     */
-    protected abstract void processingError(Object initialMsg, ActionProcessor processor, Throwable error);
-
-    /**
-     * Called on the listener implementation when pipeline processing of a message is complete.
-     * @param initialMsg The message that was initially supplied to the pipeline.
-     */
-    protected abstract void processingComplete(Object initialMsg);
-    
-    /**
-     * Close the listener implementation.
-     * <p/>
-     * Allows the listener to perform relevant close/cleanup tasks.
-     */
-    protected abstract void close();      
-    
-    /**
-     * Action Processing Pipeline.
-     * <p/>
-     * Runs the actions in a listener's "actions" config (or a per-message override list) on a payload
-     * received by the listener implementation.
-     * 
-     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
-     * @since Version 4.0
-     */
-    protected class ActionProcessingPipeline implements Runnable {
-        
-		private Object initialObject;
-             
-        /**
-         * Creates a pipeline run for one received object.
-         * @param obj The initial processing target: a Message, or a raw payload that run() wraps in a new Message.
-         */
-        protected ActionProcessingPipeline(Object obj) {
-            initialObject = obj;
-        }
-
-        /* (non-Javadoc)
-         * @see java.lang.Runnable#run()
-         */
-        public void run() {
-            String currentAction = null;
-            ActionProcessor currentProcessor = null;
-            
-            try {
-                Message message;
-                String[] actions;
-                
-                if(initialObject instanceof Message) {
-                	message = (Message)initialObject;
-                } else {
-	                message = m_oMsgFactory.getMessage();
-	                ActionUtils.setTaskObject(message,initialObject);
-                }
-                
-                actions = getActions(message);
-                
-                // Run the message through each ActionProcessor...
-                for(String action : actions) {
-                    ActionDefinition actionDefinition;
-
-                    currentAction = action.trim();
-                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
-                    if(actionDefinition == null) {
-                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
-                    }
-                    
-                    // The processing result of each action feeds into the processing of the next action...
-                    currentProcessor = actionDefinition.getProcessor();
-                    try {
-                    	ActionUtils.copyCurrentToPrevious(message);
-                        message = currentProcessor.process(message);
-                    } catch (Exception e) {
-                        GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
-                        throw e;
-                    }
-                    
-                    if(message == null && action != m_oActions[m_oActions.length - 1]) { // NOTE(review): reference (!=) comparison against the listener-configured list, even when an override list is in use
-                    	String exceptionMessage = "Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "].  ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].";
-                        processingError(initialObject, currentProcessor, new ActionProcessingException(exceptionMessage));
-                        logger.warn(exceptionMessage);
-                        return;
-                    }
-                    // Notify on all processors.  May want to do this differently in the future i.e. more selectively ...
-                    GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
-                    
-                    // Setup the message for processing by the next processor...
-                    if(message != null) {
-                    	message.getBody().remove(ActionUtils.BEFORE_ACTION);
-                    }
-                }
-                
-                processingComplete(initialObject);
-            } catch(Throwable thrown) {
-                processingError(initialObject, currentProcessor, thrown);
-                logger.error("Premature termination of action processing pipeline [" + (m_oActions != null?Arrays.asList(m_oActions):"") + "].  Action [" + currentAction + "] threw an exception.", thrown);
-            } 
-        }
-
-		/**
-		 * Get the list of actions to be applied to the supplied message.
-		 * @param message The message to be processed.
-		 * @return The override list from the message attachment if present, otherwise the listener-configured actions.
-		 * @throws ActionProcessingException Empty or non-String override attachment, or no actions configured at all.
-		 */
-		private String[] getActions(Message message) throws ActionProcessingException {
-			// Check whether there is an attachment specifying an override pipeline config...
-			Object overrideActionsAttachment = message.getAttachment().get(MESSAGE_PROCESSING_ACTIONS_LIST);
-			if(overrideActionsAttachment != null) {
-				if(overrideActionsAttachment instanceof String) {
-					String overrideActions = (String)overrideActionsAttachment;
-					
-					if(overrideActions.trim().equals("")) {
-			        	throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] was specified but with an empty value.  Aborting message processing.");
-					}
-					
-					return overrideActions.split(",");
-				} else {
-					throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] must be of type java.lang.String.  Received [" + overrideActionsAttachment.getClass().getName() + "].  Aborting message processing.");
-				}
-			} else {
-				// Otherwise use the actions configured on the listener...
-				if(m_oActions == null || m_oActions.length == 0) {
-					throw new ActionProcessingException("No actions configuration specified either on the listener or as a Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "].  Aborting message processing.");
-				}
-				return m_oActions;
-			}
-		}
-    }
-    
-} // ____________________________________________________________________________

Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java	2006-11-02 19:51:22 UTC (rev 7343)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java	2006-11-02 19:52:11 UTC (rev 7344)
@@ -1,143 +0,0 @@
-package org.jboss.soa.esb.listeners.old;
-
-import javax.management.MBeanServer;
-
-import org.jboss.remoting.InvocationRequest;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.ServerInvocationHandler;
-import org.jboss.remoting.ServerInvoker;
-import org.jboss.remoting.callback.InvokerCallbackHandler;
-import org.jboss.remoting.transport.Connector;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessor;
-import org.jboss.soa.esb.helpers.ConfigTree;
-
-/**
- * Listener that starts a JBoss Remoting Connector on the configured url and submits
- * each payload it is invoked with to the action processing pipeline thread pool.
- * @author Johan Kumps
- * 
- */
-public class HttpListener extends AbstractPassiveListener implements ServerInvocationHandler{
-
-	/* The url to listen on */
-	public String listenHttpUrl = null;
-
-	/* The name of the configuration attribute holding the url to listen on */
-	private static final String LISTEN_HTTP_URL = "listenHttpURL";
-
-	/* The default transport this listener will listen on */
-	private static final String transport = "http";
-
-	/* The default hostname this listener will listen on */
-	private static final String host = "localhost";
-
-	/* The default port this listener will listen on */
-	private static final int port = 5400;
-
-	public HttpListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
-	  {
-		super(p_oDad, p_oParms, actionDefinitionFactory);
-		this.listenHttpUrl = GpListener.obtainAtt(p_oParms,LISTEN_HTTP_URL, this.getDefaultListenHttpUrl());
-		// initialize the HTTP server
-		this.initServer();
-	  }
-
-	/**
-	 * Reads the listen url from the listener configuration via GpListener.obtainAtt,
-	 * passing the default url as the third argument (presumably the fallback value).
-	 * 
-	 * @throws Exception -
-	 *             declared by the signature; what GpListener.obtainAtt may
-	 *             throw is not visible from this class
-	 * 
-	 */
-	protected void checkParams() throws Exception {
-		// listener url
-		this.listenHttpUrl = GpListener.obtainAtt(this.listenerConfig, LISTEN_HTTP_URL,
-				this.getDefaultListenHttpUrl());
-	}
-	
-	public Object invoke(InvocationRequest invocationRequest) throws Throwable {
-		//Retrieving the real payload of this invocationRequest
-		Object payload = invocationRequest.getParameter();
-
-		if (this.logger.isInfoEnabled()) {
-			this.logger
-					.info("HttpInvocationListener is invoked...The given payload is : "
-							+ payload);
-		}
-		//Submit the payload to the pool; the payload itself is returned without waiting for the pipeline result
-		ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
-		this.pipelineExecutorPool.submit(pipelineRunner);
-		
-		return payload;
-	}
-
-	/**
-	 * Method returning the default listenHttpUrl for this HttpListener instance
-	 * 
-	 * @return the default listen url, built as transport://host:port from the constants above
-	 */
-	private String getDefaultListenHttpUrl() {
-		return HttpListener.transport + "://" + HttpListener.host + ":"
-				+ HttpListener.port;
-	}
-
-	private void initServer() throws Exception {
-		InvokerLocator locator = new InvokerLocator(this.listenHttpUrl);
-		if (this.logger.isInfoEnabled()) {
-			this.logger.info("Starting remoting server with locator uri of: "
-					+ this.listenHttpUrl);
-		}
-		Connector connector = new Connector(locator);
-		connector.create();
-		connector.addInvocationHandler("HttpInvocationHandler", this);
-
-		// Starting the server daemon
-		connector.start();
-
-		if (this.logger.isInfoEnabled()) {
-			this.logger.info("HttpListener deamon started successfully!");
-		}		
-
-	}
-
-	@Override
-	protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
-		// TODO not implemented: processing errors are ignored by this listener
-		
-	}
-
-	@Override
-	protected void processingComplete(Object initialMsg) {
-		// TODO not implemented: nothing is done on pipeline completion
-		
-	}
-
-	@Override
-	protected void close() {
-		// TODO not implemented: the Connector started by initServer() is not stopped here
-		
-	}
-
-	public void setMBeanServer(MBeanServer arg0) {
-		// TODO not implemented (no-op)
-		
-	}
-
-	public void setInvoker(ServerInvoker arg0) {
-		// TODO not implemented (no-op)
-		
-	}
-
-	public void addListener(InvokerCallbackHandler arg0) {
-		// TODO not implemented (no-op)
-		
-	}
-
-	public void removeListener(InvokerCallbackHandler arg0) {
-		// TODO not implemented (no-op)
-		
-	}
-
-}




More information about the jboss-svn-commits mailing list