[jboss-svn-commits] JBL Code SVN: r6150 - in labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb: actions command listeners
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Sep 11 14:33:08 EDT 2006
Author: tfennelly
Date: 2006-09-11 14:32:57 -0400 (Mon, 11 Sep 2006)
New Revision: 6150
Added:
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionHandler.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/SmooksTransformActionHandler.java
Removed:
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java
Modified:
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
Log:
Chained actions stuff
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -0,0 +1,276 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.helpers.KeyValuePair;
+
+/**
+ * Action Definition Factory.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class ActionDefinitionFactory {
+
+ /**
+ * Class Logger.
+ */
+ private static Logger logger = Logger.getLogger(ActionDefinitionFactory.class);
+ /**
+ * Action Definition table.
+ */
+ private Hashtable<String, ActionDefinition> actionDefinitions = new Hashtable<String, ActionDefinition>();
+
+ /**
+ * Construct a factory instance.
+ * @param config ActionDefinition configuration.
+ * @throws ConfigurationException Invalid configuration.
+ */
+ public ActionDefinitionFactory(DomElement config) throws ConfigurationException {
+ if(config == null || !config.getName().equals("Actions")) {
+ throw new ConfigurationException("No 'Actions' configuration.");
+ }
+
+ // Read the handler alias and action def configurations from the XML configuration...
+ DomElement handlerAliasConfig = config.getFirstElementChild("HandlerAliases");
+ DomElement[] actions = config.getElementChildren("Action");
+ if(handlerAliasConfig == null) {
+ throw new ConfigurationException("No 'Actions/HandlerAliases' configuration.");
+ }
+ if(actions == null || actions.length == 0) {
+ throw new ConfigurationException("No 'Actions/Action' configurations.");
+ }
+
+ // Initialise the Action Definition table..
+ Hashtable<String, String> handlerClasses = getHandlerClasses(handlerAliasConfig);
+ initialiseActionDefinitions(actions, handlerClasses);
+ logger.info("ActionDefinition Factory initialisation complete.");
+ }
+
+ /**
+ * Get the action definition for the specified action name.
+ * @param actionName The action name.
+ * @return The action definition.
+ */
+ public ActionDefinition getInstance(String actionName) {
+ if(actionName == null || actionName.trim().equals("")) {
+ throw new IllegalArgumentException("Null or empty 'actionName' arg in method call.");
+ }
+
+ return actionDefinitions.get(actionName);
+ }
+
+ /**
+ * Initialise the Action Definitions.
+ * @param actions Action Definition configurations.
+ * @param handlerClasses Handler classes keyed by alias name.
+ * @throws ConfigurationException
+ */
+ private void initialiseActionDefinitions(DomElement[] actions, Hashtable<String, String> handlerClasses) throws ConfigurationException {
+ if(handlerClasses == null || handlerClasses.isEmpty()) {
+ throw new ConfigurationException("No action handler classes defined.");
+ }
+
+ for(DomElement action : actions) {
+ String name = action.getAttr("name");
+ String handlerAlias = action.getAttr("handler");
+ String handlerClass;
+ List<KeyValuePair> properties;
+
+ // Check the required attributes...
+ if(name == null || name.trim().equals("")) {
+ throw new ConfigurationException("Actions/Action has no 'name' defined.");
+ }
+ if(handlerAlias == null || handlerAlias.trim().equals("")) {
+ throw new ConfigurationException("Actions/Action [" + name + "] has no 'handler' defined.");
+ }
+ handlerClass = handlerClasses.get(handlerAlias);
+ if(handlerClass == null) {
+ throw new ConfigurationException("No action handler class defined for handler alias [" + handlerAlias + "] set on action [" + name + "].");
+ }
+
+ // Get any properties defined on the action...
+ properties = getProperties(action);
+
+ // Create the action definition and store it against its name...
+ actionDefinitions.put(name, new ActionDefinition(name, handlerClass, properties));
+ logger.info("Added ActionDefinition [" + name + "] for handler [" + handlerAlias + ":" + handlerClass + "]. Num properties: " + properties.size());
+ }
+ }
+
+ /**
+ * Get the handler runtime classes.
+ * @param handlerAliasConfig Alias configs.
+ * @return Handler runtime class names, keyed by alias name.
+ * @throws ConfigurationException Bad configuration.
+ */
+ private Hashtable<String, String> getHandlerClasses(DomElement handlerAliasConfig) throws ConfigurationException {
+ DomElement[] aliases = handlerAliasConfig.getElementChildren("Alias");
+ Hashtable<String, String> handlerClasses = new Hashtable<String, String>();
+
+ if(aliases == null) {
+ throw new ConfigurationException("No 'Actions/HandlerAliases/Alias' configurations.");
+ }
+
+ for(DomElement alias : aliases) {
+ String name = alias.getAttr("name");
+ String className = alias.getAttr("class");
+
+ // Check the required attributes...
+ if(name == null || name.trim().equals("")) {
+ throw new ConfigurationException("Actions/HandlerAliases/Alias has no 'name' defined.");
+ }
+ if(className == null || className.trim().equals("")) {
+ throw new ConfigurationException("Actions/HandlerAliases/Alias [" + name + "] has no 'class' defined.");
+ }
+ handlerClasses.put(name, className);
+ logger.info("Added alias [" + name + "] for ActionHandler class [" + className + "].");
+ }
+
+ return handlerClasses;
+ }
+
+ /**
+ * Get the properties defined on the supplied 'Action' configuration.
+ * @param action The action configuration element.
+ * @return A list of property {@link KeyValuePair} instances. An empty list is returned
+ * if no properties are defined on the action.
+ * @throws ConfigurationException A property name or value is empty or undefined.
+ */
+ private List<KeyValuePair> getProperties(DomElement action) throws ConfigurationException {
+ DomElement[] properties = action.getElementChildren("property");
+ List<KeyValuePair> propertyList = new ArrayList<KeyValuePair>();
+
+ for(DomElement property : properties) {
+ String name = property.getAttr("name");
+ String value = property.getAttr("value");
+
+ // Check the required attributes...
+ if(name == null || name.trim().equals("")) {
+ throw new ConfigurationException("Actions/Action/property has no 'name' defined. Action [" + action.getAttr("name") + "].");
+ }
+ if(value == null || value.trim().equals("")) {
+ throw new ConfigurationException("Actions/Action/property has no 'value' defined. Action [" + action.getAttr("name") + "], Property [" + name + "].");
+ }
+
+ propertyList.add(new KeyValuePair(name, value));
+ }
+
+ return propertyList;
+ }
+
+ /**
+ * Action Definition.
+ * <p/>
+ * An Action is defined by name, a handler instance to carry out the action, plus properties to the action
+ * to be supplied to the handler instance.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+ public class ActionDefinition {
+
+ private String name;
+ private String handler;
+ private List<KeyValuePair> properties;
+
+ /**
+ * Private constructor.
+ * @param name The action name.
+ * @param handler The action handler runtime class.
+ * @param properties Action properties list. An empty list for an action with no defined properties.
+ */
+ private ActionDefinition(String name, String handler, List<KeyValuePair> properties) {
+ this.name = name;
+ this.handler = handler;
+ this.properties = properties;
+ }
+
+ /**
+ * Get the action name.
+ * @return The action name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get the action handler instance.
+ * @return The Action Handler.
+ */
+ public ActionHandler getHandler() {
+ // TODO: Support singleton ActionHandler instances. This impl currently only supports prototype.
+ return createActionHandler();
+ }
+
+ /**
+ * Get the action properties to be supplied to the action handler.
+ * @return The configured action properties. An empty list for an action with no defined properties.
+ */
+ public List<KeyValuePair> getProperties() {
+ return properties;
+ }
+
+ /**
+ * Create the handler instance.
+ *
+ * @return Handler instance.
+ */
+ private ActionHandler createActionHandler() {
+ Class<? extends ActionHandler> runtimeClass;
+ Class[] NONDEFAULT_SIG = new Class[] {String.class, List.class};
+ Constructor nonDefaultConstructor;
+
+ // Get the runtime class...
+ try {
+ runtimeClass = Class.forName(handler).asSubclass(ActionHandler.class);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Action Handler class " + handler + " not found in classpath.", e);
+ } catch (ClassCastException e) {
+ throw new IllegalStateException("Action Handler class " + handler + " does not implement " + ActionHandler.class.getName(), e);
+ }
+
+ // Construct it...
+ try {
+ nonDefaultConstructor = runtimeClass.getConstructor(NONDEFAULT_SIG);
+ return (ActionHandler) nonDefaultConstructor.newInstance(new Object[] {name, properties});
+ } catch (NoSuchMethodException e1) {
+ try {
+ return runtimeClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("No appropriate constructor found on handler class [" + handler + "]. See " + ActionHandler.class.getName() + " Javadoc.", e);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unexpected exception. Unable to construct handler class instance [" + handler + "] using default constructor.", e);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Unexpected exception. Unable to construct handler class instance [" + handler + "] using non-default constructor.", e);
+ }
+ }
+ }
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionHandler.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionHandler.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionHandler.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+/**
+ * Action Handler Interface Definition.
+ * <p/>
+ * Implementations are constructed based on the following public constructor order precedence:
+ * <ol>
+ * <li><b>(String actionName, List&lt;{@link org.jboss.soa.esb.helpers.KeyValuePair}&gt; properties)</b></li>
+ * <li><b>default constructor</b>.</li>
+ * </ol>
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public interface ActionHandler {
+
+ /**
+ * Process the Action "payload" and return the result.
+ * @param payload The action payload to be processed.
+ * @return The processing result.
+ * @throws ActionProcessingException Exception during payload processing.
+ */
+ public Object processAction(Object payload) throws ActionProcessingException;
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Exception thrown while an action is processing a message payload.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class ActionProcessingException extends BaseException {
+
+ /**
+ * Public constructor.
+ * @param message Exception message.
+ */
+ public ActionProcessingException(String message) {
+ super(message);
+ }
+
+ /**
+ * Public constructor.
+ * @param message Exception message.
+ * @param cause Exception cause.
+ */
+ public ActionProcessingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Public constructor.
+ * @param cause Exception cause.
+ */
+ public ActionProcessingException(Throwable cause) {
+ super(cause);
+ }
+
+}
Added: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/SmooksTransformActionHandler.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/SmooksTransformActionHandler.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/SmooksTransformActionHandler.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import java.util.List;
+
+import org.jboss.soa.esb.helpers.KeyValuePair;
+
+/**
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+
+public class SmooksTransformActionHandler implements ActionHandler {
+
+ public SmooksTransformActionHandler(String name, List<KeyValuePair> properties) {
+ System.out.println("Instantiate action handler: " + name);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.actions.ActionHandler#processAction(java.lang.Object)
+ */
+ public Object processAction(Object payload) {
+ System.out.println("processAction: " + payload);
+ return payload;
+ }
+}
Deleted: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/actions/TransformAction.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -1,29 +0,0 @@
-package org.jboss.soa.esb.actions;
-
-import java.io.Serializable;
-
-import org.jboss.soa.esb.helpers.DomElement;
-
-public class TransformAction extends AbstractAction {
-
- public TransformAction(DomElement actionConfig, Object p_oCurr) {
- super(actionConfig, p_oCurr);
-
- System.out.println(actionConfig.toString());
- }
-
- @Override
- public void processCurrentObject() throws Exception {
-
- }
-
- @Override
- public Serializable getOkNotification() {
- return "OK";
- }
-
- @Override
- public Serializable getErrorNotification() {
- return "Error";
- }
-}
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -4,7 +4,8 @@
/**
* Command queue abstraction.
- * @author tfennelly
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
*/
public interface CommandQueue {
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -4,7 +4,8 @@
/**
* Command queue exception.
- * @author tfennelly
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
*/
public class CommandQueueException extends BaseException {
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -15,7 +15,8 @@
* queue name via a "command-queue-name" attribute supplied in the configuration to the
* {@link #open(DomElement)} method. The queues are stored statically and can be accessed via the
* {@link #getQueue(String)} method using the queue name.
- * @author tfennelly
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
*/
public class InMemoryCommandQueue implements CommandQueue {
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -28,7 +28,8 @@
* JMS based Command Queue implementation.
* <p/>
* This code was simply pulled from the GpListener.
- * @author tfennelly
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
*/
public class JmsCommandQueue implements CommandQueue {
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -1,161 +1,227 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
-
package org.jboss.soa.esb.listeners;
import java.util.*;
-import java.lang.reflect.Constructor;
import org.apache.log4j.*;
-import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionHandler;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory.ActionDefinition;
import org.jboss.soa.esb.helpers.*;
-public abstract class AbstractPoller implements Runnable, Observer
-{
- protected abstract List<Object> pollForCandidates();
- protected abstract Object preProcess (Object p_o) throws Exception;
-
- // You can override these values at constructor time of your
- // derived class after calling super(GpListener,DomElement)
- protected int m_iMinPollMillis = 3000 // minimum polling interval
- ,m_iDfltPollMillis = 20000 // default polling interval
- ,m_iSleepForThreads = 3000 // default sleep if no threads available
- ,m_iUpperThreadLimit = 10 // just in case - override if you wish
- ;
-
- public static final String PARM_POLL_LTCY = "pollLatencySecs";
+public abstract class AbstractPoller implements Runnable {
+ protected abstract List<Object> pollForCandidates();
- protected int m_iQthr = 0, m_iMaxThr;
- protected int m_iPollMillis;
+ protected abstract Object preProcess(Object p_o) throws Exception;
- protected ThreadGroup m_oThrGrp = null;
-
- protected Logger m_oLogger;
+ // You can override these values at constructor time of your
+ // derived class after calling super(GpListener,DomElement)
+ protected int m_iMinPollMillis = 3000 // minimum polling interval
+ , m_iDfltPollMillis = 20000 // default polling interval
+ , m_iSleepForThreads = 3000 // default sleep if no threads available
+ , m_iUpperThreadLimit = 10 // just in case - override if you wish
+ ;
- protected GpListener m_oDad;
- protected DomElement m_oParms;
- protected Class m_oExecClass;
+ public static final String PARM_POLL_LTCY = "pollLatencySecs";
- protected AbstractPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
- {
- m_oLogger = Logger.getLogger(this.getClass());
- m_oDad = p_oDad;
- m_oParms = p_oParms.cloneObj();
- checkParms();
- } //__________________________________
-/**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws Exception - if actionClass not specified or not in classpath
- * or invalid int values for maxThreads or pollLatencySecs
- */
- protected void checkParms() throws Exception
- {
- String sAtt = GpListener.obtainAtt(m_oParms
- ,GpListener.PARM_ACTION_CLASS,null);
- m_oExecClass = GpListener.checkActionClass(sAtt);
-
- sAtt = GpListener.obtainAtt(m_oParms
- ,GpListener.PARM_MAX_THREADS,"1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax,m_iUpperThreadLimit);
+ protected int m_iQthr = 0, m_iMaxThr;
- sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
- m_iPollMillis = (null==sAtt) ? m_iDfltPollMillis
- : 1000 * Integer.parseInt(sAtt);
- if (m_iPollMillis < m_iMinPollMillis)
- m_iPollMillis = m_iMinPollMillis;
- } //________________________________
+ protected int m_iPollMillis;
- /**
- * Implementation of Observer interface
- * <p/> Just count the number of active child threads
- *
- */
- public void update(Observable p_oObs, Object p_oUsrObj)
- {
- if (p_oUsrObj instanceof Integer)
- m_iQthr += ((Integer) p_oUsrObj).intValue();
- } //________________________________
+ protected ThreadGroup m_oThrGrp = null;
- /**
- * Implement run method for this Runnable
- * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
- * no more looping allowed for all child classes
- * <p/> This condition will not prevent child processes to finish normally
- */
- public void run()
- {
- m_oThrGrp = new ThreadGroup(m_oParms.getName());
- while (m_oDad.continueLooping())
- {
- List <Object> olPending = pollForCandidates();
- if (olPending.size() < 1)
- { try { Thread.sleep(m_iPollMillis); }
- catch (InterruptedException e) { return; }
- continue;
- }
+ protected Logger m_oLogger;
- for (Object oCurr : olPending)
- {
- if (m_iQthr >= m_iMaxThr)
- { m_oLogger.info("Waiting for available threads...(max="
- +m_iMaxThr+")");
- try { Thread.sleep(m_iSleepForThreads); }
- catch (InterruptedException e) {return; }
- break;
- }
+ protected GpListener m_oDad;
- // give the derived class an opportunity to do something
- // before processing current object
- Object oProcess = null;
- try
- { if (null==(oProcess = preProcess(oCurr)))
- continue;
- }
- catch (Exception ePre)
- { m_oLogger.error("preProcess(Object) FAILED",ePre);
- continue;
- }
+ protected DomElement m_oParms;
- AbstractAction oExec = null;
- try
- { Constructor oConst = m_oExecClass
- .getConstructor(GpListener.getActionClassArgs());
- oExec = (AbstractAction)oConst
- .newInstance(new Object[] {m_oParms,oProcess});
- }
- catch (Exception e)
- { m_oLogger.error("Can't instantiate action class",e);
- break;
- }
- // launch an instance of the AbstractAction in a child thread
- m_iQthr += 1;
- oExec.addObserver(this);
- new Thread(oExec).start();
- }
- }
- } //__________________________________
-
-} //____________________________________________________________________________
+ protected String[] m_oActions;
+
+ protected ActionDefinitionFactory m_oActionDefinitionFactory;
+
+ protected AbstractPoller(GpListener p_oDad, DomElement p_oParms,
+ ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ m_oLogger = Logger.getLogger(this.getClass());
+ m_oDad = p_oDad;
+ m_oParms = p_oParms.cloneObj();
+ m_oActionDefinitionFactory = actionDefinitionFactory;
+ checkParms();
+ } // __________________________________
+
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if the 'actions' list is empty or invalid
+ * int values for maxThreads or pollLatencySecs
+ */
+ protected void checkParms() throws Exception {
+ String sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_ACTIONS, "");
+ m_oActions = sAtt.split(",");
+
+ if(m_oActions.length == 0) {
+ throw new ConfigurationException("Listener 'actions' list must be specified.");
+ }
+
+ sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_MAX_THREADS, "1");
+ int iMax = Integer.parseInt(sAtt);
+ m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+
+ sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
+ m_iPollMillis = (null == sAtt) ? m_iDfltPollMillis : 1000 * Integer
+ .parseInt(sAtt);
+ if (m_iPollMillis < m_iMinPollMillis)
+ m_iPollMillis = m_iMinPollMillis;
+ } // ________________________________
+
+ /**
+ * Increment the active thread count.
+ */
+ private void incThreads() {
+ m_iQthr++;
+ }
+
+ /**
+ * Decrement the active thread count.
+ */
+ private void decThreads() {
+ m_iQthr--;
+ }
+
+ /**
+ * Implement run method for this Runnable <p/> Will continue to run until
+ * controlling class (ref in m_oDad) indicates no more looping allowed for
+ * all child classes <p/> This condition will not prevent child processes to
+ * finish normally
+ */
+ public void run() {
+ m_oThrGrp = new ThreadGroup(m_oParms.getName());
+ while (m_oDad.continueLooping()) {
+ List<Object> olPending = pollForCandidates();
+ if (olPending.size() < 1) {
+ try {
+ Thread.sleep(m_iPollMillis);
+ } catch (InterruptedException e) {
+ return;
+ }
+ continue;
+ }
+
+ for (Object oCurr : olPending) {
+ if (m_iQthr >= m_iMaxThr) {
+ m_oLogger.info("Waiting for available threads...(max="
+ + m_iMaxThr + ")");
+ try {
+ Thread.sleep(m_iSleepForThreads);
+ } catch (InterruptedException e) {
+ return;
+ }
+ break;
+ }
+
+ // give the derived class an opportunity to do something
+ // before processing current object.
+ Object oProcess = null;
+ try {
+ // REVIEW: Is this "preProcess" step still required now that we've got chained actions??
+ if (null == (oProcess = preProcess(oCurr))) {
+ continue;
+ }
+ } catch (Exception ePre) {
+ m_oLogger.error("preProcess(Object) FAILED", ePre);
+ continue;
+ }
+
+ ActionExecutionRunner runner = new ActionExecutionRunner(oProcess);
+ new Thread(runner).start();
+ }
+ }
+ } // __________________________________
+
+
+ /**
+ * Action Execution Runner.
+ * <p/>
+ * Runs the actions in a listener's "actions" config on a message payload object received
+ * by the listener implementation.
+ * <p/>
+ * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes. Needs to be sorted out as an
+ * overall cleanup of these classes. Lots of duplicate code etc.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+ private class ActionExecutionRunner implements Runnable {
+
+ private Object object;
+
+ /**
+ * Private constructor.
+ * @param initialObject The initial processing target object.
+ */
+ private ActionExecutionRunner(Object initialObject) {
+ this.object = initialObject;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ String currentAction = null;
+
+ // Increment the active thread count for the listener on starting...
+ incThreads();
+
+ try {
+ // Run the object through each ActionHandler...
+ for(String action : m_oActions) {
+ ActionDefinition actionDefinition;
+
+ currentAction = action.trim();
+ actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+ if(actionDefinition == null) {
+ throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
+ }
+
+ // The processing result of each action feeds into the processing of the next action...
+ ActionHandler handler = actionDefinition.getHandler();
+ object = handler.processAction(object);
+ if(object == null) {
+ m_oLogger.warn("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "]. ActionHandler [" + handler.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
+ break;
+ }
+ }
+ } catch(Throwable thrown) {
+ m_oLogger.error("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] thre an exception.", thrown);
+ }
+
+ // Decrement the active thread count for the listener on completion...
+ decThreads();
+ }
+ }
+
+} // ____________________________________________________________________________
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -29,6 +29,7 @@
import org.jboss.soa.esb.util.*;
import org.jboss.soa.esb.actions.AbstractFileAction;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
import org.jboss.soa.esb.helpers.*;
public class DirectoryPoller extends AbstractPoller
@@ -42,9 +43,9 @@
public static final String FILE_POST_SFX = "postSuffix";
public static final String FILE_POST_DEL = "postDelete";
- public DirectoryPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
+ public DirectoryPoller(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
{
- super(p_oDad,p_oParms);
+ super(p_oDad, p_oParms, actionDefinitionFactory);
checkMyParms();
} //__________________________________
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -33,7 +33,9 @@
import java.util.Map;
import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
import org.jboss.soa.esb.command.CommandQueue;
import org.jboss.soa.esb.command.CommandQueueException;
import org.jboss.soa.esb.command.JmsCommandQueue;
@@ -99,7 +101,7 @@
// DomElements
public static final String PARM_LISTENER_CLASS = "listenerClass";
- public static final String PARM_ACTION_CLASS = "actionClass";
+ public static final String PARM_ACTIONS = "actions";
public static final String PARM_MAX_THREADS = "maxThreads";
@@ -160,6 +162,8 @@
private CommandQueue commandQueue;
+ private ActionDefinitionFactory actionDefinitionFactory;
+
/**
* Construct a Listener Manager from the named repository based
* configuration.
@@ -194,9 +198,7 @@
m_oState = State.Exception_thrown;
m_oState.m_oException = e;
- m_oLogger
- .fatal(
- "Listener configuration and startup error. Config Source: "
+ m_oLogger.fatal("Listener configuration and startup error. Config Source: "
+ (configSource != null ? configSource
: "unknown"), e);
@@ -265,8 +267,20 @@
m_lEndTime = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
sEndT).getTime();
+ // Read and initialise the action definitions...
+ DomElement actionConfig = p_oP.getFirstElementChild("Actions");
+ if(actionConfig == null) {
+ throw new ConfigurationException("No 'Actions' configuration.");
+ }
+ actionDefinitionFactory = new ActionDefinitionFactory(actionConfig);
+
} // ________________________________
+ /**
+ * Factory method for creating the command queue.
+ * @param config GpListener config.
+ * @return GpListener CommandQueue instance.
+ */
private CommandQueue createCommandQueue(DomElement config) {
String commandQueueClass = config.getAttr("command-queue-class");
@@ -342,9 +356,9 @@
try {
Class oListener = Class.forName(p_sClassName);
Constructor oConst = oListener.getConstructor(new Class[] {
- this.getClass(), DomElement.class });
+ this.getClass(), DomElement.class, ActionDefinitionFactory.class });
Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,
- p_oP });
+ p_oP, actionDefinitionFactory });
new Thread(oRun).start();
} catch (Exception e) {
m_oLogger.error("Cannot launch <" + p_sClassName + ">\n", e);
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -1,196 +1,269 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.soa.esb.listeners;
-import java.lang.reflect.*;
-import java.util.Observer;
-import java.util.Observable;
+import java.util.Arrays;
import org.apache.log4j.*;
import javax.naming.*;
import javax.jms.*;
-import org.jboss.soa.esb.actions.AbstractAction;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionHandler;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory.ActionDefinition;
import org.jboss.soa.esb.helpers.*;
-public class JmsQueueListener implements Runnable, Observer
-{
- // You can override these values at constructor time of your derived class
- protected int
- m_iSleepForThreads = 3000 // default sleep if no threads available
- ,m_iUpperThreadLimit = 10 // just in case - override if you wish
- ;
- public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
- public static final String LISTEN_JNDI_TYPE = "listenJndiType";
- public static final String LISTEN_JNDI_URL = "listenJndiURL";
- public static final String LISTEN_QUEUE = "listenQueue";
- public static final String LISTEN_MSG_SELECTOR = "listenMsgSelector";
+public class JmsQueueListener implements Runnable {
+ // You can override these values at constructor time of your derived class
+ protected int m_iSleepForThreads = 3000 // default sleep if no threads
+ // available
+ , m_iUpperThreadLimit = 10 // just in case - override if you wish
+ ;
- protected boolean m_bError = false;
+ public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
- protected QueueConnection m_oQconn;
- protected QueueSession m_oQsess;
- protected Queue m_oQueue;
- protected String m_sSelector;
- protected MessageConsumer m_oRdr;
+ public static final String LISTEN_JNDI_TYPE = "listenJndiType";
+ public static final String LISTEN_JNDI_URL = "listenJndiURL";
- protected int m_iQthr = 0, m_iMaxThr;
+ public static final String LISTEN_QUEUE = "listenQueue";
- protected ThreadGroup m_oThrGrp = null;
-
- protected Logger m_oLogger;
+ public static final String LISTEN_MSG_SELECTOR = "listenMsgSelector";
- protected GpListener m_oDad;
- protected DomElement m_oParms;
- protected Class m_oExecClass;
+ protected boolean m_bError = false;
- public JmsQueueListener(GpListener p_oDad, DomElement p_oParms) throws Exception
- {
- m_oLogger = Logger.getLogger(this.getClass());
- m_oDad = p_oDad;
- m_oParms = p_oParms.cloneObj();
- checkMyParms();
- m_oThrGrp = new ThreadGroup(m_oParms.getName());
- } //__________________________________
-
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws Exception - if mandatory atts are not right
- * or actionClass not in classpath
- */
- protected void checkMyParms() throws Exception
- {
- String sAtt = GpListener.obtainAtt(m_oParms
- ,GpListener.PARM_ACTION_CLASS,null);
- m_oExecClass= GpListener.checkActionClass(sAtt);
-
- sAtt = GpListener.obtainAtt(m_oParms
- ,GpListener.PARM_MAX_THREADS,"1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax,m_iUpperThreadLimit);
+ protected QueueConnection m_oQconn;
- // Third arg is null - Exception will br thrown if listenQueue is not found
- String sQueue = GpListener.obtainAtt(m_oParms,LISTEN_QUEUE,null);
-
- // No problem if selector is null - everything in queue will be returned
- m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+ protected QueueSession m_oQsess;
- m_oQconn = null;
- m_oQsess = null;
- m_oQueue = null;
-
- String sJndiType = GpListener.obtainAtt(m_oParms
- ,LISTEN_JNDI_TYPE,"jboss");
- String sJndiURL = GpListener.obtainAtt(m_oParms
- ,LISTEN_JNDI_URL,"localhost");
- Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+ protected Queue m_oQueue;
- String sFactClass = GpListener.obtainAtt(m_oParms
- ,LISTEN_QUEUE_CONN_FACT,"ConnectionFactory");
- Object tmp = oJndiCtx.lookup(sFactClass);
- QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+ protected String m_sSelector;
- m_oQconn = qcf.createQueueConnection();
- m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
- m_oQsess = m_oQconn.createQueueSession
- (false,TopicSession.AUTO_ACKNOWLEDGE);
- m_oQconn.start();
- m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
-
- } //________________________________
+ protected MessageConsumer m_oRdr;
+ protected int m_iQthr = 0, m_iMaxThr;
+
+ protected ThreadGroup m_oThrGrp = null;
+
+ protected Logger m_oLogger;
+
+ protected GpListener m_oDad;
+
+ protected DomElement m_oParms;
+
+ protected String[] m_oActions;
+
+ protected ActionDefinitionFactory m_oActionDefinitionFactory;
+
+ public JmsQueueListener(GpListener p_oDad, DomElement p_oParms,
+ ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ m_oLogger = Logger.getLogger(this.getClass());
+ m_oDad = p_oDad;
+ m_oParms = p_oParms.cloneObj();
+ m_oActionDefinitionFactory = actionDefinitionFactory;
+ checkMyParms();
+ m_oThrGrp = new ThreadGroup(m_oParms.getName());
+ } // __________________________________
+
/**
- * Implement run method for this Runnable
- * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
- * no more looping allowed for all child classes
- * <p/> This condition will not prevent child processes to finish normally
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if mandatory atts are not right or the 'actions' list is
+ * empty
*/
- public void run()
- {
- while (m_oDad.continueLooping())
- {
- if (m_iQthr >= m_iMaxThr)
- { m_oLogger.info("Waiting for available threads...");
- try { Thread.sleep(m_iSleepForThreads); }
- catch (InterruptedException e) {return; }
- break;
- }
- Message oM = null;
- try { oM = m_oRdr.receive(m_oDad.millisToWait()); }
- catch (JMSException oJ)
- {
- m_oLogger.error("JMS error on receive",oJ);
- for (int i1=0; i1<3; i1++)
- try {checkMyParms(); } // try to reconnect to the queue
- catch (Exception e)
- { m_oLogger.error("Reconnecting to Queue",e);
- try { Thread.sleep(m_iSleepForThreads); }
- catch (InterruptedException e1)
- { //Just return
- return;
- }
- }
- }
- if (null==oM)
- continue;
+ protected void checkMyParms() throws Exception {
+ String sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_ACTIONS, "");
+ m_oActions = sAtt.split(",");
+
+ if(m_oActions.length == 0) {
+ throw new ConfigurationException("Listener 'actions' list must be specified.");
+ }
- AbstractAction oExec = null;
- try
- { Constructor oConst = m_oExecClass
- .getConstructor(GpListener.getActionClassArgs());
- oExec = (AbstractAction)oConst.newInstance
- (new Object[] {m_oParms,oM});
- }
- catch (Exception e)
- { m_oLogger.error("Can't instantiate action class",e);
- break;
- }
- // invoke the run method of the AbstractAction
- m_iQthr += 1;
- oExec.addObserver(this);
- new Thread(oExec).start();
- }
- if (null!=m_oQsess)
- try { m_oQsess.close(); }
- catch (Exception e1) {/* Tried my best - Just continue */}
- if (null!=m_oQconn)
- try { m_oQconn.close(); }
- catch (Exception e2) {/* Tried my best - Just continue */}
- } //______________________________
-
-/**
- * Implementation of Observer interface
- * <p/> Just count the number of active child threads
- *
- */
- public void update(Observable p_oObs, Object p_oUsrObj)
- {
- if (p_oUsrObj instanceof Integer)
- m_iQthr += ((Integer) p_oUsrObj).intValue();
- } //________________________________
+ sAtt = GpListener.obtainAtt(m_oParms, GpListener.PARM_MAX_THREADS, "1");
+ int iMax = Integer.parseInt(sAtt);
+ m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
-
-} //____________________________________________________________________________
+ // Third arg is null - Exception will be thrown if listenQueue is not
+ // found
+ String sQueue = GpListener.obtainAtt(m_oParms, LISTEN_QUEUE, null);
+
+ // No problem if selector is null - everything in queue will be returned
+ m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+
+ m_oQconn = null;
+ m_oQsess = null;
+ m_oQueue = null;
+
+ String sJndiType = GpListener.obtainAtt(m_oParms, LISTEN_JNDI_TYPE,
+ "jboss");
+ String sJndiURL = GpListener.obtainAtt(m_oParms, LISTEN_JNDI_URL,
+ "localhost");
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
+ sJndiURL);
+
+ String sFactClass = GpListener.obtainAtt(m_oParms,
+ LISTEN_QUEUE_CONN_FACT, "ConnectionFactory");
+ Object tmp = oJndiCtx.lookup(sFactClass);
+ QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+ m_oQconn = qcf.createQueueConnection();
+ m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
+ m_oQsess = m_oQconn.createQueueSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ m_oQconn.start();
+ m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
+
+ } // ________________________________
+
+ /**
+ * Implement run method for this Runnable <p/> Will continue to run until
+ * controlling class (ref in m_oDad) indicates no more looping allowed for
+ * all child classes <p/> This condition will not prevent child processes to
+ * finish normally
+ */
+ public void run() {
+ while (m_oDad.continueLooping()) {
+ if (m_iQthr >= m_iMaxThr) {
+ m_oLogger.info("Waiting for available threads...");
+ try {
+ Thread.sleep(m_iSleepForThreads);
+ } catch (InterruptedException e) {
+ return;
+ }
+ break;
+ }
+ Message oM = null;
+ try {
+ oM = m_oRdr.receive(m_oDad.millisToWait());
+ } catch (JMSException oJ) {
+ m_oLogger.error("JMS error on receive", oJ);
+ for (int i1 = 0; i1 < 3; i1++)
+ try {
+ checkMyParms();
+ } // try to reconnect to the queue
+ catch (Exception e) {
+ m_oLogger.error("Reconnecting to Queue", e);
+ try {
+ Thread.sleep(m_iSleepForThreads);
+ } catch (InterruptedException e1) { // Just return
+ return;
+ }
+ }
+ }
+ if (null == oM)
+ continue;
+
+ ActionExecutionRunner runner = new ActionExecutionRunner(oM);
+ new Thread(runner).start();
+ }
+ if (null != m_oQsess)
+ try {
+ m_oQsess.close();
+ } catch (Exception e1) {/* Tried my best - Just continue */
+ }
+ if (null != m_oQconn)
+ try {
+ m_oQconn.close();
+ } catch (Exception e2) {/* Tried my best - Just continue */
+ }
+ } // ______________________________
+
+ /**
+ * Increment the active thread count.
+ */
+ private void incThreads() {
+ m_iQthr++;
+ }
+
+ /**
+ * Decrement the active thread count.
+ */
+ private void decThreads() {
+ m_iQthr--;
+ }
+
+ /**
+ * Action Execution Runner.
+ * <p/>
+ * Runs the actions in a listener's "actions" config on a message payload object received
+ * by the listener implementation.
+ * <p/>
+ * TODO: This class is duplicated in both the AbstractPoller and JmsQueueListener classes. Needs to be sorted out as an
+ * overall cleanup of these classes. Lots of duplicate code etc.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+ private class ActionExecutionRunner implements Runnable {
+
+ private Object object;
+
+ /**
+ * Private constructor.
+ * @param initialObject The initial processing target object.
+ */
+ private ActionExecutionRunner(Object initialObject) {
+ this.object = initialObject;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ String currentAction = null;
+
+ // Increment the active thread count for the listener on starting...
+ incThreads();
+
+ try {
+ // Run the object through each ActionHandler...
+ for(String action : m_oActions) {
+ ActionDefinition actionDefinition;
+
+ currentAction = action.trim();
+ actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+ if(actionDefinition == null) {
+ throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
+ }
+
+ // The processing result of each action feeds into the processing of the next action...
+ ActionHandler handler = actionDefinition.getHandler();
+ object = handler.processAction(object);
+ if(object == null) {
+ m_oLogger.warn("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "]. ActionHandler [" + handler.getClass().getName() + "] returned a null object result on processing of action [" + currentAction + "].");
+ break;
+ }
+ }
+ } catch(Throwable thrown) {
+ m_oLogger.error("Premature termination of action processing chain [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] thre an exception.", thrown);
+ }
+
+ // Decrement the active thread count for the listener on completion...
+ decThreads();
+ }
+ }
+} // ____________________________________________________________________________
Modified: labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-09-11 18:32:04 UTC (rev 6149)
+++ labs/jbossesb/workspace/tfennelly/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-09-11 18:32:57 UTC (rev 6150)
@@ -155,9 +155,9 @@
* @param p_oParms DomElement - Sub tree that corresponds to this instance
* @throws Exception
*/
- public SqlTablePoller(GpListener p_oDad, DomElement p_oParms) throws Exception
+ public SqlTablePoller(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
{
- super(p_oDad,p_oParms);
+ super(p_oDad, p_oParms, actionDefinitionFactory);
try { checkMyParms(); }
catch (Exception e)
{
More information about the jboss-svn-commits
mailing list