[jboss-svn-commits] JBL Code SVN: r11547 - in labs/jbossesb/trunk/product: core/listeners/src/org/jboss/soa/esb/listeners/message and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue May 1 07:26:40 EDT 2007


Author: mark.little at jboss.com
Date: 2007-05-01 07:26:40 -0400 (Tue, 01 May 2007)
New Revision: 11547

Added:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingFaultException.java
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/errors/
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/errors/Factory.java
Modified:
   labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
   labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
Log:


Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingFaultException.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingFaultException.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingFaultException.java	2007-05-01 11:26:40 UTC (rev 11547)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import org.jboss.soa.esb.message.Message;
+
+/**
+ * Exception while processing message payload processing action. Can be
+ * used to return an application specific fault message and break out
+ * of action chaining loop.
+ * 
+ * @author marklittle
+ * @since Version 4.2MR2
+ */
+public class ActionProcessingFaultException extends ActionProcessingException
+{
+
+	private static final long serialVersionUID = 1L;
+
+	public ActionProcessingFaultException (Message faultMessage, String cause)
+	{
+		this(cause);
+		
+		_message = faultMessage;
+	}
+	
+	/**
+	 * Public constructor.
+	 * 
+	 * @param message
+	 *            Exception message.
+	 */
+	public ActionProcessingFaultException(String message)
+	{
+		super(message);
+	}
+
+	/**
+	 * Public constructor.
+	 * 
+	 * @param message
+	 *            Exception message.
+	 * @param cause
+	 *            Exception cause.
+	 */
+	public ActionProcessingFaultException(String message, Throwable cause)
+	{
+		super(message, cause);
+	}
+
+	/**
+	 * Public constructor.
+	 * 
+	 * @param cause
+	 *            Exception cause.
+	 */
+	public ActionProcessingFaultException(Throwable cause)
+	{
+		super(cause);
+	}
+
+	public final Message getFaultMessage ()
+	{
+		return _message;
+	}
+	
+	private Message _message;
+}

Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2007-05-01 10:59:57 UTC (rev 11546)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2007-05-01 11:26:40 UTC (rev 11547)
@@ -27,8 +27,11 @@
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.SendFailedException;
 import org.jboss.soa.esb.actions.ActionLifecycle;
 import org.jboss.soa.esb.actions.ActionPipelineProcessor;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.actions.ActionProcessingFaultException;
 import org.jboss.soa.esb.actions.BeanConfiguredAction;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
@@ -38,304 +41,499 @@
 import org.jboss.soa.esb.couriers.CourierUtil;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.message.errors.Factory;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.util.ClassUtil;
 
