[jboss-svn-commits] JBL Code SVN: r6150 - in labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb: actions command listeners

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Sep 11 14:33:08 EDT 2006


Author: tfennelly
Date: 2006-09-11 14:32:57 -0400 (Mon, 11 Sep 2006)
New Revision: 6150

Added:
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionHandler.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/SmooksTransformActionHandler.java
Removed:
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java
Modified:
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
   labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
Log:
Chained actions stuff

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -0,0 +1,276 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.helpers.KeyValuePair;
+
+/**
+ * Action Definition Factory.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class ActionDefinitionFactory {
+
+    /**
+     * Class Logger.
+     */
+    private static Logger logger = Logger.getLogger(ActionDefinitionFactory.class);
+    /**
+     * Action Definition table.
+     */
+    private Hashtable<String, ActionDefinition> actionDefinitions = new Hashtable<String, ActionDefinition>();
+
+    /**
+     * Construct a factory instance.
+     * @param config ActionDefinition configuration.
+     * @throws ConfigurationException Invalid configuration.
+     */
+    public ActionDefinitionFactory(DomElement config) throws ConfigurationException {
+        if(config == null || !config.getName().equals("Actions")) {
+            throw new ConfigurationException("No 'Actions' configuration.");
+        }
+        
+        // Read the handler alias and action def configurations from the XML configuration...
+        DomElement handlerAliasConfig = config.getFirstElementChild("HandlerAliases");
+        DomElement[] actions = config.getElementChildren("Action");
+        if(handlerAliasConfig == null) {
+            throw new ConfigurationException("No 'Actions/HandlerAliases' configuration.");
+        }
+        if(actions == null || actions.length == 0) {
+            throw new ConfigurationException("No 'Actions/Action' configurations.");
+        }
+        
+        // Initialise the Action Definition table..
+        Hashtable<String, String> handlerClasses = getHandlerClasses(handlerAliasConfig);
+        initialiseActionDefinitions(actions, handlerClasses);
+        logger.info("ActionDefinition Factory initialisation complete.");
+    }
+    
+    /**
+     * Get the action definition for the specified action name.
+     * @param actionName The action name.
+     * @return The action definition.
+     */
+    public ActionDefinition getInstance(String actionName) {
+        if(actionName == null || actionName.trim().equals("")) {
+            throw new IllegalArgumentException("Null or empty 'actionName' arg in method call.");
+        }
+
+        return actionDefinitions.get(actionName);
+    }
+
+    /**
+     * Initialise the Action Definitions.
+     * @param actions Action Definition configurations.
+     * @param handlerClasses Handler classes keyed by alias name.
+     * @throws ConfigurationException Invalid or incomplete action configuration.
+     */
+    private void initialiseActionDefinitions(DomElement[] actions, Hashtable<String, String> handlerClasses) throws ConfigurationException {
+        if(handlerClasses == null || handlerClasses.isEmpty()) {
+            throw new ConfigurationException("No action handler classes defined.");
+        }
+        
+        for(DomElement action : actions) {
+            String name = action.getAttr("name");
+            String handlerAlias = action.getAttr("handler");
+            String handlerClass;
+            List<KeyValuePair> properties;
+
+            // Check the required attributes...
+            if(name == null || name.trim().equals("")) {
+                throw new ConfigurationException("Actions/Action has no 'name' defined.");
+            }
+            if(handlerAlias == null || handlerAlias.trim().equals("")) {
+                throw new ConfigurationException("Actions/Action [" + name + "] has no 'handler' defined.");
+            }
+            handlerClass = handlerClasses.get(handlerAlias);
+            if(handlerClass == null) {
+                throw new ConfigurationException("No action handler class defined for handler alias [" + handlerAlias + "] set on action [" + name + "].");
+            }
+
+            // Get any properties defined on the action...
+            properties = getProperties(action);
+            
+            // Create the action definition and store it against its name...
+            actionDefinitions.put(name, new ActionDefinition(name, handlerClass, properties));
+            logger.info("Added ActionDefinition [" + name + "] for handler [" + handlerAlias + ":" + handlerClass + "]. Num properties: " + properties.size());
+        }
+    }
+
+    /**
+     * Get the handler runtime classes.
+     * @param handlerAliasConfig Alias configs.
+     * @return Handler runtime class names keyed by alias name.
+     * @throws ConfigurationException Bad configuration.
+     */
+    private Hashtable<String, String> getHandlerClasses(DomElement handlerAliasConfig) throws ConfigurationException {
+        DomElement[] aliases = handlerAliasConfig.getElementChildren("Alias");
+        Hashtable<String, String> handlerClasses = new Hashtable<String, String>();
+        
+        if(aliases == null) {
+            throw new ConfigurationException("No 'Actions/HandlerAliases/Alias' configurations.");
+        }
+        
+        for(DomElement alias : aliases) {
+            String name = alias.getAttr("name");
+            String className = alias.getAttr("class");
+    
+            // Check the required attributes...
+            if(name == null || name.trim().equals("")) {
+                throw new ConfigurationException("Actions/HandlerAliases/Alias has no 'name' defined.");
+            }
+            if(className == null || className.trim().equals("")) {
+                throw new ConfigurationException("Actions/HandlerAliases/Alias [" + name + "] has no 'class' defined.");
+            }
+            handlerClasses.put(name, className);
+            logger.info("Added alias [" + name + "] for ActionHandler class [" + className + "].");
+        }
+        
+        return handlerClasses;
+    }
+
+    /**
+     * Get the properties defined on the supplied 'Action' configuration.
+     * @param action The action configuration element.
+     * @return A list of property {@link KeyValuePair} instances.  An empty list is returned
+     * if no properties are defined on the action.
+     * @throws ConfigurationException A property name or value is empty or undefined.
+     */
+    private List<KeyValuePair> getProperties(DomElement action) throws ConfigurationException {
+        DomElement[] properties = action.getElementChildren("property");
+        List<KeyValuePair> propertyList = new ArrayList<KeyValuePair>();
+        
+        for(DomElement property : properties) {
+            String name = property.getAttr("name");
+            String value = property.getAttr("value");
+
+            // Check the required attributes...
+            if(name == null || name.trim().equals("")) {
+                throw new ConfigurationException("Actions/Action/property has no 'name' defined. Action [" + action.getAttr("name") + "].");
+            }
+            if(value == null || value.trim().equals("")) {
+                throw new ConfigurationException("Actions/Action/property has no 'value' defined. Action [" + action.getAttr("name") + "], Property [" + name + "].");
+            }
+            
+            propertyList.add(new KeyValuePair(name, value));
+        }
+
+        return propertyList;
+    }
+
+    /**
+     * Action Definition.
+     * <p/>
+     * An Action is defined by name, a handler instance to carry out the action, plus properties to the action
+     * to be supplied to the handler instance.
+     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+     * @since Version 4.0
+     */
+    public class ActionDefinition {
+        
+        private String name;
+        private String handler;
+        private List<KeyValuePair> properties;
+
+        /**
+         * Private constructor.
+         * @param name The action name.
+         * @param handler The action handler runtime class.
+         * @param properties Action properties list.  An empty list for an action with no defined properties.
+         */
+        private ActionDefinition(String name, String handler, List<KeyValuePair> properties) {
+            this.name = name;
+            this.handler = handler;
+            this.properties = properties;
+        }
+        
+        /**
+         * Get the action name.
+         * @return The action name.
+         */
+        public String getName() {
+            return name;
+        }
+        
+        /**
+         * Get the action handler instance.
+         * @return The Action Handler.
+         */
+        public ActionHandler getHandler() {
+            // TODO: Support singleton ActionHandler instances.  This impl currently only supports prototype.
+            return createActionHandler();
+        }
+        
+        /**
+         * Get the action properties to be supplied to the action handler.
+         * @return The configured action properties. An empty list for an action with no defined properties.
+         */
+        public List<KeyValuePair> getProperties() {
+            return properties;
+        }
+
+        /**
+         * Create the handler instance.
+         *
+         * @return Handler instance.
+         */
+        private ActionHandler createActionHandler() {
+            Class<? extends ActionHandler> runtimeClass;
+            Class[] NONDEFAULT_SIG = new Class[] {String.class, List.class};
+            Constructor nonDefaultConstructor;
+            
+            // Get the runtime class...
+            try {
+                runtimeClass = Class.forName(handler).asSubclass(ActionHandler.class);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalStateException("Action Handler class " + handler + " not found in classpath.", e);
+            } catch (ClassCastException e) {
+                throw new IllegalStateException("Action Handler class " + handler + " does not implement " + ActionHandler.class.getName(), e);
+            }            
+            
+            // Construct it...
+            try {
+                nonDefaultConstructor = runtimeClass.getConstructor(NONDEFAULT_SIG);
+                return (ActionHandler) nonDefaultConstructor.newInstance(new Object[] {name, properties});
+            } catch (NoSuchMethodException e1) {
+                try {
+                    return runtimeClass.newInstance();
+                } catch (InstantiationException e) {
+                    throw new IllegalStateException("No appropriate constructor found on handler class [" + handler + "].  See " + ActionHandler.class.getName() + " Javadoc.", e);
+                } catch (Exception e) {
+                    throw new IllegalStateException("Unexpected exception.  Unable to construct handler class instance [" + handler + "] using default constructor.", e);
+                }
+            } catch (Exception e) {
+                throw new IllegalStateException("Unexpected exception.  Unable to construct handler class instance [" + handler + "] using non-default constructor.", e);
+            }
+        }
+    }
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionHandler.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionHandler.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionHandler.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+/**
+ * Action Handler Interface Definition.
+ * <p/>
+ * Implementations are constructed based on the following public constructor order precedence:
+ * <ol>
+ *  <li><b>(String actionName, List&lt;{@link org.jboss.soa.esb.helpers.KeyValuePair}&gt; properties)</b></li>
+ *  <li><b>default constructor</b>.</li>
+ * </ol>
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public interface ActionHandler {
+    
+    /**
+     * Process the Action "payload" and return the result.
+     * @param payload The action payload to be processed.
+     * @return The processing result.
+     * @throws ActionProcessingException Exception during payload processing.
+     */
+    public Object processAction(Object payload) throws ActionProcessingException;
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Exception thrown while an action is processing a message payload.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class ActionProcessingException extends BaseException {
+
+    /**
+     * Public constructor.
+     * @param message Exception message.
+     */
+    public ActionProcessingException(String message) {
+        super(message);
+    }
+
+    /**
+     * Public constructor.
+     * @param message Exception message.
+     * @param cause Exception cause.
+     */
+    public ActionProcessingException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Public constructor.
+     * @param cause Exception cause.
+     */
+    public ActionProcessingException(Throwable cause) {
+        super(cause);
+    }
+
+}

Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/SmooksTransformActionHandler.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/SmooksTransformActionHandler.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/SmooksTransformActionHandler.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import java.util.List;
+
+import org.jboss.soa.esb.helpers.KeyValuePair;
+
+/**
+ * Smooks transform action handler.  Currently a stub that echoes the payload.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+
+public class SmooksTransformActionHandler implements ActionHandler {
+
+	public SmooksTransformActionHandler(String name, List<KeyValuePair> properties) {
+		System.out.println("Instantiate action handler: " + name);
+	}
+
+    /* (non-Javadoc)
+     * @see org.jboss.soa.esb.actions.ActionHandler#processAction(java.lang.Object)
+     */
+    public Object processAction(Object payload) {
+        System.out.println("processAction: " + payload);
+        return payload;
+    }
+}

