[jboss-svn-commits] JBL Code SVN: r10050 - labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Mar 7 17:34:43 EST 2007


Author: rex.sheridan
Date: 2007-03-07 17:34:43 -0500 (Wed, 07 Mar 2007)
New Revision: 10050

Added:
   labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
Log:
Commit of bean configuration capabilities

Added: labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	                        (rev 0)
+++ labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2007-03-07 22:34:43 UTC (rev 10050)
@@ -0,0 +1,250 @@
+package org.jboss.soa.esb.listeners.message;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Observable;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.KeyValuePair;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.util.BeanConfigurator;
+
+
+/**
+ * Action Processing Pipeline.
+ * <p/>
+ * Runs a list of action classes on a message
+ * 
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+public class ActionProcessingPipeline extends Observable implements Runnable 
+{    
+	private boolean processingComplete = false;
+         
+    /**
+     * public constructor
+     * @param message Message - The initial message to be run through the whole action 
+     * class chain
+     */
+    public ActionProcessingPipeline(Message message, ConfigTree config)
+    	throws ConfigurationException
+    {
+    	if (null==message)
+    		throw new IllegalArgumentException("Message must be not null");
+        _message	= message;
+        _config		= config;
+    	if (null==_config)
+    		throw new IllegalArgumentException("Configuration needed for action classes");
+    	_actionList = _config.getChildren(ListenerTagNames.ACTION_ELEMENT_TAG);
+    	if (null==_actionList || _actionList.length<1)
+    		throw new ConfigurationException("No actions in list");
+    }
+    
+    /**
+     * 
+     * @return Message - current Message of this action chain processor
+     */
+    public Message getMessage()     { return _message; }
+
+    /**
+     * Implement Runnable Interface
+     * <p/>Uses reflection to instantiate action classes that must have a public constructor
+     * that takes a single ConfigTree as argument
+     * <p/>Requires each action class to have a public method that takes a Message and returns a Message
+     * <br/>Default name for it is 'process'  but can optionally be defined in the 'process' attribute
+     * of the corresponding &lt;action&gt; element of the ConfigTree
+     * <p/>Each &lt;action&gt; element can optionally define a method (taking a Message argument) to be 
+     * called upon successful completion of the action class (that step of the chain)
+     * <br/>Default name for it is 'process'  but can optionally be defined in the 'process' attribute
+     * <p/>See actionClassException and actionClassFinishedOk
+     * */
+    public void run()
+    {
+        try
+        {
+            // Run the message through each ActionProcessor...
+        	_currentIndex = -1;
+            for(ConfigTree oCurr : _actionList) 
+            {
+            	_currentIndex++;
+            	String attrName = ListenerTagNames.ACTION_CLASS_TAG;
+            	_currentAction = oCurr.getAttribute(attrName);
+            	_currentClass = Class.forName(_currentAction);
+            	try {
+            	Constructor oConst = _currentClass.getConstructor(new Class[] {ConfigTree.class});
+            	_currentProcessor = oConst.newInstance(oCurr);
+            	} catch(NoSuchMethodException e) {
+            		Constructor constructor = _currentClass.getConstructor(new Class[] {});
+            		_currentProcessor = constructor.newInstance(new Object[] {});
+            		BeanConfigurator configurator = new ActionBeanConfigurator(oCurr, _currentProcessor);
+            		try {
+						configurator.configure();
+					} catch (IntrospectionException e1) {
+						_logger.error(prematureTermination("Unable to configure bean"), e1);
+						return;
+					}
+            	}
+            	attrName	= ListenerTagNames.PROCESS_METHOD_TAG;
+            	String[] saMethodList = obtainAttribute(oCurr,attrName,attrName).split(",");
+
+            	for (String currMethod : saMethodList)
+            	{
+//                	_logger.debug("Attempting to invoke "+_currentClass.getName()+" method "+currMethod);
+	            	Method method = _currentClass.getMethod(currMethod,new Class[] {Message.class});
+	            	
+	            	// The processing result of each action feeds into the processing of the next action...
+	                try 
+	                {
+	                	Message next = (Message)method.invoke(_currentProcessor,new Object[] {_message} );
+	                    if(next==null)
+	                    {
+	                    	_logger.debug(prematureTermination("returned <null> - Cannot continue"));
+	                    	return;
+	                    }
+	                    _message = next;
+	                }
+	                catch (ClassCastException eCast)
+	                {
+	                	// If action class returns non Message, log and abort chain
+	                	_logger.error(prematureTermination("returned a non Message Object)"));
+	                	return;
+	                }
+	                catch (InvocationTargetException e)
+	                {
+	                	// If action class threw exception, log and abort chain
+	                	_logger.error("Process method threw Exception",e);
+	                    actionClassException(oCurr,_message,e.getCause());
+	                    return;
+	                }
+            	}            	
+                actionClassFinishedOk(oCurr,_message);
+            }
+            // notification of action chain end can be done with a no-operation action class
+            // with a proper ListenerPropertyNames.ACTION_NORMAL_COMPLETION_CALLBACK method
+            // Same idea when some interaction needed with the 'esb unaware' world
+        } 
+        catch(ClassNotFoundException e)
+    	{	_logger.error(prematureTermination("action class is not in path"),e); } 
+        catch(NoSuchMethodException e)
+    	{	_logger.error(prematureTermination("method not found"),e); }
+        catch(InstantiationException e)
+    	{	_logger.error(prematureTermination("cannot instantiate action class"),e); } 
+        catch(InvocationTargetException e)
+    	{	_logger.error(prematureTermination("method not found"),e); } 
+        catch(IllegalAccessException e)
+    	{	_logger.error(prematureTermination("unable to access method"),e); } 
+        catch (IllegalArgumentException e) {
+        	_logger.error(prematureTermination(e.getMessage()),e);
+        } 
+        finally 
+        {
+        	processingComplete = true;
+        	setChanged();
+        	notifyObservers(new Integer(-1));
+        }
+    }
+    
+    protected String prematureTermination(String s)
+    {
+    	return new StringBuilder("Premature termination of action processing pipeline ")
+		.append(getActionNames())
+		.append("].  ActionProcessor [").append(_currentAction)
+		.append("] ").append(s)
+		.toString()
+		;
+    }
+    
+    /**
+     * If 'current' action step was configured with a 'exceptionMethod' attribute
+     * that method will be called with a single argument of type Exception 
+     * @param tree ConfigTree - where to look for the exceptionMetod attribute
+     * @param thrown Exception - to be used in invocation to method (if found)
+     */
+    protected void actionClassException(ConfigTree tree, Message msg,  Throwable thrown)
+    {
+//    	thrown.printStackTrace();
+    	String sMethod = obtainAttribute(tree,ListenerTagNames.EXCEPTION_METHOD_TAG,null);
+    	if (null!=sMethod)
+	    	try
+	    	{
+	    		Method method = _currentClass.getMethod(sMethod,new Class[] {Message.class, Throwable.class});
+	    		method.invoke(_currentProcessor,new Object[] {msg, thrown} );
+	    	}
+	    	catch (NoSuchMethodException e) 	{_logger.error(e); }
+	    	catch (InvocationTargetException e) {_logger.error(e); }
+	    	catch (IllegalAccessException e) 	{_logger.error(e); }
+    }
+
+    /**
+     * If 'current' action step was configured with an 'okMethod' attribute
+     * that method will be called with no arguments 
+     * @param tree ConfigTree - where to look for the okMetod attribute
+     */
+    protected void actionClassFinishedOk(ConfigTree tree,Message msg)
+    {
+    	String sMethod = obtainAttribute(tree,ListenerTagNames.NORMAL_COMPLETION_METHOD_TAG,null);
+    	if (null!=sMethod)
+	    	try
+	    	{
+	    		Method method = _currentClass.getMethod(sMethod,new Class[] {Message.class});
+	    		method.invoke(_currentProcessor,new Object[] {msg} );
+	    	}
+	    	catch (NoSuchMethodException e) 	{_logger.error(e); }
+	    	catch (InvocationTargetException e) {_logger.error(e); }
+	    	catch (IllegalAccessException e) 	{_logger.error(e); }
+    }
+
+    protected String[] getActionNames()
+    {
+    	String[] sa = new String[_actionList.length];
+    	int i1=0;
+    	for (ConfigTree oCurr : _actionList)
+    		sa[i1++]=obtainAttribute(oCurr,ListenerTagNames.ACTION_CLASS_TAG,"NO_CLASSNAME");
+    	return sa;
+    }
+    
+    private static String obtainAttribute(ConfigTree tree,String p_sAtt, String p_sDefault)
+	{
+		String sVal = tree.getAttribute(p_sAtt);
+		return (null != sVal) ? sVal : p_sDefault;
+	} // ________________________________
+	
+    protected ConfigTree[]	_actionList;
+    protected int			_currentIndex;
+    protected String		_currentAction;
+    protected Class 		_currentClass;
+    protected Object		_currentProcessor;
+	protected Message		_message;
+	protected ConfigTree	_config;
+	protected Logger		_logger = Logger.getLogger(this.getClass());
+
+	/**
+	 * @return Returns the processingComplete.
+	 */
+	public boolean isProcessingComplete() {
+		return processingComplete;
+	}
+	
+	/**
+	 * Wait until processing is complete.
+	 */
+	public void waitUntilComplete() {
+		while(!processingComplete) {
+			try {
+				Thread.sleep(50);
+			} catch (InterruptedException e) {
+				_logger.error("Thread Interrupted.", e);
+			}
+		}
+	}
+}




More information about the jboss-svn-commits mailing list