-
 /**
- * Action Processing Pipeline.
- * <p/>
- * Runs a list of action classes on a message
+ * Action Processing Pipeline. <p/> Runs a list of action classes on a message
  * 
- * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @author <a
+ *         href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
  * @author kevin
  * @since Version 4.0
  */
 public class ActionProcessingPipeline
 {
-    /**
-     * The logger instance.
-     */
-    private final static Logger LOGGER = Logger.getLogger(ActionProcessingPipeline.class) ;
-    
-    /**
-     * The processors.
-     */
-    private final ActionPipelineProcessor[] processors ;
-    /**
-     * The active flag.
-     */
-    private final AtomicBoolean active = new AtomicBoolean(false) ;
+	/**
+	 * The logger instance.
+	 */
+	private final static Logger LOGGER = Logger
+			.getLogger(ActionProcessingPipeline.class);
 
-    /**
-     * public constructor
-     * @param config The pipeline configuration.
-     */
-    public ActionProcessingPipeline(final ConfigTree config)
-    	throws ConfigurationException
-    {
-    	if (config == null)
-        {
-    	    throw new IllegalArgumentException("Configuration needed for action classes") ;
-        }
-        
-    	final ConfigTree[] actionList = config.getChildren(ListenerTagNames.ACTION_ELEMENT_TAG) ;
-        
-    	if ((actionList == null) || (actionList.length == 0))
-        {
-            throw new ConfigurationException("No actions in list") ;
-        }
-        
-        final ArrayList<ActionPipelineProcessor> processorList = new ArrayList<ActionPipelineProcessor>() ;
-        
-        for(final ConfigTree actionConfig: actionList)
-        {
-            final String actionClassTag = actionConfig.getAttribute(ListenerTagNames.ACTION_CLASS_TAG) ;
-            if (LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("Registering action class " + actionClassTag) ;
-            }
-            final Class actionClass ;
-            try
-            {
-                actionClass = ClassUtil.forName(actionClassTag, getClass()) ;
-            }
-            catch (final ClassNotFoundException cnfe)
-            {
-                throw new ConfigurationException("Could not load action class " + actionClassTag) ;
-            }
-            
-            final ActionPipelineProcessor processor ;
-            if (BeanConfiguredAction.class.isAssignableFrom(actionClass))
-            {
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Using bean configured action processor for " + actionClassTag) ;
-                }
-                processor = new BeanConfigActionProcessor(actionConfig, actionClass) ;
-            } 
-            else if (ActionPipelineProcessor.class.isAssignableFrom(actionClass))
-            {
-                final ActionPipelineProcessor currentProcessor = (ActionPipelineProcessor)ActionProcessorMethodInfo.getActionClassInstance(actionConfig, actionClass) ;
-                if (ActionProcessorMethodInfo.checkOverridden(actionConfig))
-                {
-                    if (LOGGER.isDebugEnabled())
-                    {
-                        LOGGER.debug("Using overridden action pipeline processor for " + actionClassTag) ;
-                    }
-                    processor = new OverriddenActionPipelineProcessor(actionConfig, currentProcessor) ;
-                }
-                else
-                {
-                    if (LOGGER.isDebugEnabled())
-                    {
-                        LOGGER.debug("Using normal action pipeline processor for " + actionClassTag) ;
-                    }
-                    processor = currentProcessor ;
-                }
-            }
-            else if (ActionLifecycle.class.isAssignableFrom(actionClass))
-            {
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Using overridden action lifecycle processor for " + actionClassTag) ;
-                }
-                final ActionLifecycle currentLifecycle = (ActionLifecycle)ActionProcessorMethodInfo.getActionClassInstance(actionConfig, actionClass) ;
-                processor = new OverriddenActionLifecycleProcessor(actionConfig, currentLifecycle) ;
-            }
-            else
-            {
-                LOGGER.warn("Action class " + actionClassTag + " does not implement the ActionLifecycle interface") ;
-                if (LOGGER.isDebugEnabled())
-                {
-                    LOGGER.debug("Using overridden actions processor for " + actionClassTag) ;
-                }
-                processor = new OverriddenActionProcessor(actionConfig, actionClass) ;
-            }
-            processorList.add(processor) ;
-        }
-        processors = processorList.toArray(new ActionPipelineProcessor[processorList.size()]) ;
-    }
-    
-    /**
-     * Handle the initialisation of the pipeline
-     * @throws ConfigurationException For errors during initialisation.
-     */
-    public void initialise()
-        throws ConfigurationException
-    {
-        final int numLifecycles = processors.length ;
-        for(int count = 0 ; count < numLifecycles ; count++)
-        {
-            final ActionLifecycle lifecycle = processors[count] ;
-            try
-            {
-                lifecycle.initialise() ;
-            }
-            catch (final Exception ex)
-            {
-                handleDestroy(count-1) ;
-                throw new ConfigurationException("Unexpected exception during lifecycle initialisation", ex) ;
-            }
-        }
-        active.set(true) ;
-    }
-    
-    /**
-     * Handle the destruction of the pipeline
-     */
-    public void destroy()
-    {
-        active.set(false) ;
-        handleDestroy(processors.length-1) ;
-    }
-    
-    /**
-     * Process the specified message.
-     * @param message The current message.
-     * @return true if the processing was successful, false otherwise.
-     */
-    public boolean process(final Message message)
-    {
-        if (active.get())
-        {
-            if (LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("pipeline process for message") ;
-            }
-            
-            final int numProcessors = processors.length ;
-            final Message[] messages = new Message[numProcessors] ;
-            final EPR replyToAddress = getReplyToAddress(message);
+	/**
+	 * The processors.
+	 */
+	private final ActionPipelineProcessor[] processors;
 
-            Message currentMessage = message ;
-            
-            for(int count = 0 ; count < numProcessors ; count++)
-            {
-                final ActionPipelineProcessor processor = processors[count] ;
-                messages[count] = currentMessage ;
-                try
-                {
-                    LOGGER.debug("executing processor " + count) ;
-                    currentMessage = processor.process(currentMessage) ;
-                    if (currentMessage == null)
-                    {
-                        break ;
-                    }
-                }
-                catch (final Exception ex)
-                {
-                    LOGGER.warn("Unexpected exception caught while processing the action pipeline", ex) ;
-                    // TODO: If there's a replyToAddress, what should we reply??
-                    notifyException(count, ex, messages) ;
-                    return false ;
-                }
-            }
+	/**
+	 * The active flag.
+	 */
+	private final AtomicBoolean active = new AtomicBoolean(false);
 
-            // Reply...
-            if ((message != null) && (replyToAddress != null)) {
-                replyTo(replyToAddress, currentMessage);
-            }
-            
-            notifySuccess(messages) ;
-            return true ;
-        }
-        else
-        {
-            LOGGER.debug("pipeline process disabled for message") ;
-            return false ;
-        }
-    }
+	/**
+	 * public constructor
+	 * 
+	 * @param config
+	 *            The pipeline configuration.
+	 */
+	public ActionProcessingPipeline(final ConfigTree config)
+			throws ConfigurationException
+	{
+		if (config == null)
+		{
+			throw new IllegalArgumentException(
+					"Configuration needed for action classes");
+		}
 
-    private EPR getReplyToAddress(Message message) {
-        try {
-            return message.getHeader().getCall().getReplyTo();
-        } catch (NullPointerException e) {
-            // OK, it's not set... return null...
-        }
+		final ConfigTree[] actionList = config
+				.getChildren(ListenerTagNames.ACTION_ELEMENT_TAG);
 
-        return null;
-    }
+		if ((actionList == null) || (actionList.length == 0))
+		{
+			throw new ConfigurationException("No actions in list");
+		}
 
-    private void replyTo(EPR replyToAddress, Message message) {
-        Courier courier = null;
-        try {
-            courier = CourierFactory.getCourier(replyToAddress);
-            courier.deliver(message);
-        } catch (CourierException e) {
-            LOGGER.error("Failed to reply to address " + replyToAddress + ".", e) ;
-        } catch (MalformedEPRException e) {
-            LOGGER.error("Failed to reply to address " + replyToAddress + ".", e) ;
-        } finally {
-            if(courier != null) {
-                CourierUtil.cleanCourier(courier);
-            }
-        }
-    }
+		final ArrayList<ActionPipelineProcessor> processorList = new ArrayList<ActionPipelineProcessor>();
 
-    /**
-     * Handle the destruction of the pipeline from the specified position.
-     * @param initialPosition The initial position to begin destruction. 
-     */
-    private void handleDestroy(final int initialPosition)
-    {
-        for(int count = initialPosition ; count >=0 ; count--)
-        {
-            final ActionLifecycle lifecycle = processors[count] ;
-            try
-            {
-                lifecycle.destroy() ;
-            }
-            catch (final Exception ex)
-            {
-                LOGGER.warn("Unexpected exception during lifecycle destruction", ex) ;
-            }
-        }
-    }
-    
-    /**
-     * Notify the processors of an error during processing.
-     * @param initialPosition The position of the first processor to notify.
-     * @param ex The exception which caused the failure.
-     * @param messages The messages associated with successful processors.
-     */
-    private void notifyException(final int initialPosition, final Exception ex, final Message[] messages)
-    {
-        for (int count = initialPosition ; count >=0 ; count--)
-        {
-            final ActionPipelineProcessor processor = processors[count] ;
-            try
-            {
-                processor.processException(messages[count], ex) ;
-            }
-            catch (final Exception ex2)
-            {
-                LOGGER.warn("Unexpected exception notifying processor of pipeline failure", ex2) ;
-            }
-        }
-    }
-    
-    /**
-     * Notify the processors of a successful pipeline process.
-     * @param messages The messages associated with the processors.
-     */
-    private void notifySuccess(final Message[] messages)
-    {
-        for (int count = messages.length - 1 ; count >=0 ; count--)
-        {
-            final Message message = messages[count] ;
-            if (message != null)
-            {
-                final ActionPipelineProcessor processor = processors[count] ;
-                try
-                {
-                    processor.processSuccess(messages[count]) ;
-                }
-                catch (final Exception ex)
-                {
-                    LOGGER.warn("Unexpected exception notifying processor of pipeline success", ex) ;
-                }
-            }
-        }
-    }
+		for (final ConfigTree actionConfig : actionList)
+		{
+			final String actionClassTag = actionConfig
+					.getAttribute(ListenerTagNames.ACTION_CLASS_TAG);
+			if (LOGGER.isDebugEnabled())
+			{
+				LOGGER.debug("Registering action class " + actionClassTag);
+			}
+			final Class actionClass;
+			try
+			{
+				actionClass = ClassUtil.forName(actionClassTag, getClass());
+			}
+			catch (final ClassNotFoundException cnfe)
+			{
+				throw new ConfigurationException("Could not load action class "
+						+ actionClassTag);
+			}
+
+			final ActionPipelineProcessor processor;
+			if (BeanConfiguredAction.class.isAssignableFrom(actionClass))
+			{
+				if (LOGGER.isDebugEnabled())
+				{
+					LOGGER.debug("Using bean configured action processor for "
+							+ actionClassTag);
+				}
+				processor = new BeanConfigActionProcessor(actionConfig,
+						actionClass);
+			}
+			else if (ActionPipelineProcessor.class
+					.isAssignableFrom(actionClass))
+			{
+				final ActionPipelineProcessor currentProcessor = (ActionPipelineProcessor) ActionProcessorMethodInfo
+						.getActionClassInstance(actionConfig, actionClass);
+				if (ActionProcessorMethodInfo.checkOverridden(actionConfig))
+				{
+					if (LOGGER.isDebugEnabled())
+					{
+						LOGGER
+								.debug("Using overridden action pipeline processor for "
+										+ actionClassTag);
+					}
+					processor = new OverriddenActionPipelineProcessor(
+							actionConfig, currentProcessor);
+				}
+				else
+				{
+					if (LOGGER.isDebugEnabled())
+					{
+						LOGGER
+								.debug("Using normal action pipeline processor for "
+										+ actionClassTag);
+					}
+					processor = currentProcessor;
+				}
+			}
+			else if (ActionLifecycle.class.isAssignableFrom(actionClass))
+			{
+				if (LOGGER.isDebugEnabled())
+				{
+					LOGGER
+							.debug("Using overridden action lifecycle processor for "
+									+ actionClassTag);
+				}
+				final ActionLifecycle currentLifecycle = (ActionLifecycle) ActionProcessorMethodInfo
+						.getActionClassInstance(actionConfig, actionClass);
+				processor = new OverriddenActionLifecycleProcessor(
+						actionConfig, currentLifecycle);
+			}
+			else
+			{
+				LOGGER.warn("Action class " + actionClassTag
+						+ " does not implement the ActionLifecycle interface");
+				if (LOGGER.isDebugEnabled())
+				{
+					LOGGER.debug("Using overridden actions processor for "
+							+ actionClassTag);
+				}
+				processor = new OverriddenActionProcessor(actionConfig,
+						actionClass);
+			}
+			processorList.add(processor);
+		}
+		processors = processorList
+				.toArray(new ActionPipelineProcessor[processorList.size()]);
+	}
+
+	/**
+	 * Handle the initialisation of the pipeline
+	 * 
+	 * @throws ConfigurationException
+	 *             For errors during initialisation.
+	 */
+	public void initialise() throws ConfigurationException
+	{
+		final int numLifecycles = processors.length;
+		for (int count = 0; count < numLifecycles; count++)
+		{
+			final ActionLifecycle lifecycle = processors[count];
+			try
+			{
+				lifecycle.initialise();
+			}
+			catch (final Exception ex)
+			{
+				handleDestroy(count - 1);
+				throw new ConfigurationException(
+						"Unexpected exception during lifecycle initialisation",
+						ex);
+			}
+		}
+		active.set(true);
+	}
+
+	/**
+	 * Handle the destruction of the pipeline
+	 */
+	public void destroy()
+	{
+		active.set(false);
+		handleDestroy(processors.length - 1);
+	}
+
+	/**
+	 * Process the specified message.
+	 * 
+	 * @param message
+	 *            The current message.
+	 * @return true if the processing was successful, false otherwise.
+	 */
+	public boolean process(final Message message)
+	{
+		final EPR faultToAddress = getFaultToAddress(message);
+
+		if (active.get())
+		{
+			if (LOGGER.isDebugEnabled())
+			{
+				LOGGER.debug("pipeline process for message");
+			}
+
+			final int numProcessors = processors.length;
+			final Message[] messages = new Message[numProcessors];
+			final EPR replyToAddress = getReplyToAddress(message);
+
+			Message currentMessage = message;
+
+			for (int count = 0; count < numProcessors; count++)
+			{
+				final ActionPipelineProcessor processor = processors[count];
+				messages[count] = currentMessage;
+
+				try
+				{
+					LOGGER.debug("executing processor " + count);
+					currentMessage = processor.process(currentMessage);
+
+					if (currentMessage == null)
+					{
+						break;
+					}
+				}
+				catch (final Exception ex)
+				{
+					LOGGER
+							.warn(
+									"Unexpected exception caught while processing the action pipeline",
+									ex);
+
+					notifyException(count, ex, messages);
+
+					/*
+					 * Is this an application specific error? If so, try to return
+					 * the error message to the identified recipient.
+					 */
+					
+					if (ex instanceof ActionProcessingFaultException)
+					{
+						ActionProcessingFaultException fault = (ActionProcessingFaultException) ex;
+
+						if (fault.getFaultMessage() == null)
+						{
+							faultTo(faultToAddress, Factory.createErrorMessage(Factory.PROCESSING_ERROR, message, ex));
+						}
+						else
+							faultTo(faultToAddress, fault.getFaultMessage());
+					}
+					else
+					{
+						faultTo(faultToAddress, Factory.createErrorMessage(Factory.UNEXPECTED_ERROR, message, ex));
+					}
+
+					return false;
+				}
+			}
+
+			// Reply...
+			if ((message != null) && (replyToAddress != null))
+			{
+				replyTo(replyToAddress, currentMessage);
+			}
+
+			notifySuccess(messages);
+			
+			return true;
+		}
+		else
+		{
+			LOGGER.debug("pipeline process disabled for message");
+
+			faultTo(faultToAddress, Factory.createErrorMessage(Factory.NOT_ENABLED, message, null));
+
+			return false;
+		}
+	}
+
+	/**
+	 * Get the ReplyTo EPR on this address. If it is not set, then try to return
+	 * the From EPR. If that is not set, then return null.
+	 * 
+	 * @param message
+	 *            the message to work on.
+	 * @return the ReplyTo EPR, or the From EPR, or null, in that order.
+	 */
+
+	private EPR getReplyToAddress(Message message)
+	{
+		if (message == null)
+			return null;
+
+		try
+		{
+			if (message.getHeader().getCall().getReplyTo() != null)
+				return message.getHeader().getCall().getReplyTo();
+			else
+				return message.getHeader().getCall().getFrom();
+		}
+		catch (final NullPointerException e)
+		{
+			// OK, it's not set... return null...
+		}
+
+		return null;
+	}
+
+	/**
+	 * Send the reply.
+	 * 
+	 * @param replyToAddress
+	 *            the EPR to target if one is not defined in the message.
+	 * @param message
+	 *            the message.
+	 */
+
+	private void replyTo(EPR replyToAddress, Message message)
+	{
+		Courier courier = null;
+		EPR replyToEPR = getReplyToAddress(message);
+
+		if (replyToEPR != null)
+			replyToAddress = replyToEPR;
+
+		try
+		{
+			courier = CourierFactory.getCourier(replyToAddress);
+			courier.deliver(message);
+		}
+		catch (final CourierException e)
+		{
+			LOGGER.error("Failed to reply to address " + replyToAddress + ".",
+					e);
+		}
+		catch (final MalformedEPRException e)
+		{
+			LOGGER.error("Failed to reply to address " + replyToAddress + ".",
+					e);
+		}
+		finally
+		{
+			if (courier != null)
+			{
+				CourierUtil.cleanCourier(courier);
+			}
+		}
+	}
+
+	/**
+	 * Get the FaultTo EPR on this address. If it is not set, then try to return
+	 * the From EPR. If that is not set, then return null.
+	 * 
+	 * @param message
+	 *            the message to work on.
+	 * @return the FaultTo EPR, or the From EPR, or null, in that order.
+	 */
+
+	private EPR getFaultToAddress(Message message)
+	{
+		if (message == null)
+			return null;
+
+		try
+		{
+			if (message.getHeader().getCall().getFaultTo() != null)
+				return message.getHeader().getCall().getFaultTo();
+			else
+				return message.getHeader().getCall().getFrom();
+		}
+		catch (final NullPointerException e)
+		{
+			// OK, it's not set... return null...
+		}
+
+		return null;
+	}
+
+	/**
+	 * Send the fault message to the EPR.
+	 * 
+	 * @param faultToAddress
+	 *            the EPR to target if one is not set in the message.
+	 * @param message
+	 *            the message.
+	 */
+
+	private void faultTo(EPR faultToAddress, Message message)
+	{
+		Courier courier = null;
+		EPR faultToEPR = getFaultToAddress(message);
+
+		if (faultToEPR != null)
+			faultToAddress = faultToEPR;
+
+		try
+		{
+			courier = CourierFactory.getCourier(faultToAddress);
+			courier.deliver(message);
+		}
+		catch (final CourierException e)
+		{
+			LOGGER.error("Failed to send error to address " + faultToAddress
+					+ ".", e);
+		}
+		catch (final MalformedEPRException e)
+		{
+			LOGGER.error("Failed to send error to address " + faultToAddress
+					+ ".", e);
+		}
+		finally
+		{
+			if (courier != null)
+			{
+				CourierUtil.cleanCourier(courier);
+			}
+		}
+	}
+
+	/**
+	 * Handle the destruction of the pipeline from the specified position.
+	 * 
+	 * @param initialPosition
+	 *            The initial position to begin destruction.
+	 */
+	private void handleDestroy(final int initialPosition)
+	{
+		for (int count = initialPosition; count >= 0; count--)
+		{
+			final ActionLifecycle lifecycle = processors[count];
+			try
+			{
+				lifecycle.destroy();
+			}
+			catch (final Exception ex)
+			{
+				LOGGER
+						.warn(
+								"Unexpected exception during lifecycle destruction",
+								ex);
+			}
+		}
+	}
+
+	/**
+	 * Notify the processors of an error during processing.
+	 * 
+	 * @param initialPosition
+	 *            The position of the first processor to notify.
+	 * @param ex
+	 *            The exception which caused the failure.
+	 * @param messages
+	 *            The messages associated with successful processors.
+	 */
+	private void notifyException(final int initialPosition, final Exception ex,
+			final Message[] messages)
+	{
+		for (int count = initialPosition; count >= 0; count--)
+		{
+			final ActionPipelineProcessor processor = processors[count];
+			try
+			{
+				processor.processException(messages[count], ex);
+			}
+			catch (final Exception ex2)
+			{
+				LOGGER
+						.warn(
+								"Unexpected exception notifying processor of pipeline failure",
+								ex2);
+			}
+		}
+	}
+
+	/**
+	 * Notify the processors of a successful pipeline process.
+	 * 
+	 * @param messages
+	 *            The messages associated with the processors.
+	 */
+	private void notifySuccess(final Message[] messages)
+	{
+		for (int count = messages.length - 1; count >= 0; count--)
+		{
+			final Message message = messages[count];
+			if (message != null)
+			{
+				final ActionPipelineProcessor processor = processors[count];
+				try
+				{
+					processor.processSuccess(messages[count]);
+				}
+				catch (final Exception ex)
+				{
+					LOGGER
+							.warn(
+									"Unexpected exception notifying processor of pipeline success",
+									ex);
+				}
+			}
+		}
+	}
 }