Deleted: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -1,29 +0,0 @@
-package org.jboss.soa.esb.actions;
-
-import java.io.Serializable;
-
-import org.jboss.soa.esb.helpers.DomElement;
-
-public class TransformAction extends AbstractAction {
-
-	public TransformAction(DomElement actionConfig, Object p_oCurr) {
-		super(actionConfig, p_oCurr);
-		
-		System.out.println(actionConfig.toString());
-	}
-
-	@Override
-	public void processCurrentObject() throws Exception {
-		
-	}
-
-	@Override
-	public Serializable getOkNotification() {
-		return "OK";
-	}
-
-	@Override
-	public Serializable getErrorNotification() {
-		return "Error";
-	}
-}

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -4,7 +4,8 @@
 
 /**
  * Command queue abstraction.
- * @author tfennelly
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
  */
 public interface CommandQueue {
 

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -4,7 +4,8 @@
 
 /**
  * Command queue exception.
- * @author tfennelly
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
  */
 public class CommandQueueException extends BaseException {
 

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -15,7 +15,8 @@
  * queue name via a "command-queue-name" attribute supplied in the configuration to the
  * {@link #open(DomElement)} method.  The queues are stored statically and can be accessed via the
  * {@link #getQueue(String)} method using the queue name.
- * @author tfennelly
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
  */
 public class InMemoryCommandQueue implements CommandQueue {
 

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -28,7 +28,8 @@
  * JMS based Command Queue implementation.
  * <p/>
  * This code was simply pulled from the GpListener.
- * @author tfennelly
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
  */
 public class JmsCommandQueue implements CommandQueue {
 

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -1,161 +1,227 @@
 /*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 
-
 package org.jboss.soa.esb.listeners;
 
 import java.util.*;
-import java.lang.reflect.Constructor;
 
 import org.apache.log4j.*;
 
-import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionHandler;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory.ActionDefinition;
 import org.jboss.soa.esb.helpers.*;
 
-public abstract class AbstractPoller implements Runnable, Observer
-{
-  protected abstract List<Object> pollForCandidates();
-  protected abstract Object		preProcess	(Object p_o) throws Exception;
-	
-  // You can override these values at constructor time of your
-  // derived class after calling super(GpListener,DomElement)
-  protected int	m_iMinPollMillis	= 3000	 // minimum polling interval
-  				,m_iDfltPollMillis	= 20000	 // default polling interval
-  				,m_iSleepForThreads	= 3000	// default sleep if no threads available
-  				,m_iUpperThreadLimit = 10	// just in case - override if you wish 
-  ;
-  
-  public static final String PARM_POLL_LTCY			= "pollLatencySecs";
+public abstract class AbstractPoller implements Runnable {
+    protected abstract List<Object> pollForCandidates();
 
-  protected int 		m_iQthr = 0, m_iMaxThr;
-  protected int			m_iPollMillis;
+    protected abstract Object preProcess(Object p_o) throws Exception;
 
-  protected ThreadGroup m_oThrGrp = null;
-	
-  protected Logger		m_oLogger;
+    // You can override these values at constructor time of your
+    // derived class after calling super(GpListener,DomElement)
+    protected int m_iMinPollMillis = 3000 // minimum polling interval
+            , m_iDfltPollMillis = 20000 // default polling interval
+            , m_iSleepForThreads = 3000 // default sleep if no threads available
+            , m_iUpperThreadLimit = 10 // just in case - override if you wish
+            ;
 
-  protected GpListener	m_oDad;
-  protected DomElement	m_oParms;
-  protected Class 		m_oExecClass;
+    public static final String PARM_POLL_LTCY = "pollLatencySecs";
 
-  protected AbstractPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
-  {
-    m_oLogger	= Logger.getLogger(this.getClass());
-    m_oDad		= p_oDad;
-    m_oParms	= p_oParms.cloneObj();
-    checkParms();
-  } //__________________________________
-/**
- * Check for mandatory and optional attributes in parameter tree
- * 
- * @throws Exception - if actionClass not specified or not in classpath
- *  or invalid int values for maxThreads or pollLatencySecs
- */  
-  protected void checkParms() throws Exception
-  {
-	  String sAtt	= GpListener.obtainAtt(m_oParms
-			  	,GpListener.PARM_ACTION_CLASS,null);
-	  m_oExecClass	= GpListener.checkActionClass(sAtt);
-	  
-	  sAtt			= GpListener.obtainAtt(m_oParms
-			  	,GpListener.PARM_MAX_THREADS,"1");
-	  int iMax		= Integer.parseInt(sAtt);
-	  m_iMaxThr		= Math.min(iMax,m_iUpperThreadLimit);
+    protected int m_iQthr = 0, m_iMaxThr;
 
-	  sAtt			= m_oParms.getAttr(PARM_POLL_LTCY);
-	  m_iPollMillis	= (null==sAtt) ? m_iDfltPollMillis
-	  			: 1000 * Integer.parseInt(sAtt);
-	  if (m_iPollMillis < m_iMinPollMillis)
-		  m_iPollMillis = m_iMinPollMillis;
-  } //________________________________
+    protected int m_iPollMillis;
 
-  /**
-   * Implementation of Observer interface
-   * <p/> Just count the number of active child threads
-   *  
-   */
-      public void update(Observable p_oObs, Object p_oUsrObj)
-      {
-        if (p_oUsrObj instanceof Integer)
-          m_iQthr += ((Integer) p_oUsrObj).intValue();
-      } //________________________________
+    protected ThreadGroup m_oThrGrp = null;
 
-  /**
-   * Implement run method for this Runnable
-   * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
-   * no more looping allowed for all child classes
-   * <p/> This condition will not prevent child processes to finish normally
-   */
-  public void run()
-  {
-	  m_oThrGrp	= new ThreadGroup(m_oParms.getName());
-	  while (m_oDad.continueLooping())
-	  {
-		  List <Object> olPending = pollForCandidates();
-		  if (olPending.size() < 1)
-		  {	try {	Thread.sleep(m_iPollMillis); }
-		  	catch (InterruptedException e) { return; }
-		  	continue;
-		  }
+    protected Logger m_oLogger;
 
-		  for (Object oCurr : olPending)
-		  {
-			  if (m_iQthr >= m_iMaxThr)
-			  {	m_oLogger.info("Waiting for available threads...(max="
-					  +m_iMaxThr+")");
-				try { Thread.sleep(m_iSleepForThreads); }
-				catch (InterruptedException e) {return; }
-				break;
-			  }
+    protected GpListener m_oDad;
 
-			  // give the derived class an opportunity to do something
-			  // before processing current object
-			  Object oProcess = null;
-			  try 
-			  { if (null==(oProcess = preProcess(oCurr)))
-					  continue;
-			  }
-			  catch (Exception ePre)
-			  {	m_oLogger.error("preProcess(Object) FAILED",ePre);
-			  	continue;
-			  }
+    protected DomElement m_oParms;
 
-			  AbstractAction oExec = null;
-			  try
-			  {	Constructor oConst = m_oExecClass
-			  		.getConstructor(GpListener.getActionClassArgs());
-			  	oExec = (AbstractAction)oConst
-			  		.newInstance(new Object[] {m_oParms,oProcess});
-			  }
-			  catch (Exception e)
-			  {	m_oLogger.error("Can't instantiate action class",e);
-				break;
-			  }
-			  // launch an instance of the AbstractAction in a child thread
-			  m_iQthr += 1;
-			  oExec.addObserver(this);
-			  new Thread(oExec).start();
-		  }
-	    }
-  } //__________________________________
- 
-} //____________________________________________________________________________
+    protected String[] m_oActions;
+
+    protected ActionDefinitionFactory m_oActionDefinitionFactory;
+
+    protected AbstractPoller(GpListener p_oDad, DomElement p_oParms,
+            ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+        m_oLogger = Logger.getLogger(this.getClass());
+        m_oDad = p_oDad;
+        m_oParms = p_oParms.cloneObj();
+        m_oActionDefinitionFactory = actionDefinitionFactory;
+        checkParms();
+    } // __________________________________
+
+    /**
+     * Check for mandatory and optional attributes in parameter tree
+     * 
+     * @throws Exception -
+     *             if actionClass not specified or not in classpath or invalid
+     *             int values for maxThreads or pollLatencySecs
+     */
+    protected void checkParms() throws Exception {
+        String sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_ACTIONS, "");
+        m_oActions = sAtt.split(",");
+        
+        if(m_oActions.length == 0) {
+            throw new ConfigurationException("Listener 'actions' list must be specified.");
+        }
+
+        sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_MAX_THREADS, "1");
+        int iMax = Integer.parseInt(sAtt);
+        m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+
+        sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
+        m_iPollMillis = (null == sAtt) ? m_iDfltPollMillis : 1000 * Integer
+                .parseInt(sAtt);
+        if (m_iPollMillis < m_iMinPollMillis)
+            m_iPollMillis = m_iMinPollMillis;
+    } // ________________________________
+
+    /**
+     * Increment the active thread count.
+     */
+    private void incThreads() {
+        m_iQthr++;
+    }
+
+    /**
+     * Decrement the active thread count.
+     */
+    private void decThreads() {
+        m_iQthr--;
+    }
+
+    /**
+     * Implement run method for this Runnable <p/> Will continue to run until
+     * controlling class (ref in m_oDad) indicates no more looping allowed for
+     * all child classes <p/> This condition will not prevent child processes from
+     * finishing normally
+     */
+    public void run() {
+        m_oThrGrp = new ThreadGroup(m_oParms.getName());
+        while (m_oDad.continueLooping()) {
+            List<Object> olPending = pollForCandidates();
+            if (olPending.size() < 1) {
+                try {
+                    Thread.sleep(m_iPollMillis);
+                } catch (InterruptedException e) {
+                    return;
+                }
+                continue;
+            }
+
+            for (Object oCurr : olPending) {
+                if (m_iQthr >= m_iMaxThr) {
+                    m_oLogger.info("Waiting for available threads...(max="
+                            + m_iMaxThr + ")");
+                    try {
+                        Thread.sleep(m_iSleepForThreads);
+                    } catch (InterruptedException e) {
+                        return;
+                    }
+                    break;
+                }
+
+                // give the derived class an opportunity to do something
+                // before processing current object.
+                Object oProcess = null;
+                try {
+                    // REVIEW: Is this "preProcess" step still required now that we've got chained actions??
+                    if (null == (oProcess = preProcess(oCurr))) {
+                        continue;
+                    }
+                } catch (Exception ePre) {
+                    m_oLogger.error("preProcess(Object) FAILED", ePre);
+                    continue;
+                }
+
+                ActionExecutionRunner runner = new ActionExecutionRunner(oProcess);
+                new Thread(runner).start();
+            }
+        }
+    } // __________________________________
+
+    
+    /**
+     * Action Execution Runner.
+     * <p/>
+     * Runs the actions in a listener's "actions" config on a message payload object received
+     * by the listener implementation.
+     * <p/>
+     * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes.  Needs to be sorted out as an
+     * overall cleanup of these classes.  Lots of duplicate code etc. 
+     * 
+     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+     * @since Version 4.0
+     */
+    private class ActionExecutionRunner implements Runnable {
+        
+        private Object object;
+             
+        /**
+         * Private constructor.
+         * @param initialObject The initial processing target object.
+         */
+        private ActionExecutionRunner(Object initialObject) {
+            this.object = initialObject;
+        }
+
+        /* (non-Javadoc)
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            String currentAction = null;
+            
+            // Increment the active thread count for the listener on starting...
+            incThreads();
+            
+            try {
+                // Run the object through each ActionHandler...
+                for(String action : m_oActions) {
+                    ActionDefinition actionDefinition;
+
+                    currentAction = action.trim();
+                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+                    if(actionDefinition == null) {
+                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
+                    }
+                    
+                    // The processing result of each action feeds into the processing of the next action...
+                    ActionHandler handler = actionDefinition.getHandler();
+                    object = handler.processAction(object);
+                    if(object == null) {
+                        m_oLogger.warn("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "].  ActionHandler [" + handler.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
+                        break;
+                    }
+                }
+            } catch(Throwable thrown) {
+                m_oLogger.error("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] thre an exception.", thrown);
+            }
+            
+            // Decrement the active thread count for the listener on completion...
+            decThreads();
+        }
+    }
+    
+} // ____________________________________________________________________________

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -29,6 +29,7 @@
 
 import org.jboss.soa.esb.util.*;
 import org.jboss.soa.esb.actions.AbstractFileAction;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
 import org.jboss.soa.esb.helpers.*;
 
 public class DirectoryPoller extends AbstractPoller
@@ -42,9 +43,9 @@
   public static final String FILE_POST_SFX  	= "postSuffix";
   public static final String FILE_POST_DEL  	= "postDelete";
 
-  public DirectoryPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
+  public DirectoryPoller(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
   {
-	super(p_oDad,p_oParms);
+	super(p_oDad, p_oParms, actionDefinitionFactory);
 	checkMyParms();
   } //__________________________________
 

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -33,7 +33,9 @@
 import java.util.Map;
 
 import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
 import org.jboss.soa.esb.command.CommandQueue;
 import org.jboss.soa.esb.command.CommandQueueException;
 import org.jboss.soa.esb.command.JmsCommandQueue;
@@ -99,7 +101,7 @@
 	// DomElements
 	public static final String PARM_LISTENER_CLASS = "listenerClass";
 
-	public static final String PARM_ACTION_CLASS = "actionClass";
+	public static final String PARM_ACTIONS = "actions";
 
 	public static final String PARM_MAX_THREADS = "maxThreads";
 
@@ -160,6 +162,8 @@
 
 	private CommandQueue commandQueue;
 
+    private ActionDefinitionFactory actionDefinitionFactory;
+
 	/**
 	 * Construct a Listener Manager from the named repository based
 	 * configuration.
@@ -194,9 +198,7 @@
 
 			m_oState = State.Exception_thrown;
 			m_oState.m_oException = e;
-			m_oLogger
-					.fatal(
-							"Listener configuration and startup error.  Config Source: "
+			m_oLogger.fatal("Listener configuration and startup error.  Config Source: "
 									+ (configSource != null ? configSource
 											: "unknown"), e);
 
@@ -265,8 +267,20 @@
 		m_lEndTime = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
 				sEndT).getTime();
 
+        // Read and initialise the action definitions...
+        DomElement actionConfig = p_oP.getFirstElementChild("Actions");
+        if(actionConfig == null) {
+            throw new ConfigurationException("No 'Actions' configuration.");
+        }        
+        actionDefinitionFactory = new ActionDefinitionFactory(actionConfig);
+        
 	} // ________________________________
 
+    /**
+     * Factory method for creating the command queue.
+     * @param config GpListener config.
+     * @return GpListener CommandQueue instance.
+     */
 	private CommandQueue createCommandQueue(DomElement config) {
 		String commandQueueClass = config.getAttr("command-queue-class");
 		
@@ -342,9 +356,9 @@
 		try {
 			Class oListener = Class.forName(p_sClassName);
 			Constructor oConst = oListener.getConstructor(new Class[] {
-					this.getClass(), DomElement.class });
+					this.getClass(), DomElement.class, ActionDefinitionFactory.class });
 			Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,
-					p_oP });
+					p_oP, actionDefinitionFactory });
 			new Thread(oRun).start();
 		} catch (Exception e) {
 			m_oLogger.error("Cannot launch <" + p_sClassName + ">\n", e);

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -1,196 +1,269 @@
 /*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
 
 package org.jboss.soa.esb.listeners;
 
-import java.lang.reflect.*;
-import java.util.Observer;
-import java.util.Observable;
+import java.util.Arrays;
 
 import org.apache.log4j.*;
 
 import javax.naming.*;
 import javax.jms.*;
 
-import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionHandler;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory.ActionDefinition;
 import org.jboss.soa.esb.helpers.*;
 
-public class JmsQueueListener implements Runnable, Observer
-{  
-  // You can override these values at constructor time of your derived class
-  protected  int
-  			m_iSleepForThreads	= 3000	// default sleep if no threads available
-			,m_iUpperThreadLimit = 10	// just in case - override if you wish 
-  ;
-  public static final String LISTEN_QUEUE_CONN_FACT	= "queueConnFactoryClass";
-  public static final String LISTEN_JNDI_TYPE		= "listenJndiType";
-  public static final String LISTEN_JNDI_URL 		= "listenJndiURL";
-  public static final String LISTEN_QUEUE			= "listenQueue";
-  public static final String LISTEN_MSG_SELECTOR	= "listenMsgSelector";
+public class JmsQueueListener implements Runnable {
+    // You can override these values at constructor time of your derived class
+    protected int m_iSleepForThreads = 3000 // default sleep if no threads
+                                            // available
+            , m_iUpperThreadLimit = 10 // just in case - override if you wish
+            ;
 
-  protected boolean 		m_bError = false;
+    public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
 
-  protected QueueConnection m_oQconn;
-  protected QueueSession	m_oQsess;
-  protected Queue			m_oQueue;
-  protected String			m_sSelector;
-  protected MessageConsumer m_oRdr;
+    public static final String LISTEN_JNDI_TYPE = "listenJndiType";
 
+    public static final String LISTEN_JNDI_URL = "listenJndiURL";
 
-  protected int 		m_iQthr = 0, m_iMaxThr;
+    public static final String LISTEN_QUEUE = "listenQueue";
 
-  protected ThreadGroup m_oThrGrp = null;
-	
-  protected Logger		m_oLogger;
+    public static final String LISTEN_MSG_SELECTOR = "listenMsgSelector";
 
-  protected GpListener	m_oDad;
-  protected DomElement	m_oParms;
-  protected Class 		m_oExecClass;
+    protected boolean m_bError = false;
 
-  public JmsQueueListener(GpListener p_oDad, DomElement p_oParms) throws Exception
-  {
-	    m_oLogger	= Logger.getLogger(this.getClass());
-	    m_oDad		= p_oDad;
-	    m_oParms	= p_oParms.cloneObj();
-	    checkMyParms();
-	    m_oThrGrp	= new ThreadGroup(m_oParms.getName());
-  } //__________________________________
-  
-  /**
-   * Check for mandatory and optional attributes in parameter tree
-   * 
-   * @throws Exception - if mandatory atts are not right
-   * 			or actionClass not in classpath 
-   */  
-	protected void checkMyParms() throws Exception
-	{
-		String sAtt	= GpListener.obtainAtt(m_oParms
-				,GpListener.PARM_ACTION_CLASS,null);
-		m_oExecClass= GpListener.checkActionClass(sAtt);
-		  
-		sAtt		= GpListener.obtainAtt(m_oParms
-				  	,GpListener.PARM_MAX_THREADS,"1");
-		int iMax	= Integer.parseInt(sAtt);
-		m_iMaxThr	= Math.min(iMax,m_iUpperThreadLimit);
+    protected QueueConnection m_oQconn;
 
-		// Third arg is null - Exception will br thrown if listenQueue is not found
-		String sQueue = GpListener.obtainAtt(m_oParms,LISTEN_QUEUE,null);
-		
-		// No problem if selector is null - everything in queue will be returned
-		m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+    protected QueueSession m_oQsess;
 
-		m_oQconn		= null;
-		m_oQsess		= null;
-		m_oQueue		= null;
-	      
-		String sJndiType = GpListener.obtainAtt(m_oParms
-				,LISTEN_JNDI_TYPE,"jboss");
-		String sJndiURL	 = GpListener.obtainAtt(m_oParms
-				,LISTEN_JNDI_URL,"localhost");
-		Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+    protected Queue m_oQueue;
 
-		String sFactClass	= GpListener.obtainAtt(m_oParms
-				,LISTEN_QUEUE_CONN_FACT,"ConnectionFactory");
-		Object tmp = oJndiCtx.lookup(sFactClass);
-		QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+    protected String m_sSelector;
 
-		m_oQconn = qcf.createQueueConnection();
-		m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
-		m_oQsess = m_oQconn.createQueueSession
-			(false,TopicSession.AUTO_ACKNOWLEDGE);
-		m_oQconn.start();
-		m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
-		
-	} //________________________________
+    protected MessageConsumer m_oRdr;
 
+    protected int m_iQthr = 0, m_iMaxThr;
+
+    protected ThreadGroup m_oThrGrp = null;
+
+    protected Logger m_oLogger;
+
+    protected GpListener m_oDad;
+
+    protected DomElement m_oParms;
+
+    protected String[] m_oActions;
+
+    protected ActionDefinitionFactory m_oActionDefinitionFactory;
+
+    public JmsQueueListener(GpListener p_oDad, DomElement p_oParms,
+            ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+        m_oLogger = Logger.getLogger(this.getClass());
+        m_oDad = p_oDad;
+        m_oParms = p_oParms.cloneObj();
+        m_oActionDefinitionFactory = actionDefinitionFactory;
+        checkMyParms();
+        m_oThrGrp = new ThreadGroup(m_oParms.getName());
+    } // __________________________________
+
     /**
-     * Implement run method for this Runnable
-     * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
-     * no more looping allowed for all child classes
-     * <p/> This condition will not prevent child processes to finish normally
+     * Check for mandatory and optional attributes in parameter tree
+     * 
+     * @throws Exception -
+     *             if mandatory atts are not right or actionClass not in
+     *             classpath
      */
-	public void run()
-	{
-		while (m_oDad.continueLooping())
-		{
-			if (m_iQthr >= m_iMaxThr)
-			{	m_oLogger.info("Waiting for available threads...");
-				try { Thread.sleep(m_iSleepForThreads); }
-				catch (InterruptedException e) {return; }
-				break;
-			 }
-			 Message oM = null;
-			 try { oM = m_oRdr.receive(m_oDad.millisToWait()); }
-			 catch (JMSException oJ)
-			 {
-				 m_oLogger.error("JMS error on receive",oJ);
-				 for (int i1=0; i1<3; i1++)
-					 try {checkMyParms(); }  // try to reconnect to the queue
-					 catch (Exception e)
-					 {	m_oLogger.error("Reconnecting to Queue",e);
-					 	try { Thread.sleep(m_iSleepForThreads); }
-					 	catch (InterruptedException e1) 
-					 	{ //Just return
-					 		return;
-						}
-					 }
-			 }
-			 if (null==oM)
-				 continue;
+    protected void checkMyParms() throws Exception {
+        String sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_ACTIONS, "");
+        m_oActions = sAtt.split(",");
+        
+        if(m_oActions.length == 0) {
+            throw new ConfigurationException("Listener 'actions' list must be specified.");
+        }
 
-			 AbstractAction oExec = null;
-			 try
-			 {	Constructor oConst = m_oExecClass
-				 	.getConstructor(GpListener.getActionClassArgs());
-			  	oExec = (AbstractAction)oConst.newInstance
-			  		(new Object[] {m_oParms,oM});
-			 }
-			 catch (Exception e)
-			 {	m_oLogger.error("Can't instantiate action class",e);
-			 	break;
-			 }
-			 // invoke the run method of the AbstractAction
-			 m_iQthr += 1;
-			 oExec.addObserver(this);
-			 new Thread(oExec).start();
-		 }
-		if (null!=m_oQsess)
-		      try { m_oQsess.close(); }    
-			catch (Exception e1) {/* Tried my best - Just continue */}
-		if (null!=m_oQconn)
-		      try { m_oQconn.close(); }    
-			catch (Exception e2) {/* Tried my best - Just continue */}
-	  } //______________________________
-	  
-/**
- * Implementation of Observer interface
- * <p/> Just count the number of active child threads
- *  
- */
-	public void update(Observable p_oObs, Object p_oUsrObj)
-	{
-		if (p_oUsrObj instanceof Integer)
-			m_iQthr += ((Integer) p_oUsrObj).intValue();
-	} //________________________________
+        sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_MAX_THREADS, "1");
+        int iMax = Integer.parseInt(sAtt);
+        m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
 
