[jboss-svn-commits] JBL Code SVN: r7344 - labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Nov 2 14:52:12 EST 2006
Author: jokum
Date: 2006-11-02 14:52:11 -0500 (Thu, 02 Nov 2006)
New Revision: 7344
Removed:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
Log:
AbstractListenerUnitTest was failing after refactoring to thread pool
Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 19:51:22 UTC (rev 7343)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/AbstractListener.java 2006-11-02 19:52:11 UTC (rev 7344)
@@ -1,283 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.soa.esb.listeners.old;
-
-import java.util.Arrays;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.log4j.Logger;
-import org.jboss.soa.esb.actions.ActionDefinition;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessingException;
-import org.jboss.soa.esb.actions.ActionProcessor;
-import org.jboss.soa.esb.actions.ActionUtils;
-import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.format.MessageFactory;
-
-/**
- * Base abstract listener implementation.
- * <p/>
- * The listener thread loops on {@link #receive()} for as long as the owning
- * {@link GpListener} allows it, and hands every received object to an
- * {@link ActionProcessingPipeline} that is executed on a fixed-size thread pool.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @since Version 4.0
- */
-public abstract class AbstractListener implements Runnable {
-
-    /**
-     * Name constant def for the Message attachment carrying the list of actions to be applied to the
-     * incoming message. This allows the configured processing pipeline to be overridden by the Message
-     * producer.
-     */
-    public static final String MESSAGE_PROCESSING_ACTIONS_LIST = "MESSAGE_PROCESSING_ACTIONS_LIST";
-
-    // NOTE: the thread pool is sized inside this class' constructor, so changing
-    // m_iUpperThreadLimit in a derived constructor (after super(...)) does not
-    // resize the pool that has already been created.
-    protected int m_iSleepForThreads = 3000; // default sleep if no threads available
-    protected int m_iUpperThreadLimit = 10; // upper bound applied to the configured maxThreads
-
-    // m_iQthr is never updated by this class; it is only read in run() for the
-    // benefit of derived classes that maintain it themselves.
-    protected int m_iQthr = 0, m_iMaxThr;
-
-    protected ThreadGroup m_oThrGrp = null;
-    protected Logger logger;
-    protected GpListener m_oDad;
-    protected ConfigTree listenerConfig;
-    protected String[] m_oActions;
-    protected ActionDefinitionFactory m_oActionDefinitionFactory;
-    protected MessageFactory m_oMsgFactory;
-
-    /* The name of the attribute in the configuration containing the number of threads in the pool */
-    public static final String PARM_MAX_THREADS = "maxThreads";
-
-    /* The thread pool executing the action processing pipeline */
-    protected ExecutorService pipelineExecutorPool = null;
-
-    /**
-     * Initialise the listener from its configuration and create the pipeline thread pool.
-     *
-     * @param p_oDad The controlling listener; consulted by {@link #run()} to decide when to stop.
-     * @param p_oParms The listener configuration. A clone is kept in {@link #listenerConfig}.
-     * @param actionDefinitionFactory Factory used to resolve action names to action definitions.
-     * @throws Exception On configuration errors, e.g. a non-numeric maxThreads value.
-     */
-    protected AbstractListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
-
-        logger = Logger.getLogger(this.getClass());
-        m_oDad = p_oDad;
-        listenerConfig = p_oParms.cloneObj();
-        m_oActionDefinitionFactory = actionDefinitionFactory;
-        m_oMsgFactory = MessageFactory.getInstance();
-        m_oThrGrp = new ThreadGroup(listenerConfig.getName());
-
-        String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
-        if (!sAtt.trim().equals("")) {
-            m_oActions = sAtt.split(",");
-        }
-
-        // The number of threads configured
-        String maxThreads = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
-        if (maxThreads != null) {
-            int iMax = Integer.parseInt(maxThreads.trim());
-            this.m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-            if (this.m_iMaxThr < 1) {
-                // Executors.newFixedThreadPool rejects a pool size <= 0, so fall back to one thread.
-                this.logger.warn("Attribute maxThreads resolved to " + this.m_iMaxThr + ". Action pipeline will be processed by only one thread");
-                this.m_iMaxThr = 1;
-            }
-            if (this.logger.isInfoEnabled()) {
-                this.logger.info("Action processing pipeline will be handled by " + m_iMaxThr + " threads.");
-            }
-        } else {
-            this.logger.warn("Attribute maxThreads has not been set. Action pipeline will be processed by only one thread");
-            // Make the pool size match what the warning above promises.
-            this.m_iMaxThr = 1;
-        }
-        this.pipelineExecutorPool = Executors.newFixedThreadPool(m_iMaxThr);
-
-    } // __________________________________
-
-    /**
-     * Implement run method for this Runnable <p/> Will continue to run until
-     * controlling class (ref in m_oDad) indicates no more looping allowed for
-     * all child classes <p/> This condition will not prevent child processes to
-     * finish normally: once looping stops, the pipeline pool is shut down (no new
-     * pipelines are accepted) and this method waits for the already submitted
-     * pipelines before calling {@link #close()}.
-     */
-    public void run() {
-        while (m_oDad.continueLooping()) {
-            Object[] processList = receive();
-            if (null == processList) {
-                try {
-                    Thread.sleep(500);
-                } catch (InterruptedException e) {
-                    /* ok do nothing - the loop condition is re-evaluated */
-                }
-            } else {
-                for (Object currentObj : processList) {
-                    // Push the message through the pipeline on one of the pool threads...
-                    ActionProcessingPipeline runner = new ActionProcessingPipeline(currentObj);
-                    this.pipelineExecutorPool.submit(runner);
-                }
-            }
-        }
-
-        // Legacy wait: only relevant for derived classes that maintain m_iQthr themselves.
-        while (m_iQthr > 0) {
-            logger.info("Waiting for all processing pipelines to complete.");
-            try {
-                Thread.sleep(200);
-            } catch (InterruptedException e) {
-                logger.warn("Thread interrupted while waiting for all processing pipelines to complete.", e);
-            }
-        }
-
-        // Wait for the pipelines submitted to the pool before closing the listener and exiting.
-        // shutdown() lets queued and running pipelines finish and releases the pool threads afterwards.
-        boolean allComplete = false;
-        this.pipelineExecutorPool.shutdown();
-        try {
-            while (!this.pipelineExecutorPool.awaitTermination(200, java.util.concurrent.TimeUnit.MILLISECONDS)) {
-                logger.info("Waiting for all processing pipelines to complete.");
-            }
-            allComplete = true;
-        } catch (InterruptedException e) {
-            logger.warn("Thread interrupted while waiting for all processing pipelines to complete.", e);
-            // Preserve the interrupt for whoever owns this thread and stop waiting.
-            Thread.currentThread().interrupt();
-        }
-
-        if (allComplete) {
-            logger.info("All processing pipelines complete. Closing listener now.");
-        }
-
-        close();
-    }
-
-    /**
-     * Receive message from underlying channel implementation.
-     * <p/>
-     * Implementations must perform a blocking receive.
-     * @return An array of Objects received on the channel, or null when nothing was
-     * received (in which case {@link #run()} pauses briefly before retrying).
-     */
-    protected abstract Object[] receive();
-
-    /**
-     * Called on the listener implementation when pipeline processing error has occurred.
-     * @param initialMsg The message that was initially supplied to the pipeline.
-     * @param processor The processor that raised the error. Can be null where the error was raised before
-     * pipeline processing of the message.
-     * @param error The error. Can be null.
-     */
-    protected abstract void processingError(Object initialMsg, ActionProcessor processor, Throwable error);
-
-    /**
-     * Called on the listener implementation when pipeline processing of a message is complete.
-     * @param initialMsg The message that was initially supplied to the pipeline.
-     */
-    protected abstract void processingComplete(Object initialMsg);
-
-    /**
-     * Close the listener implementation.
-     * <p/>
-     * Allows the listener to perform relevant close/cleanup tasks.
-     */
-    protected abstract void close();
-
-    /**
-     * Action Processing Pipeline.
-     * <p/>
-     * Runs the actions in a listeners "actions" config on a message payload message received
-     * by the listener implementation.
-     *
-     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
-     * @since Version 4.0
-     */
-    protected class ActionProcessingPipeline implements Runnable {
-
-        private Object initialObject;
-
-        /**
-         * Create a pipeline for one received object.
-         * @param obj The initial processing target. If it is not already a Message it is
-         * wrapped in a new Message when the pipeline runs.
-         */
-        protected ActionProcessingPipeline(Object obj) {
-            initialObject = obj;
-        }
-
-        /* (non-Javadoc)
-         * @see java.lang.Runnable#run()
-         */
-        public void run() {
-            String currentAction = null;
-            ActionProcessor currentProcessor = null;
-            // Declared outside the try block so that the error log can report the pipeline actually used.
-            String[] actions = null;
-
-            try {
-                Message message;
-
-                if (initialObject instanceof Message) {
-                    message = (Message) initialObject;
-                } else {
-                    message = m_oMsgFactory.getMessage();
-                    ActionUtils.setTaskObject(message, initialObject);
-                }
-
-                actions = getActions(message);
-
-                // Run the message through each ActionProcessor...
-                for (int i = 0; i < actions.length; i++) {
-                    // Position based: the list being run may be a message supplied override
-                    // (m_oActions can then be null) and may contain the same action name twice.
-                    boolean lastAction = (i == actions.length - 1);
-                    ActionDefinition actionDefinition;
-
-                    currentAction = actions[i].trim();
-                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
-                    if (actionDefinition == null) {
-                        throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
-                    }
-
-                    // The processing result of each action feeds into the processing of the next action...
-                    currentProcessor = actionDefinition.getProcessor();
-                    try {
-                        ActionUtils.copyCurrentToPrevious(message);
-                        message = currentProcessor.process(message);
-                    } catch (Exception e) {
-                        GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
-                        throw e;
-                    }
-
-                    // A null result is only acceptable from the final action of the pipeline.
-                    if (message == null && !lastAction) {
-                        String exceptionMessage = "Premature termination of action processing pipeline [" + Arrays.asList(actions) + "]. ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].";
-                        processingError(initialObject, currentProcessor, new ActionProcessingException(exceptionMessage));
-                        logger.warn(exceptionMessage);
-                        return;
-                    }
-                    // Notify on all processors. May want to do this differently in the future i.e. more selectively ...
-                    GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
-
-                    // Setup the message for processing by the next processor...
-                    if (message != null) {
-                        message.getBody().remove(ActionUtils.BEFORE_ACTION);
-                    }
-                }
-
-                processingComplete(initialObject);
-            } catch (Throwable thrown) {
-                // Fall back to the configured list when the failure happened before the list was resolved.
-                String[] reportedActions = (actions != null ? actions : m_oActions);
-                processingError(initialObject, currentProcessor, thrown);
-                logger.error("Premature termination of action processing pipeline [" + (reportedActions != null ? Arrays.asList(reportedActions) : "") + "]. Action [" + currentAction + "] threw an exception.", thrown);
-            }
-        }
-
-        /**
-         * Get the list of actions to be applied to the supplied message.
-         * <p/>
-         * A String attachment named {@link AbstractListener#MESSAGE_PROCESSING_ACTIONS_LIST}
-         * (comma separated action names) takes precedence over the actions configured on the listener.
-         * @param message The message to be processed.
-         * @return The set of processing actions to be performed on the message.
-         * @throws ActionProcessingException Invalid actions list attachment setting, or no
-         * actions available from either the attachment or the listener configuration.
-         */
-        private String[] getActions(Message message) throws ActionProcessingException {
-            // Check is there an attachment specifying an override pipeline config...
-            Object overrideActionsAttachment = message.getAttachment().get(MESSAGE_PROCESSING_ACTIONS_LIST);
-
-            if (overrideActionsAttachment == null) {
-                // Otherwise use the actions configured on the listener...
-                if (m_oActions == null || m_oActions.length == 0) {
-                    throw new ActionProcessingException("No actions configuration specified either on the listener or as a Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "]. Aborting message processing.");
-                }
-                return m_oActions;
-            }
-
-            if (!(overrideActionsAttachment instanceof String)) {
-                throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] must be of type java.lang.String. Received [" + overrideActionsAttachment.getClass().getName() + "]. Aborting message processing.");
-            }
-
-            String overrideActions = (String) overrideActionsAttachment;
-            if (overrideActions.trim().equals("")) {
-                throw new ActionProcessingException("Message attachement [" + MESSAGE_PROCESSING_ACTIONS_LIST + "] was specified but with an empty value. Aborting message processing.");
-            }
-
-            return overrideActions.split(",");
-        }
-    }
-
-} // ____________________________________________________________________________
Deleted: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java 2006-11-02 19:51:22 UTC (rev 7343)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/old/HttpListener.java 2006-11-02 19:52:11 UTC (rev 7344)
@@ -1,143 +0,0 @@
-package org.jboss.soa.esb.listeners.old;
-
-import javax.management.MBeanServer;
-
-import org.jboss.remoting.InvocationRequest;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.ServerInvocationHandler;
-import org.jboss.remoting.ServerInvoker;
-import org.jboss.remoting.callback.InvokerCallbackHandler;
-import org.jboss.remoting.transport.Connector;
-import org.jboss.soa.esb.actions.ActionDefinitionFactory;
-import org.jboss.soa.esb.actions.ActionProcessor;
-import org.jboss.soa.esb.helpers.ConfigTree;
-
-/**
- *
- *
- * @author Johan Kumps
- *
- */
-public class HttpListener extends AbstractPassiveListener implements ServerInvocationHandler{
-
- /* The url to listen on */
- public String listenHttpUrl = null;
-
- /* The url this listener will listen on */
- private static final String LISTEN_HTTP_URL = "listenHttpURL";
-
- /* The default transport this listener will listen on */
- private static final String transport = "http";
-
- /* The default hostname this listener will listen on */
- private static final String host = "localhost";
-
- /* The default port this listener will listen on */
- private static final int port = 5400;
-
- public HttpListener(GpListener p_oDad, ConfigTree p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
- {
- super(p_oDad, p_oParms, actionDefinitionFactory);
- this.listenHttpUrl = GpListener.obtainAtt(p_oParms,LISTEN_HTTP_URL, this.getDefaultListenHttpUrl());
- // initialize the HTTP server
- this.initServer();
- }
-
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws Exception -
- * if actionClass not specified or not in classpath or invalid
- * int values for maxThreads or pollLatencySecs
- *
- */
- protected void checkParams() throws Exception {
- // listener url
- this.listenHttpUrl = GpListener.obtainAtt(this.listenerConfig, LISTEN_HTTP_URL,
- this.getDefaultListenHttpUrl());
- }
-
- public Object invoke(InvocationRequest invocationRequest) throws Throwable {
- //Retrieving the real payload of this invocationRequest
- Object payload = invocationRequest.getParameter();
-
- if (this.logger.isInfoEnabled()) {
- this.logger
- .info("HttpInvocationListener is invoked...The given payload is : "
- + payload);
- }
- //Start the action processing pipeline
- ActionProcessingPipeline pipelineRunner = new ActionProcessingPipeline(payload);
- this.pipelineExecutorPool.submit(pipelineRunner);
-
- return payload;
- }
-
- /**
- * Method returning the default listenHttpUrl for this HttpListener instance
- *
- * @return the default listen url
- */
- private String getDefaultListenHttpUrl() {
- return HttpListener.transport + "://" + HttpListener.host + ":"
- + HttpListener.port;
- }
-
- private void initServer() throws Exception {
- InvokerLocator locator = new InvokerLocator(this.listenHttpUrl);
- if (this.logger.isInfoEnabled()) {
- this.logger.info("Starting remoting server with locator uri of: "
- + this.listenHttpUrl);
- }
- Connector connector = new Connector(locator);
- connector.create();
- connector.addInvocationHandler("HttpInvocationHandler", this);
-
- // Starting the server deamon
- connector.start();
-
- if (this.logger.isInfoEnabled()) {
- this.logger.info("HttpListener deamon started successfully!");
- }
-
- }
-
- @Override
- protected void processingError(Object initialMsg, ActionProcessor processor, Throwable error) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void processingComplete(Object initialMsg) {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- protected void close() {
- // TODO Auto-generated method stub
-
- }
-
- public void setMBeanServer(MBeanServer arg0) {
- // TODO Auto-generated method stub
-
- }
-
- public void setInvoker(ServerInvoker arg0) {
- // TODO Auto-generated method stub
-
- }
-
- public void addListener(InvokerCallbackHandler arg0) {
- // TODO Auto-generated method stub
-
- }
-
- public void removeListener(InvokerCallbackHandler arg0) {
- // TODO Auto-generated method stub
-
- }
-
-}
More information about the jboss-svn-commits
mailing list