[jboss-svn-commits] JBL Code SVN: r10050 - labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Mar 7 17:34:43 EST 2007
Author: rex.sheridan
Date: 2007-03-07 17:34:43 -0500 (Wed, 07 Mar 2007)
New Revision: 10050
Added:
labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
Log:
Commit of bean configuration capabilities
Added: labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java (rev 0)
+++ labs/jbossesb/workspace/rsheridan/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2007-03-07 22:34:43 UTC (rev 10050)
@@ -0,0 +1,250 @@
+package org.jboss.soa.esb.listeners.message;
+
+import java.beans.BeanInfo;
+import java.beans.IntrospectionException;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Observable;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.KeyValuePair;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.util.BeanConfigurator;
+
+
+/**
+ * Action Processing Pipeline.
+ * <p/>
+ * Runs a list of action classes on a message
+ *
+ * @author <a href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ */
+public class ActionProcessingPipeline extends Observable implements Runnable
+{
+ /**
+  * Set to true in the finally block of run() once the pipeline has stopped.
+  * Declared volatile because this class is a Runnable whose completion is polled
+  * through isProcessingComplete() / waitUntilComplete(); when the poller is a
+  * different thread from the one executing run(), a plain field gives no guarantee
+  * that the poller ever observes the write.
+  */
+ private volatile boolean processingComplete = false;
+
+ /**
+ * public constructor
+ * @param message Message - The initial message to be run through the whole action
+ * class chain
+ */
+ public ActionProcessingPipeline(Message message, ConfigTree config)
+ throws ConfigurationException
+ {
+ if (null==message)
+ throw new IllegalArgumentException("Message must be not null");
+ _message = message;
+ _config = config;
+ if (null==_config)
+ throw new IllegalArgumentException("Configuration needed for action classes");
+ _actionList = _config.getChildren(ListenerTagNames.ACTION_ELEMENT_TAG);
+ if (null==_actionList || _actionList.length<1)
+ throw new ConfigurationException("No actions in list");
+ }
+
+ /**
+ *
+ * @return Message - current Message of this action chain processor
+ */
+ public Message getMessage() { return _message; }
+
+ /**
+  * Implements Runnable: runs the message through every configured action, in order.
+  * <p/>For each &lt;action&gt; element the class named by the ACTION_CLASS_TAG attribute
+  * is loaded and instantiated by reflection. A public constructor taking a single
+  * ConfigTree is preferred; if the class has none, its public no-argument constructor
+  * is used and the new instance is configured through an ActionBeanConfigurator.
+  * <p/>The PROCESS_METHOD_TAG attribute may hold a comma separated list of method names;
+  * when the attribute is absent the tag name itself is used as the method name. Each
+  * method must be public, take a Message and return a Message. The returned Message is
+  * fed into the next method / action.
+  * <p/>The pipeline stops early (after logging) when a class or method cannot be
+  * found or instantiated, when a process method returns <code>null</code> or a non
+  * Message object, or when a process method throws.
+  * <p/>After all process methods of an action have succeeded, actionClassFinishedOk is
+  * called for that action. If a process method throws, actionClassException is called
+  * with the cause. Neither callback has a default: each is only invoked when the
+  * corresponding attribute is present on the &lt;action&gt; element.
+  * <p/>In every case, once processing stops isProcessingComplete() returns true and
+  * observers are notified with an Integer of value -1.
+  */
+ public void run()
+ {
+     try
+     {
+         // Run the message through each ActionProcessor...
+         _currentIndex = -1;
+         for (ConfigTree oCurr : _actionList)
+         {
+             _currentIndex++;
+             String attrName = ListenerTagNames.ACTION_CLASS_TAG;
+             _currentAction = oCurr.getAttribute(attrName);
+             if (null == _currentAction)
+             {
+                 // Class.forName(null) would otherwise escape run() as an unlogged NullPointerException
+                 _logger.error(prematureTermination("has no '" + attrName + "' attribute"));
+                 return;
+             }
+             _currentClass = Class.forName(_currentAction);
+             try
+             {
+                 // Preferred form: public constructor taking the action's ConfigTree
+                 Constructor oConst = _currentClass.getConstructor(new Class[] {ConfigTree.class});
+                 _currentProcessor = oConst.newInstance(oCurr);
+             }
+             catch (NoSuchMethodException e)
+             {
+                 // Fallback: no-argument constructor, then configure the instance as a bean
+                 Constructor constructor = _currentClass.getConstructor(new Class[] {});
+                 _currentProcessor = constructor.newInstance(new Object[] {});
+                 BeanConfigurator configurator = new ActionBeanConfigurator(oCurr, _currentProcessor);
+                 try
+                 {
+                     configurator.configure();
+                 }
+                 catch (IntrospectionException e1)
+                 {
+                     _logger.error(prematureTermination("Unable to configure bean"), e1);
+                     return;
+                 }
+             }
+             attrName = ListenerTagNames.PROCESS_METHOD_TAG;
+             String[] saMethodList = obtainAttribute(oCurr,attrName,attrName).split(",");
+
+             for (String currMethod : saMethodList)
+             {
+                 // Tolerate blanks around the commas ("a, b"): a method name never contains whitespace
+                 Method method = _currentClass.getMethod(currMethod.trim(),new Class[] {Message.class});
+
+                 // The processing result of each action feeds into the processing of the next action...
+                 try
+                 {
+                     Message next = (Message)method.invoke(_currentProcessor,new Object[] {_message} );
+                     if (next==null)
+                     {
+                         _logger.debug(prematureTermination("returned <null> - Cannot continue"));
+                         return;
+                     }
+                     _message = next;
+                 }
+                 catch (ClassCastException eCast)
+                 {
+                     // If action class returns non Message, log and abort chain
+                     _logger.error(prematureTermination("returned a non Message Object)"));
+                     return;
+                 }
+                 catch (InvocationTargetException e)
+                 {
+                     // If action class threw exception, log and abort chain
+                     _logger.error("Process method threw Exception",e);
+                     actionClassException(oCurr,_message,e.getCause());
+                     return;
+                 }
+             }
+             actionClassFinishedOk(oCurr,_message);
+         }
+         // There is no separate end-of-chain hook here: a final action element that
+         // names a normal completion method can serve as the end-of-chain notification.
+     }
+     catch (ClassNotFoundException e)
+     {
+         _logger.error(prematureTermination("action class is not in path"),e);
+     }
+     catch (NoSuchMethodException e)
+     {
+         _logger.error(prematureTermination("method not found"),e);
+     }
+     catch (InstantiationException e)
+     {
+         _logger.error(prematureTermination("cannot instantiate action class"),e);
+     }
+     catch (InvocationTargetException e)
+     {
+         // Process method invocations are handled inside the loop, so this can only come
+         // from code run while creating/configuring the processor (e.g. its constructor)
+         _logger.error(prematureTermination("action class constructor or configuration threw an exception"),e);
+     }
+     catch (IllegalAccessException e)
+     {
+         _logger.error(prematureTermination("unable to access method"),e);
+     }
+     catch (IllegalArgumentException e)
+     {
+         _logger.error(prematureTermination(e.getMessage()),e);
+     }
+     finally
+     {
+         // Always flag completion and wake observers, whether the chain finished or aborted
+         processingComplete = true;
+         setChanged();
+         notifyObservers(Integer.valueOf(-1));
+     }
+ }
+
+ /**
+  * Builds the log message used when the pipeline stops before all actions have run.
+  * The message lists the class names of all configured actions, then the action that
+  * was current when processing stopped, then the supplied reason.
+  *
+  * @param s reason for the termination, appended at the end of the message
+  * @return the complete log message
+  */
+ protected String prematureTermination(String s)
+ {
+     // Arrays.toString renders the names as "[a, b]"; appending the array itself
+     // would only print its type and identity hash
+     return new StringBuilder("Premature termination of action processing pipeline ")
+         .append(Arrays.toString(getActionNames()))
+         .append(". ActionProcessor [").append(_currentAction)
+         .append("] ").append(s)
+         .toString();
+ }
+
+ /**
+  * Invokes the exception callback of the current action, if one is configured.
+  * <p/>If the action element carries an EXCEPTION_METHOD_TAG attribute, the public method
+  * of that name taking (Message, Throwable) is looked up on the current action class and
+  * invoked on the current processor instance. Without the attribute nothing happens.
+  * Failures to find or call the callback are logged and not propagated.
+  *
+  * @param tree   ConfigTree - where to look for the exception method attribute
+  * @param msg    Message - passed as first argument to the callback
+  * @param thrown Throwable - passed as second argument to the callback
+  */
+ protected void actionClassException(ConfigTree tree, Message msg, Throwable thrown)
+ {
+     String sMethod = obtainAttribute(tree,ListenerTagNames.EXCEPTION_METHOD_TAG,null);
+     if (null==sMethod)
+         return;
+
+     // Logged with the two argument form so that the stack trace is not lost
+     String sContext = "exception method '" + sMethod + "' of action class [" + _currentAction + "]";
+     try
+     {
+         Method method = _currentClass.getMethod(sMethod,new Class[] {Message.class, Throwable.class});
+         method.invoke(_currentProcessor,new Object[] {msg, thrown} );
+     }
+     catch (NoSuchMethodException e)
+     {
+         _logger.error("Cannot find " + sContext, e);
+     }
+     catch (InvocationTargetException e)
+     {
+         _logger.error("Exception thrown by " + sContext, e);
+     }
+     catch (IllegalAccessException e)
+     {
+         _logger.error("Cannot access " + sContext, e);
+     }
+ }
+
+ /**
+  * Invokes the normal completion callback of the current action, if one is configured.
+  * <p/>If the action element carries a NORMAL_COMPLETION_METHOD_TAG attribute, the public
+  * method of that name taking a single Message is looked up on the current action class
+  * and invoked on the current processor instance. Without the attribute nothing happens.
+  * Failures to find or call the callback are logged and not propagated.
+  *
+  * @param tree ConfigTree - where to look for the completion method attribute
+  * @param msg  Message - passed as the only argument to the callback
+  */
+ protected void actionClassFinishedOk(ConfigTree tree,Message msg)
+ {
+     String sMethod = obtainAttribute(tree,ListenerTagNames.NORMAL_COMPLETION_METHOD_TAG,null);
+     if (null==sMethod)
+         return;
+
+     // Logged with the two argument form so that the stack trace is not lost
+     String sContext = "completion method '" + sMethod + "' of action class [" + _currentAction + "]";
+     try
+     {
+         Method method = _currentClass.getMethod(sMethod,new Class[] {Message.class});
+         method.invoke(_currentProcessor,new Object[] {msg} );
+     }
+     catch (NoSuchMethodException e)
+     {
+         _logger.error("Cannot find " + sContext, e);
+     }
+     catch (InvocationTargetException e)
+     {
+         _logger.error("Exception thrown by " + sContext, e);
+     }
+     catch (IllegalAccessException e)
+     {
+         _logger.error("Cannot access " + sContext, e);
+     }
+ }
+
+ protected String[] getActionNames()
+ {
+ String[] sa = new String[_actionList.length];
+ int i1=0;
+ for (ConfigTree oCurr : _actionList)
+ sa[i1++]=obtainAttribute(oCurr,ListenerTagNames.ACTION_CLASS_TAG,"NO_CLASSNAME");
+ return sa;
+ }
+
+ private static String obtainAttribute(ConfigTree tree,String p_sAtt, String p_sDefault)
+ {
+ String sVal = tree.getAttribute(p_sAtt);
+ return (null != sVal) ? sVal : p_sDefault;
+ } // ________________________________
+
+ // Child elements of the configuration tagged ACTION_ELEMENT_TAG, one per action;
+ // read in the constructor, which rejects a null or empty array
+ protected ConfigTree[] _actionList;
+ // Zero based position in _actionList of the action run() is working on;
+ // reset to -1 at the start of run()
+ protected int _currentIndex;
+ // Value of the ACTION_CLASS_TAG attribute of the current action (the class name given to Class.forName)
+ protected String _currentAction;
+ // Class loaded for _currentAction
+ protected Class _currentClass;
+ // Instance of _currentClass on which process and callback methods are invoked
+ protected Object _currentProcessor;
+ // Message handed to the next process method; replaced by each non-null result in run()
+ protected Message _message;
+ // Configuration supplied to the constructor
+ protected ConfigTree _config;
+ // Per instance logger, named after the runtime class (this.getClass())
+ protected Logger _logger = Logger.getLogger(this.getClass());
+
+ /**
+ * @return Returns the processingComplete.
+ */
+ public boolean isProcessingComplete() {
+ return processingComplete;
+ }
+
+ /**
+  * Wait until processing is complete.
+  * <p/>Polls the completion flag every 50 milliseconds and returns only once it is set.
+  * An interrupt received while waiting is logged and does not end the wait, but the
+  * calling thread's interrupt status is restored before this method returns so that
+  * the caller can still react to it.
+  */
+ public void waitUntilComplete()
+ {
+     boolean interrupted = false;
+     try
+     {
+         while (!processingComplete)
+         {
+             try
+             {
+                 Thread.sleep(50);
+             }
+             catch (InterruptedException e)
+             {
+                 // sleep() cleared the interrupt status; remember it and keep waiting
+                 interrupted = true;
+                 _logger.error("Thread Interrupted.", e);
+             }
+         }
+     }
+     finally
+     {
+         if (interrupted)
+         {
+             // Re-assert the interrupt that was consumed while waiting
+             Thread.currentThread().interrupt();
+         }
+     }
+ }
+}
More information about the jboss-svn-commits
mailing list