-  
-} //____________________________________________________________________________
+        // Third arg is null - Exception will be thrown if listenQueue is not
+        // found
+        String sQueue = GpListener.obtainAtt(m_oParms, LISTEN_QUEUE, null);
+
+        // No problem if selector is null - everything in queue will be returned
+        m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+
+        m_oQconn = null;
+        m_oQsess = null;
+        m_oQueue = null;
+
+        String sJndiType = GpListener.obtainAtt(m_oParms, LISTEN_JNDI_TYPE,
+                "jboss");
+        String sJndiURL = GpListener.obtainAtt(m_oParms, LISTEN_JNDI_URL,
+                "localhost");
+        Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
+                sJndiURL);
+
+        String sFactClass = GpListener.obtainAtt(m_oParms,
+                LISTEN_QUEUE_CONN_FACT, "ConnectionFactory");
+        Object tmp = oJndiCtx.lookup(sFactClass);
+        QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+        m_oQconn = qcf.createQueueConnection();
+        m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
+        m_oQsess = m_oQconn.createQueueSession(false,
+                TopicSession.AUTO_ACKNOWLEDGE);
+        m_oQconn.start();
+        m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
+
+    } // ________________________________
+
+    /**
+     * Implement run method for this Runnable <p/> Will continue to run until
+     * controlling class (ref in m_oDad) indicates no more looping allowed for
+     * all child classes <p/> This condition will not prevent child processes from
+     * finishing normally
+     */
+    public void run() {
+        while (m_oDad.continueLooping()) {
+            if (m_iQthr >= m_iMaxThr) {
+                m_oLogger.info("Waiting for available threads...");
+                try {
+                    Thread.sleep(m_iSleepForThreads);
+                } catch (InterruptedException e) {
+                    return;
+                }
+                break;
+            }
+            Message oM = null;
+            try {
+                oM = m_oRdr.receive(m_oDad.millisToWait());
+            } catch (JMSException oJ) {
+                m_oLogger.error("JMS error on receive", oJ);
+                for (int i1 = 0; i1 < 3; i1++)
+                    try {
+                        checkMyParms();
+                    } // try to reconnect to the queue
+                    catch (Exception e) {
+                        m_oLogger.error("Reconnecting to Queue", e);
+                        try {
+                            Thread.sleep(m_iSleepForThreads);
+                        } catch (InterruptedException e1) { // Just return
+                            return;
+                        }
+                    }
+            }
+            if (null == oM)
+                continue;
+
+            ActionExecutionRunner runner = new ActionExecutionRunner(oM);
+            new Thread(runner).start();
+        }
+        if (null != m_oQsess)
+            try {
+                m_oQsess.close();
+            } catch (Exception e1) {/* Tried my best - Just continue */
+            }
+        if (null != m_oQconn)
+            try {
+                m_oQconn.close();
+            } catch (Exception e2) {/* Tried my best - Just continue */
+            }
+    } // ______________________________
+
+    /**
+     * Increment the active thread count.
+     */
+    private synchronized void incThreads() {
+        // Called from concurrently running ActionExecutionRunner threads;
+        // "++" is a read-modify-write, so it is guarded by this instance's
+        // monitor (shared with decThreads) to avoid lost updates.
+        m_iQthr++;
+    }
+
+    /**
+     * Decrement the active thread count.
+     * <p/>
+     * Synchronized on the same monitor as {@link #incThreads()} so that
+     * concurrent increments and decrements cannot overwrite each other.
+     */
+    private synchronized void decThreads() {
+        m_iQthr--;
+    }
+
+    /**
+     * Action Execution Runner.
+     * <p/>
+     * Runs the actions in a listeners "actions" config in a message payload object received
+     * by the listener implementation.
+     * <p/>
+     * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes.  Needs to be sorted out as an
+     * overall cleanup of these classes.  Lots of duplicate code etc. 
+     * 
+     * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+     * @since Version 4.0
+     */
+    private class ActionExecutionRunner implements Runnable {
+
+        /**
+         * Object currently travelling through the action chain; replaced by
+         * the result of each handler in turn.
+         */
+        private Object object;
+
+        /**
+         * Private constructor.
+         * @param initialObject The initial processing target object.
+         */
+        private ActionExecutionRunner(Object initialObject) {
+            this.object = initialObject;
+        }
+
+        /**
+         * Runs the object through every configured action, in order.  The
+         * chain stops early if a handler returns null or anything is thrown;
+         * both cases are logged and never propagated out of the thread.
+         *
+         * @see java.lang.Runnable#run()
+         */
+        public void run() {
+            String currentAction = null;
+
+            // Increment the active thread count for the listener on starting...
+            incThreads();
+
+            try {
+                // Run the object through each ActionHandler...
+                for(String action : m_oActions) {
+                    ActionDefinition actionDefinition;
+
+                    currentAction = action.trim();
+                    actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+                    if(actionDefinition == null) {
+                        throw new java.lang.IllegalStateException("Bad Listener Configuration.  No 'Actions/Action' definition for action [" + currentAction + "].");
+                    }
+
+                    // The processing result of each action feeds into the processing of the next action...
+                    ActionHandler handler = actionDefinition.getHandler();
+                    object = handler.processAction(object);
+                    if(object == null) {
+                        m_oLogger.warn("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "].  ActionHandler [" + handler.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
+                        break;
+                    }
+                }
+            } catch(Throwable thrown) {
+                m_oLogger.error("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "].  Action [" + currentAction + "] thre an exception.", thrown);
+            } finally {
+                // Decrement the active thread count for the listener on completion.
+                // Done in finally so the count is released even if the error
+                // logging above fails; otherwise the listener could stall
+                // waiting for a thread slot that is never given back.
+                decThreads();
+            }
+        }
+    }
+} // ____________________________________________________________________________

Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java	2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java	2006-09-11 18:32:57 UTC (rev 6150)
@@ -155,9 +155,9 @@
   * @param p_oParms DomElement - Sub tree that corresponds to this instance
   * @throws Exception
   */
-  public SqlTablePoller(GpListener p_oDad, DomElement p_oParms) throws Exception
+  public SqlTablePoller(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
   {
-	super(p_oDad,p_oParms);
+	super(p_oDad, p_oParms, actionDefinitionFactory);
 	try { checkMyParms(); }
 	catch (Exception e)
 	{




More information about the jboss-svn-commits mailing list