Added: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/errors/Factory.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/errors/Factory.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/errors/Factory.java	2007-05-01 11:26:40 UTC (rev 11547)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+
+package org.jboss.soa.esb.listeners.message.errors;
+
+import java.net.URI;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.listeners.message.ActionProcessingPipeline;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+
+public class Factory
+{
+	public static final String ERROR_ATTRIBUTE = "org.jboss.soa.esb.listeners.message.errors";
+	
+	public static final String PROCESSING_ERROR = "urn:action/error/actionprocessingerror";
+	public static final String UNEXPECTED_ERROR = "urn:action/error/unexpectederror";
+	public static final String NOT_ENABLED = "urn:action/error/disabled";
+	
+	public static Message createErrorMessage (String type, Message input, Throwable problem)
+	{
+		if (input == null)
+			throw new IllegalArgumentException();
+		
+		Message errorMessage = MessageFactory.getInstance().getMessage(input.getType());
+		
+		if (errorMessage == null)
+			throw new IllegalArgumentException("Could not create error message from "+input.getType());
+		
+		if (modifyMessage(input, errorMessage))
+		{			
+			try
+			{
+				errorMessage.getFault().setCode(new URI(type));
+				
+				/*
+				 * Is there an exception? If so, add the string as the reason.
+				 */
+				
+				if (problem != null)
+					errorMessage.getFault().setReason(problem.toString());
+			}
+			catch (final Exception ex)
+			{
+				_logger.debug("Caught exception "+ex+" during message creation!");
+				
+				return null;
+			}
+				
+			return errorMessage;
+		}
+		else
+			return null;
+	}
+	
+	/**
+	 * Where should the error message go? Check the header of the original
+	 * input message.
+	 */
+	
+	private final static boolean modifyMessage (Message input, Message errorMessage)
+	{
+		EPR destination = input.getHeader().getCall().getFaultTo();
+		
+		if ((destination == null) && (input.getHeader().getCall().getReplyTo() != null))
+			destination = input.getHeader().getCall().getReplyTo();
+		
+		if ((destination == null) && (input.getHeader().getCall().getFrom() != null))
+			destination = input.getHeader().getCall().getFrom();
+		
+		if (destination != null)
+		{
+			errorMessage.getHeader().getCall().setTo(destination);
+			
+			if (input.getHeader().getCall().getMessageID() != null)
+				errorMessage.getHeader().getCall().setRelatesTo(input.getHeader().getCall().getMessageID());
+			
+			return true;
+		}
+		else
+			return false;
+	}
+	
+	private final static Logger _logger = Logger.getLogger(Factory.class);
+}
\ No newline at end of file

Modified: labs/jbossesb/trunk/product/docs/ProgrammersGuide.odt
===================================================================
(Binary files differ)




More information about the jboss-svn-commits mailing list