[jboss-svn-commits] JBL Code SVN: r6451 - in labs/jbossesb/workspace: . arvinder/product/core/listeners arvinder/product/core/listeners/src/org/jboss/soa/esb arvinder/product/core/listeners/src/org/jboss/soa/esb/actions arvinder/product/core/listeners/src/org/jboss/soa/esb/command arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners arvinder/product/core/rosetta arvinder/product/core/rosetta/src/org/jboss/soa/esb/common arvinder/product/core/rosetta/src/org/jboss/soa/esb/exceptions arvinder/product/core/rosetta/src/org/jboss/soa/esb/helpers arvinder/product/core/rosetta/src/org/jboss/soa/esb/parameters arvinder/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Sep 27 17:16:58 EDT 2006
Author: arvinder
Date: 2006-09-27 17:16:44 -0400 (Wed, 27 Sep 2006)
New Revision: 6451
Added:
labs/jbossesb/workspace/arvinder/
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinition.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessor.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/Configuration.java
labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/Environment.java
labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/exceptions/ConfigurationException.java
Removed:
labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/SystemProperties.java
Modified:
labs/jbossesb/workspace/arvinder/product/core/listeners/build.xml
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
labs/jbossesb/workspace/arvinder/product/core/rosetta/build.xml
labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/helpers/Email.java
labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/helpers/KeyValuePair.java
labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/parameters/ParamFileRepository.java
labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/parameters/ParamRepositoryFactory.java
labs/jbossesb/workspace/arvinder/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore/BobjStore.java
labs/jbossesb/workspace/arvinder/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore/DaoSnapTable.java
Log:
This is to merge propertymanager + other changes
Copied: labs/jbossesb/workspace/arvinder (from rev 6446, labs/jbossesb/workspace/rearchitecture)
Modified: labs/jbossesb/workspace/arvinder/product/core/listeners/build.xml
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/build.xml 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/build.xml 2006-09-27 21:16:44 UTC (rev 6451)
@@ -15,7 +15,7 @@
</condition>
<path id="org.jboss.esb.listeners.base.classpath">
- <fileset dir="${org.jboss.esb.ext.lib.dir}" includes="activation.jar jbossall-client.jar log4j.jar mail.jar"/>
+ <fileset dir="${org.jboss.esb.ext.lib.dir}" includes="*.jar"/>
</path>
<path id="org.jboss.esb.listeners.default.classpath">
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinition.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinition.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinition.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,106 @@
+package org.jboss.soa.esb.actions;
+
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+import org.jboss.soa.esb.helpers.KeyValuePair;
+
+/**
+ * Action Definition.
+ * <p/>
+ * An Action is defined by "name", an action "processor" instance to perform the processing action, plus
+ * properties to the processing action to be supplued to the processor instance.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class ActionDefinition {
+
+ private String name;
+ private String processor;
+ private List<KeyValuePair> properties;
+ private boolean isSingleton = true;
+ private ActionProcessor singletonInstance = null;
+
+ /**
+ * Private constructor.
+ * @param name The action name.
+ * @param processor The action processor runtime class.
+ * @param properties Action properties list. An empty list for an action with no defined properties.
+ */
+ protected ActionDefinition(String name, String processor, List<KeyValuePair> properties) {
+ this.name = name;
+ this.processor = processor;
+ this.properties = properties;
+
+ String isSingletonProp = KeyValuePair.getValue("singleton", properties);
+ isSingleton = (isSingletonProp == null || !isSingletonProp.equals("false"));
+ }
+
+ /**
+ * Get the action name.
+ * @return The action name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get the action processor instance.
+ * @return The Action Processor.
+ */
+ public ActionProcessor getProcessor() {
+ if(!isSingleton) {
+ return createActionProcessor();
+ } else {
+ if(singletonInstance == null) {
+ singletonInstance = createActionProcessor();
+ }
+ return singletonInstance;
+ }
+ }
+
+ /**
+ * Get the action properties to be supplied to the action processor.
+ * @return The configured action properties. An empty list for an action with no defined properties.
+ */
+ public List<KeyValuePair> getProperties() {
+ return properties;
+ }
+
+ /**
+ * Create the action processor instance.
+ * <p/>
+ * See the {@link ActionProcessor} regarding reflective construction rules.
+ * @return The action processor instance.
+ */
+ private ActionProcessor createActionProcessor() {
+ Class<? extends ActionProcessor> runtimeClass;
+ Class[] NONDEFAULT_SIG = new Class[] {String.class, List.class};
+ Constructor nonDefaultConstructor;
+
+ // Get the runtime class...
+ try {
+ runtimeClass = Class.forName(processor).asSubclass(ActionProcessor.class);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("Action Processor class " + processor + " not found in classpath.", e);
+ } catch (ClassCastException e) {
+ throw new IllegalStateException("Action Processor class " + processor + " does not implement " + ActionProcessor.class.getName(), e);
+ }
+
+ // Construct it...
+ try {
+ nonDefaultConstructor = runtimeClass.getConstructor(NONDEFAULT_SIG);
+ return (ActionProcessor) nonDefaultConstructor.newInstance(new Object[] {name, properties});
+ } catch (NoSuchMethodException e1) {
+ try {
+ return runtimeClass.newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException("No appropriate constructor found on processor class [" + processor + "]. See " + ActionProcessor.class.getName() + " Javadoc.", e);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unexpected exception. Unable to construct processor class instance [" + processor + "] using default constructor.", e);
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException("Unexpected exception. Unable to construct processor class instance [" + processor + "] using non-default constructor.", e);
+ }
+ }
+}
\ No newline at end of file
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionDefinitionFactory.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,189 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.exceptions.ConfigurationException;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.helpers.KeyValuePair;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+/**
+ * Action Definition Factory.
+ * <p/>
+ * Note, much of the logic in this class could easily be replaced through the use of an IoC container.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class ActionDefinitionFactory {
+
+ /**
+ * Class Logger.
+ */
+ private static Logger logger = Logger.getLogger(ActionDefinitionFactory.class);
+ /**
+ * Action Definition table.
+ */
+ private Hashtable<String, ActionDefinition> actionDefinitions = new Hashtable<String, ActionDefinition>();
+
+ /**
+ * Construct a factory instance.
+ * @param config ActionDefinition configuration.
+ * @throws ConfigurationException Invalid configuration.
+ */
+ public ActionDefinitionFactory(DomElement config) throws ConfigurationException {
+ if(config == null || !config.getName().equals("Actions")) {
+ throw new ConfigurationException("No 'Actions' configuration.");
+ }
+
+ // Read the processor alias and action def configurations from the XML configuration...
+ DomElement processorAliasConfig = config.getFirstElementChild("ProcessorAliases");
+ DomElement[] actions = config.getElementChildren("Action");
+ if(processorAliasConfig == null) {
+ throw new ConfigurationException("No 'Actions/ProcessorAliases' configuration.");
+ }
+ if(actions == null || actions.length == 0) {
+ throw new ConfigurationException("No 'Actions/Action' configurations.");
+ }
+
+ // Initialise the Action Definition table..
+ Hashtable<String, String> processorClasses = getProcessorClasses(processorAliasConfig);
+ initialiseActionDefinitions(actions, processorClasses);
+ logger.info("ActionDefinition Factory initialisation complete.");
+ }
+
+ /**
+ * Get the action definition for the specified action name.
+ * @param actionName The action name.
+ * @return The action definition.
+ */
+ public ActionDefinition getInstance(String actionName) {
+ if(actionName == null || actionName.trim().equals("")) {
+ throw new IllegalArgumentException("Null or empty 'actionName' arg in method call.");
+ }
+
+ return actionDefinitions.get(actionName);
+ }
+
+ /**
+ * Initialise the Action Definitions.
+ * @param actions Action Definition configurations.
+ * @param processorClasses Processor classes keyed by alias name.
+ * @throws ConfigurationException
+ */
+ private void initialiseActionDefinitions(DomElement[] actions, Hashtable<String, String> processorClasses) throws ConfigurationException {
+ if(processorClasses == null || processorClasses.isEmpty()) {
+ throw new ConfigurationException("No action processor classes defined.");
+ }
+
+ for(DomElement action : actions) {
+ String name = action.getAttr("name");
+ String processorAlias = action.getAttr("processor");
+ String processorClass;
+ List<KeyValuePair> properties;
+
+ // Check the required attributes...
+ if(name == null || name.trim().equals("")) {
+ throw new ConfigurationException("Actions/Action has no 'name' defined.");
+ }
+ if(processorAlias == null || processorAlias.trim().equals("")) {
+ throw new ConfigurationException("Actions/Action [" + name + "] has no 'processor' defined.");
+ }
+ processorClass = processorClasses.get(processorAlias);
+ if(processorClass == null) {
+ throw new ConfigurationException("No action processor class defined for processor alias [" + processorAlias + "] set on action [" + name + "].");
+ }
+
+ // Get any properties defined on the action...
+ properties = getProperties(action);
+
+ // Create the action definition and store it against it's name...
+ actionDefinitions.put(name, new ActionDefinition(name, processorClass, properties));
+ logger.info("Added ActionDefinition [" + name + "] for processor [" + processorAlias + ":" + processorClass + "]. Num properties: " + properties.size());
+ }
+ }
+
+ /**
+ * Get the processor runtime classes.
+ * @param processorAliasConfig Alias configs.
+ * @return Processor runtime classs
+ * @throws ConfigurationException Bad configuration.
+ */
+ private Hashtable<String, String> getProcessorClasses(DomElement processorAliasConfig) throws ConfigurationException {
+ DomElement[] aliases = processorAliasConfig.getElementChildren("Alias");
+ Hashtable<String, String> processorClasses = new Hashtable<String, String>();
+
+ if(aliases == null) {
+ throw new ConfigurationException("No 'Actions/ProcessorAliases/Alias' configurations.");
+ }
+
+ for(DomElement alias : aliases) {
+ String name = alias.getAttr("name");
+ String className = alias.getAttr("class");
+
+ // Check the required attributes...
+ if(name == null || name.trim().equals("")) {
+ throw new ConfigurationException("Actions/ProcessorAliases/Alias has no 'name' defined.");
+ }
+ if(className == null || className.trim().equals("")) {
+ throw new ConfigurationException("Actions/ProcessorAliases/Alias [" + name + "] has no 'class' defined.");
+ }
+ processorClasses.put(name, className);
+ logger.info("Added alias [" + name + "] for ActionProcessor class [" + className + "].");
+ }
+
+ return processorClasses;
+ }
+
+ /**
+ * Get the properties defined on the supplied 'Action' configuration.
+ * @param action The action configuration element.
+ * @return A list of property {@link KeyValuePair} instances. An empty list list is returned
+ * if no properties are defined on the action.
+ * @throws ConfigurationException A property name or value is empty or undefined.
+ */
+ private List<KeyValuePair> getProperties(DomElement action) throws ConfigurationException {
+ DomElement[] properties = action.getElementChildren("property");
+ List<KeyValuePair> propertyList = new ArrayList<KeyValuePair>();
+
+ for(DomElement property : properties) {
+ String name = property.getAttr("name");
+ String value = property.getAttr("value");
+
+ // Check the required attributes...
+ if(name == null || name.trim().equals("")) {
+ throw new ConfigurationException("Actions/Action/property has no 'name' defined. Action [" + action.getAttr("name") + "].");
+ }
+ if(value == null || value.trim().equals("")) {
+ throw new ConfigurationException("Actions/Action/property has no 'value' defined. Action [" + action.getAttr("name") + "], Property [" + name + "].");
+ }
+
+ propertyList.add(new KeyValuePair(name, value));
+ }
+
+ return propertyList;
+ }
+}
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessingException.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import org.jboss.soa.esb.exceptions.BaseException;
+
+/**
+ * Exception while processing message payload processing action.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class ActionProcessingException extends BaseException {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Public constructor.
+ * @param message Exception message.
+ */
+ public ActionProcessingException(String message) {
+ super(message);
+ }
+
+ /**
+ * Public constructor.
+ * @param message Exception message.
+ * @param cause Exception cause.
+ */
+ public ActionProcessingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Public constructor.
+ * @param cause Exception cause.
+ */
+ public ActionProcessingException(Throwable cause) {
+ super(cause);
+ }
+
+}
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessor.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessor.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/actions/ActionProcessor.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,64 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.actions;
+
+import java.io.Serializable;
+
+/**
+ * Action Processor Interface Definition.
+ * <p/>
+ * An "Action Processor" performs a processing action on a message payload and returns the processing
+ * result.
+ * <p/>
+ * Implementations are constructed based on the following public constructor order precedence:
+ * <ol>
+ * <li><b>(String actionName, List<{@link org.jboss.soa.esb.helpers.KeyValuePair}> properties)</b></li>
+ * <li><b>default constructor</b>.</li>
+ * </ol>
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public interface ActionProcessor {
+
+ /**
+ * Perform the processing action on the message "payload" and return the result.
+ * @param message The message payload to be processed.
+ * @return The processing result.
+ * @throws ActionProcessingException Exception during payload processing.
+ */
+ public Object process(Object message) throws ActionProcessingException;
+
+ /**
+ * Get the "OK" notification message for this processor.
+ * @param message The message Object.
+ * @return The OK message.
+ */
+ public abstract Serializable getOkNotification(Object message);
+
+ /**
+ * Get the "Error" notification message for this processor.
+ * @param message The message Object.
+ * @return The Error message.
+ */
+ public abstract Serializable getErrorNotification(Object message);
+}
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueue.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,34 @@
+package org.jboss.soa.esb.command;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * Command queue abstraction.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public interface CommandQueue {
+
+ /**
+ * Open the command queue.
+ * @param config Command queue configuration.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public void open(DomElement config) throws CommandQueueException;
+
+ /**
+ * Receive a message from the underlying queue implementation.
+ * <p/>
+ * Performs a blocking receive on the command queue, controled by the receive timeout.
+ * @param timeout The receive block timeout. Zero to block indefinitely.
+ * @return The command message from the queue.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public String receiveCommand(long timeout) throws CommandQueueException;
+
+ /**
+ * Close the command queue.
+ * @throws CommandQueueException Queue exception. Check for probable chained cause exceptions.
+ */
+ public void close() throws CommandQueueException;
+}
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/CommandQueueException.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,29 @@
+package org.jboss.soa.esb.command;
+
+import org.jboss.soa.esb.exceptions.BaseException;
+
+/**
+ * Command queue exception.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class CommandQueueException extends BaseException {
+
+ private static final long serialVersionUID = 1L;
+
+ public CommandQueueException() {
+ super();
+ }
+
+ public CommandQueueException(String message) {
+ super(message);
+ }
+
+ public CommandQueueException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CommandQueueException(Throwable cause) {
+ super(cause);
+ }
+}
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/InMemoryCommandQueue.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,86 @@
+package org.jboss.soa.esb.command;
+
+import java.util.Hashtable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jboss.soa.esb.helpers.DomElement;
+
+/**
+ * In Memory Blocking Command Queue.
+ * <p/>
+ * Suitable for testing or any other purpose.
+ * <p/>
+ * The command queue's configuration needs to specify the
+ * queue name via a "command-queue-name" attribute supplied in the configuration to the
+ * {@link #open(DomElement)} method. The queues are stored statically and can be accessed via the
+ * {@link #getQueue(String)} method using the queue name.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class InMemoryCommandQueue implements CommandQueue {
+
+ /**
+ * Command queue name attribute name.
+ */
+ public static final String COMMAND_QUEUE_NAME = "command-queue-name";
+
+ private static Hashtable<String, InMemoryCommandQueue> commandQueues = new Hashtable<String, InMemoryCommandQueue>();
+
+ private String name;
+ private BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+ public void open(DomElement config) throws CommandQueueException {
+ if(config == null) {
+ throw new IllegalArgumentException("null 'config' arg in method call.");
+ }
+
+ name = config.getAttr(COMMAND_QUEUE_NAME);
+ if(name == null) {
+ throw new CommandQueueException("Attribute 'command-queue-name' must be specified on the command queue configuration.");
+ }
+ commandQueues.put(name, this);
+ }
+
+ /**
+ * Add a command to the in-memory command queue.
+ * <p/>
+ * Blocks until the command has been consumed.
+ * @param command The command string.
+ */
+ public void addCommand(String command) {
+ queue.add(command);
+ while(!queue.isEmpty()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public String receiveCommand(long timeout) throws CommandQueueException {
+ if(name == null || !commandQueues.containsKey(name)) {
+ throw new CommandQueueException("Sorry. Invalid call to 'receiveCommand' method. Queue is not open!");
+ }
+
+ try {
+ return queue.take();
+ } catch (InterruptedException e) {
+ throw new CommandQueueException("Error taking command message from command queue.", e);
+ }
+ }
+
+ public void close() throws CommandQueueException {
+ commandQueues.remove(name);
+ }
+
+ /**
+ * Get the command queue based on the name supplied in the configuration ("command-queue-name").
+ * @param name The name of the queue ala the "command-queue-name" attribute on the queue configuration.
+ * @return The MockCommandQueue instance, or null if no such queue exists.
+ */
+ public static InMemoryCommandQueue getQueue(String name) {
+ return commandQueues.get(name);
+ }
+}
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/command/JmsCommandQueue.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,149 @@
+package org.jboss.soa.esb.command;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicSession;
+import javax.naming.Context;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.listeners.GpListener;
+import org.jboss.soa.esb.util.Util;
+
+/**
+ * JMS based Command Queue implementation.
+ * <p/>
+ * This code was simply pulled from the GpListener.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class JmsCommandQueue implements CommandQueue {
+
+ private static Logger logger = Logger.getLogger(JmsCommandQueue.class);
+
+ public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
+
+ public static final String COMMAND_JNDI_TYPE = "commandJndiType";
+
+ public static final String COMMAND_JNDI_URL = "commandJndiURL";
+
+ public static final String COMMAND_IS_TOPIC = "commandIsTopic";
+
+ public static final String COMMAND_JNDI_NAME = "commandJndiName";
+
+ public static final String COMMAND_MSG_SELECTOR = "messageSelector";
+
+ private MessageConsumer m_oCmdSrc;
+
+ private Session m_oJmsSess;
+
+ private Connection m_oJmsConn;
+
+ public void open(DomElement config) throws CommandQueueException {
+ try {
+ initialiseJMS(config);
+ } catch (Exception e) {
+ throw new CommandQueueException("Failed to initialise JMS Command Queue.", e);
+ }
+ }
+
+ public void close() throws CommandQueueException {
+ if (null != m_oJmsSess) {
+ try {
+ m_oJmsSess.close();
+ } catch (JMSException eS) {/* Tried my best - Just continue */
+ }
+ }
+ if (null != m_oJmsConn) {
+ try {
+ m_oJmsConn.close();
+ } catch (JMSException eC) {/* Tried my best - Just continue */
+ }
+ }
+ }
+
+ public String receiveCommand(long timeout) throws CommandQueueException {
+ try {
+ Message jmsMessage = m_oCmdSrc.receive(timeout);
+
+ if (null == jmsMessage)
+ return null;
+ if (jmsMessage instanceof TextMessage) {
+ return ((TextMessage)jmsMessage).getText();
+ } else {
+ logger.warn("Message in command queue IGNORED - should be instanceof TextMessage");
+ }
+ } catch(Exception e) {
+ throw new CommandQueueException("Exception receiving message from JMS Command Queue.", e);
+ }
+
+ return null;
+ }
+
+ private void initialiseJMS(DomElement p_oP) throws Exception {
+ // Only check for JMS attributes if a queue JNDI name was specified
+ String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
+ if (!Util.isNullString(sJndiName)) {
+ Map<String, Object> oNewAtts = new HashMap<String, Object>();
+
+ oNewAtts.put(COMMAND_JNDI_NAME, sJndiName);
+
+ String sJndiType = GpListener.obtainAtt(p_oP, COMMAND_JNDI_TYPE, "jboss");
+ oNewAtts.put(COMMAND_JNDI_TYPE, sJndiType);
+ String sJndiURL = GpListener.obtainAtt(p_oP, COMMAND_JNDI_URL, "localhost");
+ oNewAtts.put(COMMAND_JNDI_URL, sJndiURL);
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
+ sJndiURL);
+
+ String sFactClass = GpListener.obtainAtt(p_oP, COMMAND_CONN_FACTORY,
+ "ConnectionFactory");
+ oNewAtts.put(COMMAND_CONN_FACTORY, sFactClass);
+ if (Util.isNullString(sFactClass))
+ sFactClass = "ConnectionFactory";
+ Object oFactCls = oJndiCtx.lookup(sFactClass);
+
+ String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
+ if (null != sMsgSelector)
+ oNewAtts.put(COMMAND_MSG_SELECTOR, sMsgSelector);
+
+ boolean bIsTopic = Boolean.parseBoolean(GpListener.obtainAtt(p_oP,
+ COMMAND_IS_TOPIC, "false"));
+ if (bIsTopic) {
+ TopicConnectionFactory tcf = (TopicConnectionFactory) oFactCls;
+ TopicConnection oTC = tcf.createTopicConnection();
+ Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
+ TopicSession oSess = oTC.createTopicSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ m_oJmsConn = oTC;
+ m_oJmsSess = oSess;
+ oTC.start();
+ m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector, true);
+ } else {
+ QueueConnectionFactory qcf = (QueueConnectionFactory) oFactCls;
+ QueueConnection oQC = qcf.createQueueConnection();
+ javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx
+ .lookup(sJndiName);
+ QueueSession oSess = oQC.createQueueSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ oQC.start();
+ m_oJmsConn = oQC;
+ m_oJmsSess = oSess;
+ m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
+ }
+ }
+ }
+}
Added: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractListener.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,229 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners;
+
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.actions.ActionDefinition;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.exceptions.ConfigurationException;
+import org.jboss.soa.esb.helpers.DomElement;
+
+import java.util.Arrays;
+
+/**
+ * Base abstract message listener implementation.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public abstract class AbstractListener implements Runnable {
+
+ // You can override these values at constructor time of your
+ // derived class after calling super(GpListener,DomElement)
+ protected int m_iSleepForThreads = 3000; // default sleep if no threads available
+ protected int m_iUpperThreadLimit = 10; // just in case - override if you wish
+
+ protected int m_iQthr = 0, m_iMaxThr;
+
+ protected ThreadGroup m_oThrGrp = null;
+
+ protected Logger logger;
+
+ protected GpListener m_oDad;
+
+ protected DomElement listenerConfig;
+
+ protected String[] m_oActions;
+
+ protected ActionDefinitionFactory m_oActionDefinitionFactory;
+
+ protected AbstractListener(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+
+ logger = Logger.getLogger(this.getClass());
+ m_oDad = p_oDad;
+ listenerConfig = p_oParms.cloneObj();
+ m_oActionDefinitionFactory = actionDefinitionFactory;
+ m_oThrGrp = new ThreadGroup(listenerConfig.getName());
+
+ String sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_ACTIONS, "");
+ m_oActions = sAtt.split(",");
+
+ if(m_oActions.length == 0) {
+ throw new ConfigurationException("Listener 'actions' list must be specified.");
+ }
+
+ sAtt = GpListener.obtainAtt(listenerConfig, GpListener.PARM_MAX_THREADS, "1");
+ int iMax = Integer.parseInt(sAtt);
+ m_iMaxThr = Math.min(iMax, m_iUpperThreadLimit);
+ } // __________________________________
+
+ /**
+ * Implement run method for this Runnable <p/> Will continue to run until
+ * controlling class (ref in m_oDad) indicates no more looping allowed for
+ * all child classes <p/> This condition will not prevent child processes to
+ * finish normally
+ */
+ public void run() {
+ while (m_oDad.continueLooping()) {
+ Object[] processList = receive();
+
+ for (Object message : processList) {
+ if (m_iQthr >= m_iMaxThr) {
+ logger.info("Waiting for available threads...(max=" + m_iMaxThr + ")");
+ try {
+ Thread.sleep(m_iSleepForThreads);
+ } catch (InterruptedException e) {
+ return;
+ }
+ break;
+ }
+
+ // Spawn a thread and push the message message through the pipeline...
+ ActionProcessingPipeline runner = new ActionProcessingPipeline(message);
+ new Thread(runner).start();
+ }
+ }
+
+ close();
+ }
+
+ /**
+ * Receive message from underlying channel implementation.
+ * <p/>
+ * Implementations must perform a blocking receive.
+ * @return An array of Objects received on the channel.
+ */
+ protected abstract Object[] receive();
+
+ /**
+ * Called on the listener implementation when pipeline processing error has occured.
+ * @param initialMessage The message reference that was initialy supplied to the pipeline.
+ * @param processor The processor raised the error.
+ * @param error The error.
+ */
+ protected abstract void processingError(Object initialMessage, ActionProcessor processor, Throwable error);
+
+ /**
+ * Called on the listener implementation when pipeline processing of a message is complete.
+ * @param initialMessage The message reference that was initialy supplied to the pipeline.
+ */
+ protected abstract void processingComplete(Object initialMessage);
+
+ /**
+ * Close the listener implemenation.
+ * <p/>
+ * Allows the listener to perform relevant close/cleanup tasks.
+ */
+ protected abstract void close();
+
+ /**
+ * Increment the active thread count.
+ */
+ private void incThreads() {
+ m_iQthr++;
+ }
+
+ /**
+ * Decrement the active thread count.
+ */
+ private void decThreads() {
+ m_iQthr--;
+ }
+
+ /**
+ * Action Processing Pipeline.
+ * <p/>
+ * Runs the actions in a listeners "actions" config on a message payload message received
+ * by the listener implementation.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+ private class ActionProcessingPipeline implements Runnable {
+
+ private Object initialMessage;
+
+ /**
+ * Private constructor.
+ * @param initialMessage The inital processing target message.
+ */
+ private ActionProcessingPipeline(Object initialMessage) {
+ this.initialMessage = initialMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ String currentAction = null;
+ ActionProcessor currentProcessor = null;
+
+ // Increment the active thread count for the listener on starting...
+ incThreads();
+
+ try {
+ Object message = initialMessage;
+
+ // Run the message through each ActionProcessor...
+ for(String action : m_oActions) {
+ ActionDefinition actionDefinition;
+ Object processingResult = null;
+
+ currentAction = action.trim();
+ actionDefinition = m_oActionDefinitionFactory.getInstance(currentAction);
+ if(actionDefinition == null) {
+ throw new java.lang.IllegalStateException("Bad Listener Configuration. No 'Actions/Action' definition for action [" + currentAction + "].");
+ }
+
+ // The processing result of each action feeds into the processing of the next action...
+ currentProcessor = actionDefinition.getProcessor();
+ try {
+ processingResult = currentProcessor.process(message);
+ } catch (Exception e) {
+ GpListener.notifyError(listenerConfig, e, currentProcessor.getErrorNotification(message));
+ throw e;
+ }
+
+ if(message == null && action != m_oActions[m_oActions.length - 1]) {
+ logger.warn("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. ActionProcessor [" + currentProcessor.getClass().getName() + "] returned a null message result on processing of action [" + currentAction + "].");
+ break;
+ }
+ // Notify on all processors. May want to do this differently in the future i.e. more selectively ...
+ GpListener.notifyOK(listenerConfig, currentProcessor.getOkNotification(message));
+
+ // Setup the message for processing by the next processor...
+ message = processingResult;
+ }
+ } catch(Throwable thrown) {
+ processingError(initialMessage, currentProcessor, thrown);
+ logger.error("Premature termination of action processing pipeline [" + Arrays.asList(m_oActions) + "]. Action [" + currentAction + "] threw an exception.", thrown);
+ }
+
+ processingComplete(initialMessage);
+
+ // Decrement the active thread count for the listener on completion...
+ decThreads();
+ }
+ }
+
+} // ____________________________________________________________________________
Modified: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/AbstractPoller.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -1,161 +1,111 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
-
package org.jboss.soa.esb.listeners;
-import java.util.*;
-import java.lang.reflect.Constructor;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.helpers.DomElement;
-import org.apache.log4j.*;
+import java.util.List;
-import org.jboss.soa.esb.actions.AbstractAction;
-import org.jboss.soa.esb.helpers.*;
+/**
+ * Abstract Polling Listener.
+ * <p/>
+ * Polling listeners are listener implementations that periodically poll for message objects
+ * that require processing. This type of listener implementation is required where the underlying
+ * message channel doesn't support a blocking receive operation.
+ *
+ * @author Esteban
+ */
+public abstract class AbstractPoller extends AbstractListener {
-public abstract class AbstractPoller implements Runnable, Observer
-{
- protected abstract List<Object> pollForCandidates();
- protected abstract Object preProcess (Object p_o) throws Exception;
-
- // You can override these values at constructor time of your
- // derived class after calling super(GpListener,DomElement)
- protected int m_iMinPollMillis = 3000 // minimum polling interval
- ,m_iDfltPollMillis = 20000 // default polling interval
- ,m_iSleepForThreads = 3000 // default sleep if no threads available
- ,m_iUpperThreadLimit = 10 // just in case - override if you wish
- ;
-
- public static final String PARM_POLL_LTCY = "pollLatencySecs";
+ // You can override these values at constructor time of your
+ // derived class after calling super(GpListener,DomElement)
+ protected int m_iMinPollMillis = 3000 // minimum polling interval
+ , m_iDfltPollMillis = 20000 // default polling interval
+ ;
- protected int m_iQthr = 0, m_iMaxThr;
- protected int m_iPollMillis;
+ public static final String PARM_POLL_LTCY = "pollLatencySecs";
- protected ThreadGroup m_oThrGrp = null;
-
- protected Logger m_oLogger;
+ protected int m_iPollMillis;
- protected GpListener m_oDad;
- protected DomElement m_oParms;
- protected Class m_oExecClass;
+ /**
+ * Construct an abstract polling listener.
+ * @param commandListener The command listener.
+ * @param listenerConfig The configuration for this polling listener.
+ * @param actionDefinitionFactory The action definition factory for the bus.
+ * @throws Exception
+ */
+ protected AbstractPoller(GpListener commandListener, DomElement listenerConfig, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ super(commandListener, listenerConfig, actionDefinitionFactory);
- protected AbstractPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
- {
- m_oLogger = Logger.getLogger(this.getClass());
- m_oDad = p_oDad;
- m_oParms = p_oParms.cloneObj();
- checkParms();
- } //__________________________________
-/**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws Exception - if actionClass not specified or not in classpath
- * or invalid int values for maxThreads or pollLatencySecs
- */
- protected void checkParms() throws Exception
- {
- String sAtt = GpListener.obtainAtt(m_oParms
- ,GpListener.PARM_ACTION_CLASS,null);
- m_oExecClass = GpListener.checkActionClass(sAtt);
-
- sAtt = GpListener.obtainAtt(m_oParms
- ,GpListener.PARM_MAX_THREADS,"1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax,m_iUpperThreadLimit);
+ String sAtt = listenerConfig.getAttr(PARM_POLL_LTCY);
+ m_iPollMillis = (null == sAtt) ? m_iDfltPollMillis : 1000 * Integer.parseInt(sAtt);
+ if (m_iPollMillis < m_iMinPollMillis) {
+ m_iPollMillis = m_iMinPollMillis;
+ }
+ }
- sAtt = m_oParms.getAttr(PARM_POLL_LTCY);
- m_iPollMillis = (null==sAtt) ? m_iDfltPollMillis
- : 1000 * Integer.parseInt(sAtt);
- if (m_iPollMillis < m_iMinPollMillis)
- m_iPollMillis = m_iMinPollMillis;
- } //________________________________
+ /**
+ * Polling listener receive implementation.
+ * @return An array of objects polled from the concrete Poller implementation.
+ */
+ protected Object[] receive() {
+ while (m_oDad.continueLooping()) {
+ List<Object> olPending = pollForCandidates();
- /**
- * Implementation of Observer interface
- * <p/> Just count the number of active child threads
- *
- */
- public void update(Observable p_oObs, Object p_oUsrObj)
- {
- if (p_oUsrObj instanceof Integer)
- m_iQthr += ((Integer) p_oUsrObj).intValue();
- } //________________________________
+ if (olPending == null || olPending.isEmpty()) {
+ try {
+ Thread.sleep(m_iPollMillis);
+ } catch (InterruptedException e) {
+ logger.error("Unexpected thread interupt exception. Not terminating blocking receive!!", e);
+ }
+ continue;
+ } else {
+ Object[] objForProcessing = new Object[olPending.size()];
- /**
- * Implement run method for this Runnable
- * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
- * no more looping allowed for all child classes
- * <p/> This condition will not prevent child processes to finish normally
- */
- public void run()
- {
- m_oThrGrp = new ThreadGroup(m_oParms.getName());
- while (m_oDad.continueLooping())
- {
- List <Object> olPending = pollForCandidates();
- if (olPending.size() < 1)
- { try { Thread.sleep(m_iPollMillis); }
- catch (InterruptedException e) { return; }
- continue;
- }
+ // Preprocess all the message objects.
+ // TODO: I really think this is no longer required or a good idea!!
+ for(int i = 0; i < olPending.size(); i++) {
+ objForProcessing[i] = preProcess(olPending.get(i));
+ }
+ return objForProcessing;
+ }
+ }
- for (Object oCurr : olPending)
- {
- if (m_iQthr >= m_iMaxThr)
- { m_oLogger.info("Waiting for available threads...(max="
- +m_iMaxThr+")");
- try { Thread.sleep(m_iSleepForThreads); }
- catch (InterruptedException e) {return; }
- break;
- }
+ return null;
+ }
- // give the derived class an opportunity to do something
- // before processing current object
- Object oProcess = null;
- try
- { if (null==(oProcess = preProcess(oCurr)))
- continue;
- }
- catch (Exception ePre)
- { m_oLogger.error("preProcess(Object) FAILED",ePre);
- continue;
- }
+ /**
+ * Poll for message objects.
+ * @return A list of message objects, or an empty list if there are no message objects.
+ */
+ protected abstract List<Object> pollForCandidates();
- AbstractAction oExec = null;
- try
- { Constructor oConst = m_oExecClass
- .getConstructor(GpListener.getActionClassArgs());
- oExec = (AbstractAction)oConst
- .newInstance(new Object[] {m_oParms,oProcess});
- }
- catch (Exception e)
- { m_oLogger.error("Can't instantiate action class",e);
- break;
- }
- // launch an instance of the AbstractAction in a child thread
- m_iQthr += 1;
- oExec.addObserver(this);
- new Thread(oExec).start();
- }
- }
- } //__________________________________
-
-} //____________________________________________________________________________
+ /**
+ * Preprocess the message object before returning for pipeline processing.
+ * @param message Message object for preprocessing.
+ * @return The preprocessed message object, or the supplied message unmodified.
+ */
+ protected abstract Object preProcess(Object message);
+ // TODO: Is this "preprocessing" step needed now that we have processing pipelines on listeners???
+}
Modified: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/DirectoryPoller.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -23,13 +23,16 @@
package org.jboss.soa.esb.listeners;
-import java.io.*;
-import java.net.*;
-import java.util.*;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.util.Util;
-import org.jboss.soa.esb.util.*;
-import org.jboss.soa.esb.actions.AbstractFileAction;
-import org.jboss.soa.esb.helpers.*;
+import java.io.File;
+import java.io.FileFilter;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
public class DirectoryPoller extends AbstractPoller
{
@@ -42,9 +45,9 @@
public static final String FILE_POST_SFX = "postSuffix";
public static final String FILE_POST_DEL = "postDelete";
- public DirectoryPoller(GpListener p_oDad, DomElement p_oParms) throws Exception
+ public DirectoryPoller(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
{
- super(p_oDad,p_oParms);
+ super(p_oDad, p_oParms, actionDefinitionFactory);
checkMyParms();
} //__________________________________
@@ -56,31 +59,32 @@
/**
*
- * @param p_o Object - Must be a File representing the file that has to be processed
+ * @param inputObject Object - Must be a File representing the file that has to be processed
* @return Object - an array of 3 Files containing:
* <p/>[0] renamed file (workSuffix appended to input file name)
* <p/>[1] target file name in case actionClass is unable to complete successfuly
* <p/>[2] target file name in case actionClass finishes successfuly
*/
@Override
- public Object preProcess(Object p_o) throws Exception
+ public Object preProcess(Object inputObject)
{
- if (!(p_o instanceof File))
+ if (!(inputObject instanceof File)) {
return null;
- File oF = (File)p_o;
- File oNameWrk = new File (oF.getParentFile(),oF.getName()+m_sWrkSfx);
+ }
+
+ File inputFile = (File)inputObject;
+ WorkingFile workingFile = new WorkingFile(inputFile.getParentFile(), inputFile.getName() + m_sWrkSfx);
-
- if (! oF.renameTo(oNameWrk))
+ if (!inputFile.renameTo(workingFile)) {
return null;
- AbstractFileAction.Params oCurr = new AbstractFileAction.Params();
- oCurr.bPostDelete = m_bPostDel;
- oCurr.oInpF = oF;
- oCurr.oWrkF = oNameWrk;
- oCurr.oErrF = new File (m_oErrorDir ,oF.getName()+m_sErrSfx);
- oCurr.oDoneF = new File (m_oPostDir ,oF.getName()+m_sPostSfx);
+ }
+
+ workingFile.postDelete = m_bPostDel;
+ workingFile.inputFile = inputFile;
+ workingFile.errorFile = new File (m_oErrorDir ,inputFile.getName()+m_sErrSfx);
+ workingFile.outputFile = new File (m_oPostDir ,inputFile.getName()+m_sPostSfx);
- return oCurr;
+ return workingFile;
} //________________________________
@Override
@@ -90,32 +94,32 @@
return Arrays.asList((Object[])oaF);
} //________________________________
- protected void checkMyParms() throws Exception
+ private void checkMyParms() throws Exception
{
// INPUT directory and suffix (used for FileFilter)
- String sInpDir = GpListener.obtainAtt(m_oParms,FILE_INPUT_DIR,null);
- m_oInpDir = new File(new URI(sInpDir));
+ String sInpDir = GpListener.obtainAtt(listenerConfig,FILE_INPUT_DIR,null);
+ m_oInpDir = getFile(sInpDir);
seeIfOkToWorkOnDir(m_oInpDir);
- m_sInpSfx = GpListener.obtainAtt(m_oParms,FILE_INPUT_SFX,null);
+ m_sInpSfx = GpListener.obtainAtt(listenerConfig,FILE_INPUT_SFX,null);
m_sInpSfx = m_sInpSfx.trim();
if (m_sInpSfx.length()<1)
throw new Exception ("Invalid "+FILE_INPUT_SFX+" attribute");
m_oFFilt = new FileEndsWith(m_sInpSfx);
// WORK suffix (will rename in input directory)
- m_sWrkSfx = GpListener.obtainAtt(m_oParms,FILE_WORK_SFX,".esbWork").trim();
+ m_sWrkSfx = GpListener.obtainAtt(listenerConfig,FILE_WORK_SFX,".esbWork").trim();
if (m_sWrkSfx.length()<1)
throw new Exception ("Invalid "+FILE_WORK_SFX+" attribute");
if (m_sInpSfx.equals(m_sWrkSfx))
throw new Exception("Work suffix must differ from input suffix <"+m_sWrkSfx+">");
// ERROR directory and suffix (defaults to input dir and ".esbError" suffix)
- String sErrDir = GpListener.obtainAtt(m_oParms,FILE_ERROR_DIR,sInpDir);
- m_oErrorDir = new File(new URI(sErrDir));
+ String sErrDir = GpListener.obtainAtt(listenerConfig,FILE_ERROR_DIR,sInpDir);
+ m_oErrorDir = getFile(sErrDir);
seeIfOkToWorkOnDir(m_oErrorDir);
- m_sErrSfx = GpListener.obtainAtt(m_oParms,FILE_ERROR_SFX,".esbError").trim();
+ m_sErrSfx = GpListener.obtainAtt(listenerConfig,FILE_ERROR_SFX,".esbError").trim();
if (m_sErrSfx.length()<1)
throw new Exception ("Invalid "+FILE_ERROR_SFX+" attribute");
if (m_oErrorDir.equals(m_oInpDir) && m_sInpSfx.equals(m_sErrSfx))
@@ -123,16 +127,16 @@
// Do users wish to delete files that were processed OK ?
- String sPostDel = GpListener.obtainAtt(m_oParms,FILE_POST_DEL,"false").trim();
+ String sPostDel = GpListener.obtainAtt(listenerConfig,FILE_POST_DEL,"false").trim();
m_bPostDel = Boolean.parseBoolean(sPostDel);
if (m_bPostDel)
return;
// POST (done) directory and suffix (defaults to input dir and ".esbDone" suffix)
- String sPostDir = GpListener.obtainAtt(m_oParms,FILE_POST_DIR,sInpDir);
- m_oPostDir = new File(new URI(sPostDir));
+ String sPostDir = GpListener.obtainAtt(listenerConfig,FILE_POST_DIR,sInpDir);
+ m_oPostDir = getFile(sPostDir);
seeIfOkToWorkOnDir(m_oPostDir);
- m_sPostSfx = GpListener.obtainAtt(m_oParms,FILE_POST_SFX,".esbDone").trim();
+ m_sPostSfx = GpListener.obtainAtt(listenerConfig,FILE_POST_SFX,".esbDone").trim();
if (m_oPostDir.equals(m_oInpDir))
{ if (m_sPostSfx.length()<1)
throw new Exception ("Invalid "+FILE_POST_SFX+" attribute");
@@ -142,7 +146,15 @@
} //________________________________
- protected void seeIfOkToWorkOnDir (File p_oDir) throws Exception
+ private File getFile(String file) {
+ try {
+ return new File(new URI(file));
+ } catch(Exception e) {
+ return new File(file);
+ }
+ }
+
+ protected void seeIfOkToWorkOnDir (File p_oDir) throws Exception
{
if (! p_oDir.exists())
throw new Exception ("Directory "+p_oDir.toString()+" not found");
@@ -172,4 +184,77 @@
} //______________________________
} //____________________________________________________
-} //____________________________________________________________________________
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+ */
+ @Override
+ protected void close() {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+ WorkingFile workingFile = (WorkingFile) initialMessage;
+
+ workingFile.renameToError();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMessage) {
+ WorkingFile workingFile = (WorkingFile) initialMessage;
+
+ // Delete or rename the file...
+ if (workingFile.postDelete) {
+ workingFile.delete();
+ } else {
+ workingFile.renameToOutputFile();
+ }
+ }
+
+ /**
+ * Working file.
+ * <p/>
+ * Once the directory poller picks up on an input file, it immediately renames it to a working file
+ * in order to avoid a situation where the file gets processed again.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+ public static class WorkingFile extends File {
+ private static final long serialVersionUID = 1L;
+
+ private boolean postDelete;
+
+ private File inputFile, errorFile, outputFile;
+
+ public WorkingFile(String filename) {
+ super(filename);
+ }
+
+ public WorkingFile(File parentFile, String filename) {
+ super(parentFile, filename);
+ }
+
+ private boolean renameToError() {
+ return renameTo(errorFile);
+ }
+
+ private boolean renameToOutputFile() {
+ return renameTo(outputFile);
+ }
+
+ /**
+ * Get the File instance representing the original input file.
+ * @return Original input file.
+ */
+ public File getInputFile() {
+ return inputFile;
+ }
+ }
+}
Modified: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/GpListener.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -22,22 +22,30 @@
package org.jboss.soa.esb.listeners;
-import java.io.*;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
import java.text.SimpleDateFormat;
-import java.util.*;
-import java.lang.reflect.*;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
-import javax.jms.*;
-import javax.naming.*;
-
-import org.apache.log4j.*;
-
-import org.jboss.soa.esb.actions.AbstractAction;
-import org.jboss.soa.esb.common.SystemProperties;
-import org.jboss.soa.esb.helpers.*;
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.exceptions.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.command.CommandQueue;
+import org.jboss.soa.esb.command.CommandQueueException;
+import org.jboss.soa.esb.command.JmsCommandQueue;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.helpers.DomElement;
import org.jboss.soa.esb.notification.NotificationList;
-import org.jboss.soa.esb.parameters.*;
-import org.jboss.soa.esb.services.*;
+import org.jboss.soa.esb.parameters.ParamRepositoryException;
+import org.jboss.soa.esb.parameters.ParamRepositoryFactory;
+import org.jboss.soa.esb.services.InotificationHandler;
+import org.jboss.soa.esb.services.NotificationHandlerFactory;
import org.jboss.soa.esb.util.Util;
import org.xml.sax.SAXException;
@@ -83,18 +91,6 @@
// parameter reloads
;
- public static final String COMMAND_CONN_FACTORY = "commandConnFactoryClass";
-
- public static final String COMMAND_JNDI_TYPE = "commandJndiType";
-
- public static final String COMMAND_JNDI_URL = "commandJndiURL";
-
- public static final String COMMAND_IS_TOPIC = "commandIsTopic";
-
- public static final String COMMAND_JNDI_NAME = "commandJndiName";
-
- public static final String COMMAND_MSG_SELECTOR = "messageSelector";
-
public static final String PARM_RELOAD_SECS = "parameterReloadSecs";
public static final String PARM_END_TIME = "endTime";
@@ -105,7 +101,7 @@
// DomElements
public static final String PARM_LISTENER_CLASS = "listenerClass";
- public static final String PARM_ACTION_CLASS = "actionClass";
+ public static final String PARM_ACTIONS = "actions";
public static final String PARM_MAX_THREADS = "maxThreads";
@@ -164,12 +160,10 @@
}
};
- private MessageConsumer m_oCmdSrc;
+ private CommandQueue commandQueue;
- private Session m_oJmsSess;
+ private ActionDefinitionFactory actionDefinitionFactory;
- private Connection m_oJmsConn;
-
/**
* Construct a Listener Manager from the named repository based
* configuration.
@@ -204,9 +198,7 @@
m_oState = State.Exception_thrown;
m_oState.m_oException = e;
- m_oLogger
- .fatal(
- "Listener configuration and startup error. Config Source: "
+ m_oLogger.fatal("Listener configuration and startup error. Config Source: "
+ (configSource != null ? configSource
: "unknown"), e);
@@ -253,66 +245,18 @@
public void checkParms(DomElement p_oP) throws Exception {
// We've just loaded - set to false until next reload requested
m_bReloadRequested = false;
- m_oCmdSrc = null;
+ commandQueue = createCommandQueue(p_oP);
- Map<String, Object> oNewAtts = new HashMap<String, Object>();
+ // Open the command queue...
+ commandQueue.open(p_oP);
- // Only check for JMS attributes if a queue JNDI name was specified
- String sJndiName = p_oP.getAttr(COMMAND_JNDI_NAME);
- if (!Util.isNullString(sJndiName)) {
- oNewAtts.put(COMMAND_JNDI_NAME, sJndiName);
-
- String sJndiType = obtainAtt(p_oP, COMMAND_JNDI_TYPE, "jboss");
- oNewAtts.put(COMMAND_JNDI_TYPE, sJndiType);
- String sJndiURL = obtainAtt(p_oP, COMMAND_JNDI_URL, "localhost");
- oNewAtts.put(COMMAND_JNDI_URL, sJndiURL);
- Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
- sJndiURL);
-
- String sFactClass = obtainAtt(p_oP, COMMAND_CONN_FACTORY,
- "ConnectionFactory");
- oNewAtts.put(COMMAND_CONN_FACTORY, sFactClass);
- if (Util.isNullString(sFactClass))
- sFactClass = "ConnectionFactory";
- Object oFactCls = oJndiCtx.lookup(sFactClass);
-
- String sMsgSelector = p_oP.getAttr(COMMAND_MSG_SELECTOR);
- if (null != sMsgSelector)
- oNewAtts.put(COMMAND_MSG_SELECTOR, sMsgSelector);
-
- boolean bIsTopic = Boolean.parseBoolean(obtainAtt(p_oP,
- COMMAND_IS_TOPIC, "false"));
- if (bIsTopic) {
- TopicConnectionFactory tcf = (TopicConnectionFactory) oFactCls;
- TopicConnection oTC = tcf.createTopicConnection();
- Topic oTopic = (Topic) oJndiCtx.lookup(sJndiName);
- TopicSession oSess = oTC.createTopicSession(false,
- TopicSession.AUTO_ACKNOWLEDGE);
- m_oJmsConn = oTC;
- m_oJmsSess = oSess;
- oTC.start();
- m_oCmdSrc = oSess.createSubscriber(oTopic, sMsgSelector, true);
- } else {
- QueueConnectionFactory qcf = (QueueConnectionFactory) oFactCls;
- QueueConnection oQC = qcf.createQueueConnection();
- javax.jms.Queue oQ = (javax.jms.Queue) oJndiCtx
- .lookup(sJndiName);
- QueueSession oSess = oQC.createQueueSession(false,
- TopicSession.AUTO_ACKNOWLEDGE);
- oQC.start();
- m_oJmsConn = oQC;
- m_oJmsSess = oSess;
- m_oCmdSrc = oSess.createReceiver(oQ, sMsgSelector);
- }
- }
-
// if PARM_RELOAD_SECS not set, and no command queue
// then reload every 10 minutes
// If there is a command queue, run until command is received
String sRldSecs = p_oP.getAttr(PARM_RELOAD_SECS);
m_lNextReload = (null != sRldSecs) ? System.currentTimeMillis() + 1000
* Long.parseLong(sRldSecs)
- : (null == m_oCmdSrc) ? Long.MAX_VALUE : System
+ : (null == commandQueue) ? Long.MAX_VALUE : System
.currentTimeMillis()
+ m_iDfltReloadMillis;
@@ -323,8 +267,35 @@
m_lEndTime = (null == sEndT) ? Long.MAX_VALUE : s_oDateParse.parse(
sEndT).getTime();
+ // Read and initialise the action definitions...
+ DomElement actionConfig = p_oP.getFirstElementChild("Actions");
+ if(actionConfig == null) {
+ throw new ConfigurationException("No 'Actions' configuration.");
+ }
+ actionDefinitionFactory = new ActionDefinitionFactory(actionConfig);
+
} // ________________________________
+ /**
+ * Factory method for creating the command queue.
+ * @param config GpListener config.
+ * @return GpListener CommandQueue instance.
+ */
+ private CommandQueue createCommandQueue(DomElement config) {
+ String commandQueueClass = config.getAttr("command-queue-class");
+
+ if(commandQueueClass != null) {
+ try {
+ return (CommandQueue) Class.forName(commandQueueClass).newInstance();
+ } catch (Exception e) {
+ m_oLogger.error("Failed to instantiate CommandQueue ["+ commandQueueClass + "]. Defaulting to the JMS Command Queue", e);
+ }
+ }
+
+ // Default command queue...
+ return new JmsCommandQueue();
+ }
+
/**
* Main execution loop <p/> Will continue to run until either <p/>a) run
* time is expired <p/>b) quiesce command is received in command queue
@@ -373,25 +344,21 @@
m_oLogger
.info("Finishing_____________________________________________________");
- if (null != m_oJmsSess)
- try {
- m_oJmsSess.close();
- } catch (JMSException eS) {/* Tried my best - Just continue */
- }
- if (null != m_oJmsConn)
- try {
- m_oJmsConn.close();
- } catch (JMSException eC) {/* Tried my best - Just continue */
- }
+ // Close the command queue...
+ try {
+ commandQueue.close();
+ } catch (CommandQueueException e) {
+ m_oLogger.error("Error closing Command Queue.", e);
+ }
} // ________________________________
private void tryToLaunchChildListener(DomElement p_oP, String p_sClassName) {
try {
Class oListener = Class.forName(p_sClassName);
Constructor oConst = oListener.getConstructor(new Class[] {
- this.getClass(), DomElement.class });
+ this.getClass(), DomElement.class, ActionDefinitionFactory.class });
Runnable oRun = (Runnable) oConst.newInstance(new Object[] { this,
- p_oP });
+ p_oP, actionDefinitionFactory });
new Thread(oRun).start();
} catch (Exception e) {
m_oLogger.error("Cannot launch <" + p_sClassName + ">\n", e);
@@ -405,12 +372,13 @@
private void waitForCmdOrSleep() {
long lToGo = millisToWait();
- if (null == m_oCmdSrc) {
+ if (null == commandQueue) {
m_oLogger.debug("About to sleep " + lToGo);
// No command queue nor topic - Just sleep until time
// exhausted, or thread interrupted
try {
- Thread.sleep(lToGo);
+ if (lToGo > 0)
+ Thread.sleep(lToGo);
} catch (InterruptedException e) {
m_lEndTime = 0; // mark as end requested and return
}
@@ -422,22 +390,17 @@
// that's why time to go is recalculated on each cycle
while ((lToGo = millisToWait()) > 0) {
try {
- m_oLogger.info("Waiting for command ... timeout=" + lToGo
- + " millis");
- // for the time being, only text messages allowed
- // THIS WILL CHANGE !!
- Message oM = m_oCmdSrc.receive(lToGo);
- if (null == oM)
+ m_oLogger.info("Waiting for command ... timeout=" + lToGo + " millis");
+
+ String oM = commandQueue.receiveCommand(lToGo);
+ if (null == oM) {
return;
- if (!(oM instanceof TextMessage)) {
- m_oLogger
- .warn("Message in command queue IGNORED - should be instanceof TextMessage");
- return;
}
- processCommand((TextMessage) oM);
- if (endRequested() || timeToReload())
+ processCommand(oM);
+ if (endRequested() || timeToReload()) {
break;
- } catch (JMSException eJ) {
+ }
+ } catch (CommandQueueException eJ) {
m_oLogger.info("receive on command queue failed", eJ);
}
}
@@ -472,40 +435,37 @@
* </TABLE> * startsWith() <p/>
*
* @param p_oMsg
- * TextMessage - Received in command queue/topic
+ * Message received from the command queue.
*
*/
- private void processCommand(TextMessage p_oMsg) {
- try {
- String sTxt = p_oMsg.getText();
- if (null == sTxt)
- return;
- String sLow = sTxt.trim().toLowerCase();
- if (sLow.startsWith("shutdown")) {
- m_bEndRequested = true;
- m_oLogger.info("Shutdown has been requested");
- return;
+ private void processCommand(String sTxt) {
+ if (null == sTxt)
+ return;
+
+ String sLow = sTxt.trim().toLowerCase();
+ if (sLow.startsWith("shutdown")) {
+ m_bEndRequested = true;
+ m_oLogger.info("Shutdown has been requested");
+ return;
+ }
+ if (sLow.startsWith("reload param")) {
+ m_bReloadRequested = true;
+ m_oLogger
+ .info("Request for parameter reload has been received");
+ return;
+ }
+ String[] sa = sLow.split("\\s+");
+ if (sa.length > 1 && "endtime".equals(sa[0])) {
+ try {
+ String sDate = sa[1];
+ String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
+ : sa[2];
+ Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
+ m_oLogger.info("New end date set to : " + oEnd);
+ m_lEndTime = oEnd.getTime();
+ } catch (Exception eDat) {
+ m_oLogger.info("Problems with endTime command", eDat);
}
- if (sLow.startsWith("reload param")) {
- m_bReloadRequested = true;
- m_oLogger
- .info("Request for parameter reload has been received");
- return;
- }
- String[] sa = sLow.split("\\s+");
- if (sa.length > 1 && "endtime".equals(sa[0]))
- try {
- String sDate = sa[1];
- String sTime = (sa.length < 3 || null == sa[2]) ? "23:59:59"
- : sa[2];
- Date oEnd = s_oDateParse.parse(sDate + " " + sTime);
- m_oLogger.info("New end date set to : " + oEnd);
- m_lEndTime = oEnd.getTime();
- } catch (Exception eDat) {
- m_oLogger.info("Problems with endTime command", eDat);
- }
- } catch (JMSException eJ) {
- m_oLogger.info("Problems with command queue", eJ);
}
} // ________________________________
@@ -555,10 +515,10 @@
return (endNotRequested() && !timeToReload());
} // ________________________________
- private static final String[] s_saMailProps = { SystemProperties.SMTP_HOST,
- SystemProperties.SMTP_USERNAME, SystemProperties.SMTP_PASSWORD,
- SystemProperties.SMTP_PORT, SystemProperties.SMTP_FROM,
- SystemProperties.SMTP_AUTH };
+ private static final String[] s_saMailProps = { Environment.SMTP_HOST,
+ Environment.SMTP_USERNAME, Environment.SMTP_PASSWORD,
+ Environment.SMTP_PORT, Environment.SMTP_FROM,
+ Environment.SMTP_AUTH };
private void setEmailSystemProperties() {
DomElement oEmail = m_oParms.getFirstElementChild(CHLD_EMAIL_PARMS);
@@ -566,7 +526,7 @@
for (String sCurr : s_saMailProps) {
String sProp = oEmail.getAttr(sCurr);
if (null != sProp)
- System.setProperty(sCurr, sProp);
+ Configuration.getPropertyManager().setProperty(sCurr, sProp);
}
} // ________________________________
@@ -584,56 +544,16 @@
* If requested attribute not found and no default value
* supplied by invoker
*/
- static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
- throws Exception {
+ public static String obtainAtt(DomElement p_oP, String p_sAtt, String p_sDefault)
+ throws ConfigurationException {
String sVal = p_oP.getAttr(p_sAtt);
if ((null == sVal) && (null == p_sDefault))
- throw new Exception("Missing or invalid <" + p_sAtt + "> attribute");
+ throw new ConfigurationException("Missing or invalid <" + p_sAtt + "> attribute");
return (null != sVal) ? sVal : p_sDefault;
} // ________________________________
- private static Class[] s_oaActionConstr = { DomElement.class, Object.class };
-
- public static Class[] getActionClassArgs() {
- return s_oaActionConstr;
- }
-
/**
- * Check to see if an object of the class (arg 0) can be instantiated in
- * this context
- *
- * @param p_sName
- * String - class name to instantiate - Must implement
- * org.jboss.soa.esb.listeners.AbstractActionClass
- * @return Class -
- * @throws Exception -
- * if class not found in path or no appropriate constructor
- */
- protected static Class checkActionClass(String p_sName) throws Exception {
- Class oCls;
- try {
- oCls = Class.forName(p_sName);
- } catch (ClassNotFoundException e) {
- throw new Exception("Class " + p_sName + " not found in classpath");
- }
-
- try {
- oCls.getConstructor(s_oaActionConstr);
- } catch (NoSuchMethodException eN) {
- throw new Exception("No appropriate constructor " + p_sName
- + "(DomElement,Object) found for class ");
- }
- try {
- oCls.asSubclass(AbstractAction.class);
- } catch (ClassCastException eCC) {
- throw new Exception("class " + p_sName + " does not extend "
- + AbstractAction.class.getName());
- }
- return oCls;
- } // _________________________________________
-
- /**
* Find child nodes named "NotificationList" that contain an attribute
* 'type' that starts with "ok" (case insensitive)
*
@@ -643,6 +563,10 @@
* Serializable - Will constitute the body of the notification
*/
public static void notifyOK(DomElement p_oP, Serializable p_oSer) {
+ if(p_oSer == null) {
+ return;
+ }
+
try {
Serializable oNotif = p_oSer;
for (DomElement oCurr : p_oP
@@ -669,8 +593,11 @@
* Serializable - Will be included at the beginning of the body
* of the notification
*/
- public static void notifyError(DomElement p_oP, Exception p_e,
- Serializable p_oSer) {
+ public static void notifyError(DomElement p_oP, Exception p_e, Serializable p_oSer) {
+ if(p_oSer == null) {
+ return;
+ }
+
Serializable oNotif = p_oSer;
ByteArrayOutputStream oBO = new ByteArrayOutputStream();
PrintStream oPS = new PrintStream(oBO);
@@ -709,8 +636,8 @@
if (null == s_oNH)
try {
s_oNH = NotificationHandlerFactory.getNotifHandler(
- "remote", SystemProperties.getJndiServerType(),
- SystemProperties.getJndiServerURL());
+ "remote", Configuration.getJndiServerType(),
+ Configuration.getJndiServerURL());
} catch (Exception e) {
Logger.getLogger(GpListener.class).error(
"Notification FAILED", e);
Modified: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/JmsQueueListener.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -1,196 +1,195 @@
/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jboss.soa.esb.listeners;
-import java.lang.reflect.*;
-import java.util.Observer;
-import java.util.Observable;
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.AppServerContext;
+import org.jboss.soa.esb.helpers.DomElement;
-import org.apache.log4j.*;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import javax.jms.TopicSession;
+import javax.naming.Context;
-import javax.naming.*;
-import javax.jms.*;
+public class JmsQueueListener extends AbstractListener {
-import org.jboss.soa.esb.actions.AbstractAction;
-import org.jboss.soa.esb.helpers.*;
+ public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
-public class JmsQueueListener implements Runnable, Observer
-{
- // You can override these values at constructor time of your derived class
- protected int
- m_iSleepForThreads = 3000 // default sleep if no threads available
- ,m_iUpperThreadLimit = 10 // just in case - override if you wish
- ;
- public static final String LISTEN_QUEUE_CONN_FACT = "queueConnFactoryClass";
- public static final String LISTEN_JNDI_TYPE = "listenJndiType";
- public static final String LISTEN_JNDI_URL = "listenJndiURL";
- public static final String LISTEN_QUEUE = "listenQueue";
- public static final String LISTEN_MSG_SELECTOR = "listenMsgSelector";
+ public static final String LISTEN_JNDI_TYPE = "listenJndiType";
- protected boolean m_bError = false;
+ public static final String LISTEN_JNDI_URL = "listenJndiURL";
- protected QueueConnection m_oQconn;
- protected QueueSession m_oQsess;
- protected Queue m_oQueue;
- protected String m_sSelector;
- protected MessageConsumer m_oRdr;
+ public static final String LISTEN_QUEUE = "listenQueue";
+ public static final String LISTEN_MSG_SELECTOR = "listenMsgSelector";
- protected int m_iQthr = 0, m_iMaxThr;
+ protected boolean m_bError = false;
- protected ThreadGroup m_oThrGrp = null;
-
- protected Logger m_oLogger;
+ protected QueueConnection m_oQconn;
- protected GpListener m_oDad;
- protected DomElement m_oParms;
- protected Class m_oExecClass;
+ protected QueueSession m_oQsess;
- public JmsQueueListener(GpListener p_oDad, DomElement p_oParms) throws Exception
- {
- m_oLogger = Logger.getLogger(this.getClass());
- m_oDad = p_oDad;
- m_oParms = p_oParms.cloneObj();
- checkMyParms();
- m_oThrGrp = new ThreadGroup(m_oParms.getName());
- } //__________________________________
-
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws Exception - if mandatory atts are not right
- * or actionClass not in classpath
- */
- protected void checkMyParms() throws Exception
- {
- String sAtt = GpListener.obtainAtt(m_oParms
- ,GpListener.PARM_ACTION_CLASS,null);
- m_oExecClass= GpListener.checkActionClass(sAtt);
-
- sAtt = GpListener.obtainAtt(m_oParms
- ,GpListener.PARM_MAX_THREADS,"1");
- int iMax = Integer.parseInt(sAtt);
- m_iMaxThr = Math.min(iMax,m_iUpperThreadLimit);
+ protected Queue m_oQueue;
- // Third arg is null - Exception will br thrown if listenQueue is not found
- String sQueue = GpListener.obtainAtt(m_oParms,LISTEN_QUEUE,null);
-
- // No problem if selector is null - everything in queue will be returned
- m_sSelector = m_oParms.getAttr(LISTEN_MSG_SELECTOR);
+ protected String m_sSelector;
- m_oQconn = null;
- m_oQsess = null;
- m_oQueue = null;
-
- String sJndiType = GpListener.obtainAtt(m_oParms
- ,LISTEN_JNDI_TYPE,"jboss");
- String sJndiURL = GpListener.obtainAtt(m_oParms
- ,LISTEN_JNDI_URL,"localhost");
- Context oJndiCtx = AppServerContext.getServerContext(sJndiType,sJndiURL);
+ protected MessageConsumer jmsMessageReceiver;
- String sFactClass = GpListener.obtainAtt(m_oParms
- ,LISTEN_QUEUE_CONN_FACT,"ConnectionFactory");
- Object tmp = oJndiCtx.lookup(sFactClass);
- QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
- m_oQconn = qcf.createQueueConnection();
- m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
- m_oQsess = m_oQconn.createQueueSession
- (false,TopicSession.AUTO_ACKNOWLEDGE);
- m_oQconn.start();
- m_oRdr = m_oQsess.createReceiver(m_oQueue, m_sSelector);
-
- } //________________________________
+ public JmsQueueListener(GpListener commandListener, DomElement listenerConfig, ActionDefinitionFactory actionDefinitionFactory) throws Exception {
+ super(commandListener, listenerConfig, actionDefinitionFactory);
+ checkMyParms();
+ } // __________________________________
/**
- * Implement run method for this Runnable
- * <p/> Will continue to run until controlling class (ref in m_oDad) indicates
- * no more looping allowed for all child classes
- * <p/> This condition will not prevent child processes to finish normally
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws Exception -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
*/
- public void run()
- {
- while (m_oDad.continueLooping())
- {
- if (m_iQthr >= m_iMaxThr)
- { m_oLogger.info("Waiting for available threads...");
- try { Thread.sleep(m_iSleepForThreads); }
- catch (InterruptedException e) {return; }
- break;
- }
- Message oM = null;
- try { oM = m_oRdr.receive(m_oDad.millisToWait()); }
- catch (JMSException oJ)
- {
- m_oLogger.error("JMS error on receive",oJ);
- for (int i1=0; i1<3; i1++)
- try {checkMyParms(); } // try to reconnect to the queue
- catch (Exception e)
- { m_oLogger.error("Reconnecting to Queue",e);
- try { Thread.sleep(m_iSleepForThreads); }
- catch (InterruptedException e1)
- { //Just return
- return;
- }
- }
- }
- if (null==oM)
- continue;
+ protected void checkMyParms() throws Exception {
+ // Third arg is null - Exception will br thrown if listenQueue is not
+ // found
+ String sQueue = GpListener.obtainAtt(listenerConfig, LISTEN_QUEUE, null);
- AbstractAction oExec = null;
- try
- { Constructor oConst = m_oExecClass
- .getConstructor(GpListener.getActionClassArgs());
- oExec = (AbstractAction)oConst.newInstance
- (new Object[] {m_oParms,oM});
- }
- catch (Exception e)
- { m_oLogger.error("Can't instantiate action class",e);
- break;
- }
- // invoke the run method of the AbstractAction
- m_iQthr += 1;
- oExec.addObserver(this);
- new Thread(oExec).start();
- }
- if (null!=m_oQsess)
- try { m_oQsess.close(); }
- catch (Exception e1) {/* Tried my best - Just continue */}
- if (null!=m_oQconn)
- try { m_oQconn.close(); }
- catch (Exception e2) {/* Tried my best - Just continue */}
- } //______________________________
-
-/**
- * Implementation of Observer interface
- * <p/> Just count the number of active child threads
- *
- */
- public void update(Observable p_oObs, Object p_oUsrObj)
- {
- if (p_oUsrObj instanceof Integer)
- m_iQthr += ((Integer) p_oUsrObj).intValue();
- } //________________________________
+ // No problem if selector is null - everything in queue will be returned
+ m_sSelector = listenerConfig.getAttr(LISTEN_MSG_SELECTOR);
-
-} //____________________________________________________________________________
+ m_oQconn = null;
+ m_oQsess = null;
+ m_oQueue = null;
+
+ String sJndiType = GpListener.obtainAtt(listenerConfig, LISTEN_JNDI_TYPE,
+ "jboss");
+ String sJndiURL = GpListener.obtainAtt(listenerConfig, LISTEN_JNDI_URL,
+ "localhost");
+ Context oJndiCtx = AppServerContext.getServerContext(sJndiType,
+ sJndiURL);
+
+ String sFactClass = GpListener.obtainAtt(listenerConfig,
+ LISTEN_QUEUE_CONN_FACT, "ConnectionFactory");
+ Object tmp = oJndiCtx.lookup(sFactClass);
+ QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
+
+ m_oQconn = qcf.createQueueConnection();
+ m_oQueue = (Queue) oJndiCtx.lookup(sQueue);
+ m_oQsess = m_oQconn.createQueueSession(false,
+ TopicSession.AUTO_ACKNOWLEDGE);
+ m_oQconn.start();
+ jmsMessageReceiver = m_oQsess.createReceiver(m_oQueue, m_sSelector);
+
+ } // ________________________________
+
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#receive()
+ */
+ @Override
+ protected Object[] receive() {
+ while (m_oDad.continueLooping()) {
+ Message jmsMessage = null;
+ try {
+ jmsMessage = jmsMessageReceiver.receive(m_oDad.millisToWait());
+ } catch (JMSException oJ) {
+ logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
+ for (int i1 = 0; i1 < 3; i1++)
+ try {
+ checkMyParms();
+ } // try to reconnect to the queue
+ catch (Exception e) {
+ logger.error("Reconnecting to Queue", e);
+ try {
+ Thread.sleep(m_iSleepForThreads);
+ } catch (InterruptedException e1) { // Just return
+ logger.error("Unexpected thread interupt exception.", e);
+ return null;
+ }
+ }
+ }
+ if (null == jmsMessage) {
+ // REVIEW: Can this really happen i.e. the JMS
+ continue;
+ }
+
+ if (jmsMessage instanceof ObjectMessage) {
+ try {
+ return new Object[] {((ObjectMessage)jmsMessage).getObject()};
+ } catch (JMSException e) {
+ logger.error("Failed to read Serialized Object from JMS message.", e);
+ }
+ } else if (jmsMessage instanceof TextMessage) {
+ try {
+ return new Object[] {((TextMessage)jmsMessage).getText()};
+ } catch (JMSException e) {
+ logger.error("Failed to read Serialized Object from JMS message.", e);
+ }
+ } else {
+ logger.error("Unsupported JMS message type: " + jmsMessage.getClass().getName());
+ }
+ }
+
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+ */
+ @Override
+ protected void close() {
+ if (null != m_oQsess) {
+ try {
+ m_oQsess.close();
+ } catch (Exception e1) {/* Tried my best - Just continue */
+ }
+ }
+ if (null != m_oQconn) {
+ try {
+ m_oQconn.close();
+ } catch (Exception e2) {/* Tried my best - Just continue */
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMessage) {
+ }
+}
Modified: labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/listeners/src/org/jboss/soa/esb/listeners/SqlTablePoller.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -23,14 +23,27 @@
package org.jboss.soa.esb.listeners;
-import java.util.*;
-import java.sql.*;
-import javax.sql.*;
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
-import org.jboss.soa.esb.helpers.*;
-import org.jboss.soa.esb.helpers.persist.*;
-import org.jboss.soa.esb.actions.*;
-import org.jboss.soa.esb.util.*;
+import javax.sql.DataSource;
+
+import org.jboss.soa.esb.actions.ActionDefinitionFactory;
+import org.jboss.soa.esb.actions.ActionProcessor;
+import org.jboss.soa.esb.helpers.DomElement;
+import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
+import org.jboss.soa.esb.helpers.persist.SimpleDataSource;
+import org.jboss.soa.esb.util.Util;
+
/**
* SqlTablePoller class
*
@@ -155,13 +168,13 @@
* @param p_oParms DomElement - Sub tree that corresponds to this instance
* @throws Exception
*/
- public SqlTablePoller(GpListener p_oDad, DomElement p_oParms) throws Exception
+ public SqlTablePoller(GpListener p_oDad, DomElement p_oParms, ActionDefinitionFactory actionDefinitionFactory) throws Exception
{
- super(p_oDad,p_oParms);
+ super(p_oDad, p_oParms, actionDefinitionFactory);
try { checkMyParms(); }
catch (Exception e)
{
- m_oLogger.error("checkMyParms() FAILED",e);
+ logger.error("checkMyParms() FAILED",e);
throw e;
}
} //__________________________________
@@ -174,19 +187,19 @@
protected void checkMyParms() throws Exception
{
- checkAndStoreAtt(m_oParms,SimpleDataSource.DRIVER ,null);
- checkAndStoreAtt(m_oParms,SimpleDataSource.URL ,null);
- checkAndStoreAtt(m_oParms,SimpleDataSource.USER ,"");
- checkAndStoreAtt(m_oParms,SimpleDataSource.PASSWORD ,"");
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.DRIVER ,null);
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.URL ,null);
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.USER ,"");
+ checkAndStoreAtt(listenerConfig,SimpleDataSource.PASSWORD ,"");
for (TABLE_ATT oCurr : TABLE_ATT.values())
- checkAndStoreAtt(m_oParms,oCurr.toString(),null);
+ checkAndStoreAtt(listenerConfig,oCurr.toString(),null);
- checkAndStoreAtt(m_oParms,OPTIONAL_ATT.whereCondition.toString(),"");
- checkAndStoreAtt(m_oParms,OPTIONAL_ATT.orderBy.toString(),"");
+ checkAndStoreAtt(listenerConfig,OPTIONAL_ATT.whereCondition.toString(),"");
+ checkAndStoreAtt(listenerConfig,OPTIONAL_ATT.orderBy.toString(),"");
String sAtt = OPTIONAL_ATT.inProcessVals.toString();
- checkAndStoreAtt(m_oParms,sAtt,DEFAULT_STATES);
+ checkAndStoreAtt(listenerConfig,sAtt,DEFAULT_STATES);
m_sUpdStates = m_oVals.get(sAtt);
if (m_sUpdStates.length()<4)
throw new Exception("Parameter <"+sAtt+"> must be at least 4 characters long (PWED)");
@@ -219,12 +232,13 @@
} //________________________________
@Override
- protected Object preProcess(Object p_o) throws Exception
+ protected Object preProcess(Object p_o)
{
return p_o;
} //________________________________
- @Override
+ @SuppressWarnings("unchecked")
+ @Override
protected List<Object> pollForCandidates()
{
String sSel4U = selectForUpdStatement();
@@ -238,27 +252,29 @@
PreparedStatement PS = oConn.prepareStatement(sScan);
ResultSet RS = oConn.execQueryWait(PS,1);
- while (RS.next())
- { Map<String,Object> oColVals = new HashMap<String,Object>();
+ while (RS.next()) {
+ SQLPollResult rowParams = new SQLPollResult(sSel4U, sUpdStmt);
int iCurr = 0;
- for (String sColName : m_saCols)
- oColVals.put(sColName,RS.getObject(++iCurr));
+ for (String sColName : m_saCols) {
+ rowParams.put(sColName,RS.getObject(++iCurr));
+ }
+
// Set up the parameter object for the SqlRowAction
- AbstractSqlRowAction.Params oActionP = new AbstractSqlRowAction.Params();
- oActionP.omVals = oColVals;
- oActionP.sUpdStates = m_sUpdStates;
- oActionP.saCols = m_saCols;
- oActionP.saKeys = m_saKeys;
- oActionP.sSel4Upd = sSel4U;
- oActionP.sUpdate = sUpdStmt;
+ rowParams.sUpdStates = m_sUpdStates;
+ rowParams.saKeys = m_saKeys;
+ rowParams.sSel4Upd = sSel4U;
+ rowParams.sUpdate = sUpdStmt;
+
+ // Mark the row as "working"...
+ rowParams.changeStatusToWorking();
- oResults.add(oActionP);
+ oResults.add(rowParams);
}
}
catch (Exception e)
{
- m_oLogger.warn("Some triggers might not have been returned",e);
+ logger.warn("Some triggers might not have been returned",e);
}
finally
{
@@ -266,6 +282,7 @@
oConn.release();
}
+ logger.info("Returning " + oResults.size() + " rows.");
return oResults;
} //________________________________
@@ -362,4 +379,127 @@
return sb.append(" for update").toString();
} //________________________________
-} //____________________________________________________________________________
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#close()
+ */
+ @Override
+ protected void close() {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingError(java.lang.Object, org.jboss.soa.esb.actions.ActionProcessor, java.lang.Throwable)
+ */
+ @Override
+ protected void processingError(Object initialMessage, ActionProcessor processor, Throwable error) {
+ // Mark the row as "error"...
+ ((SQLPollResult)initialMessage).changeStatusToError();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.soa.esb.listeners.AbstractListener#processingComplete(java.lang.Object)
+ */
+ @Override
+ protected void processingComplete(Object initialMessage) {
+ // Mark the row as "working"...
+ ((SQLPollResult)initialMessage).changeStatusToDone();
+ }
+
+ private class SQLPollResult extends LinkedHashMap implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String sUpdStates;
+
+ private String[] saKeys;
+
+ private String sSel4Upd, sUpdate;
+
+ private SQLPollResult(String sSel4Upd, String sUpdate) throws Exception {
+ this.sSel4Upd = sSel4Upd;
+ this.sUpdate = sUpdate;
+ }
+
+ private String getStatus(ROW_STATE p_oState) {
+ int iPos = p_oState.ordinal();
+ return sUpdStates.substring(iPos, ++iPos);
+ }
+
+ private boolean changeStatusToWorking() {
+ return changeStatus(ROW_STATE.Pending, ROW_STATE.Working);
+ }
+
+ private boolean changeStatusToDone() {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Done);
+ }
+
+ private boolean changeStatusToError() {
+ return changeStatus(ROW_STATE.Working, ROW_STATE.Error);
+ }
+
+ private boolean changeStatus(ROW_STATE fromState, ROW_STATE toState) {
+ JdbcCleanConn dbConnection = null;
+
+ try {
+ // This is expensive at the moment but will be OK once we get proper connection pooling enabled!
+ dbConnection = newDbConn();
+ } catch (Exception e) {
+ logger.error("Unable to get DB connection.", e);
+ throw new IllegalStateException("Unable to get DB connection.", e);
+ }
+
+ try {
+ PreparedStatement m_PSsel4U;
+ PreparedStatement m_PSupd;
+
+ m_PSsel4U = dbConnection.prepareStatement(sSel4Upd);
+ m_PSupd = dbConnection.prepareStatement(sUpdate);
+
+ int iParm=1;
+ for (String sColName : saKeys) {
+ Object oVal = get(sColName);
+ m_PSsel4U.setObject (iParm ,oVal);
+ // parameters are +1 in update statement
+ // autoincrement leaves things ready for next SQL parameter
+ m_PSupd.setObject (++iParm,oVal);
+ }
+
+ try {
+ ResultSet resultSet = dbConnection.execQueryWait(m_PSsel4U, 5);
+
+ if (resultSet.next()) {
+ String sOldStatus = resultSet.getString(1).substring(0, 1);
+
+ if (sOldStatus.equalsIgnoreCase(getStatus(fromState))) {
+ m_PSupd.setString(1, getStatus(toState));
+ dbConnection.execUpdWait(m_PSupd, 5);
+ dbConnection.commit();
+
+ if(logger.isDebugEnabled()) {
+ logger.debug("Successfully changed row state from " + fromState + " to " + toState + ".");
+ }
+
+ return true;
+ } else {
+ logger.warn("Cannot change row state from " + fromState + " to " + toState + ". Row not in state " + fromState);
+ return false;
+ }
+ }
+ logger.error("Row status change to " + toState + " has failed. Rolling back!!");
+ } catch(Exception e) {
+ logger.error("Row status change to " + toState + " has failed. Rolling back!!", e);
+ }
+
+ try {
+ dbConnection.rollback();
+ } catch (Exception e) {
+ logger.error("Unable to rollback row status change to " + fromState.name(), e);
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected exception.", e);
+ } finally {
+ dbConnection.release();
+ }
+
+ return false;
+ }
+ }
+}
Modified: labs/jbossesb/workspace/arvinder/product/core/rosetta/build.xml
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/build.xml 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/build.xml 2006-09-27 21:16:44 UTC (rev 6451)
@@ -15,7 +15,7 @@
</condition>
<path id="org.jboss.esb.rosetta.base.classpath">
- <fileset dir="${org.jboss.esb.ext.lib.dir}" includes="activation.jar jbossall-client.jar log4j.jar mail.jar"/>
+ <fileset dir="${org.jboss.esb.ext.lib.dir}" includes="*.jar"/>
<fileset dir="${org.jboss.esb.ejb3.lib.dir}" includes="*.jar,*.zip"/>
</path>
Added: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/Configuration.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/src/org/jboss/soa/esb/common/Configuration.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/Configuration.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,213 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.common;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Properties;
+
+import org.jboss.soa.esb.helpers.KeyValuePair;
+
+import com.arjuna.common.internal.util.propertyservice.plugins.io.XMLFilePlugin;
+import com.arjuna.common.util.exceptions.LoadPropertiesException;
+import com.arjuna.common.util.exceptions.ManagementPluginException;
+import com.arjuna.common.util.exceptions.SavePropertiesException;
+import com.arjuna.common.util.propertyservice.PropertyManager;
+import com.arjuna.common.util.propertyservice.PropertyManagerFactory;
+import com.arjuna.common.util.propertyservice.plugins.PropertyManagementPlugin;
+
+public class Configuration
+{
+ public static PropertyManager propertyManager;
+
+ public static PropertyManager getPropertyManager()
+ {
+ return propertyManager;
+ }
+
+ private static KeyValuePair[] s_oaKV;
+
+ public static String dump()
+ {
+ StringBuilder sb = new StringBuilder("Dump of Configuration:\n");
+ for (KeyValuePair oCurr : s_oaKV)
+ {
+ sb.append(oCurr.getKey()).append("=").append(oCurr.getValue())
+ .append("\n");
+ }
+ return sb.append("______________________________________").toString();
+ } // ________________________________
+
+ public static String getSmtpHost()
+ {
+ return getPropertyManager().getProperty(Environment.SMTP_HOST, Environment.DEFAULT_HOST);
+ }
+
+ public static String getSmtpUsername()
+ {
+ return System
+ .getProperty(Environment.SMTP_USERNAME, Environment.DEFAULT_USERNAME);
+ }
+
+ public static String getSmtpPassword()
+ {
+ return System
+ .getProperty(Environment.SMTP_PASSWORD, Environment.DEFAULT_PASSWORD);
+ }
+
+ public static String getSmtpPort()
+ {
+ return getPropertyManager().getProperty(Environment.SMTP_PORT, Environment.DEFAULT_PORT);
+ }
+
+ public static String getSmtpFrom()
+ {
+ return getPropertyManager().getProperty(Environment.SMTP_FROM);
+ }
+
+ public static String getSmtpAuth()
+ {
+ return getPropertyManager().getProperty(Environment.SMTP_AUTH);
+ }
+
+ public static String getJndiServerType()
+ {
+ return getPropertyManager().getProperty(Environment.JNDI_SERVER_TYPE,
+ Environment.DEFAULT_SERVER_TYPE);
+ }
+
+ public static String getJndiServerURL()
+ {
+ return getPropertyManager().getProperty(Environment.JNDI_SERVER_URL, Environment.DEFAULT_HOST);
+ }
+
+ public static String getParamRepositoryImplClass()
+ {
+ return getPropertyManager().getProperty(Environment.PARAMS_REPOS_IMPL_CLASS);
+ }
+
+ public static String getObjStoreConfigFile()
+ {
+ String property = getPropertyManager().getProperty(Environment.OBJECT_STORE_CONFIG_FILE);
+ return property;
+ }
+
+ public static String getEncryptionFactoryClass()
+ {
+ return getPropertyManager().getProperty(Environment.ENCRYPT_FACTORY_CLASS,
+ org.jboss.soa.esb.services.DefaultEncryptionFactory.class
+ .getName());
+ }
+
+ static
+ {
+ /**
+ * Retrieve the property manager from the factory and add the ESB properties file to it.
+ *
+ * Ideally the various components in the ESB should have their own sections within the
+ * property file and the each component uses its own PropertyManager instance. However, the
+ * current structure of the ESB does not lend itself well to that.
+ *
+ * TODO
+ */
+
+ propertyManager = new TempPropertyManager();
+ //propertyManager = PropertyManagerFactory.getPropertyManager("com.arjuna.ats.propertymanager");
+
+ //String propertiesFilename = getPropertyManager().getProperty(Environment.PROPERTIES_FILE, Environment.DEFAULT_PROPERTY_FILE);
+
+ try
+ {
+ //propertyManager.load(XMLFilePlugin.class.getName(), propertiesFilename);
+ }
+ catch (Exception e)
+ {
+ throw new ExceptionInInitializerError(e.toString());
+ }
+
+ s_oaKV = new KeyValuePair[]
+ {
+ new KeyValuePair(Environment.SMTP_HOST, getSmtpHost()),
+ new KeyValuePair(Environment.SMTP_USERNAME, getSmtpUsername()),
+ new KeyValuePair(Environment.SMTP_PASSWORD, getSmtpPassword()),
+ new KeyValuePair(Environment.SMTP_PORT, getSmtpPort()),
+ new KeyValuePair(Environment.SMTP_FROM, getSmtpFrom()),
+ new KeyValuePair(Environment.SMTP_AUTH, getSmtpAuth()),
+ new KeyValuePair(Environment.JNDI_SERVER_TYPE, getJndiServerType()),
+ new KeyValuePair(Environment.JNDI_SERVER_URL, getJndiServerURL()),
+ new KeyValuePair(Environment.PARAMS_REPOS_IMPL_CLASS,
+ getParamRepositoryImplClass()),
+ new KeyValuePair(Environment.OBJECT_STORE_CONFIG_FILE, getObjStoreConfigFile()),
+ new KeyValuePair(Environment.ENCRYPT_FACTORY_CLASS, getEncryptionFactoryClass())
+
+ };
+ }
+
+ private static class TempPropertyManager implements PropertyManager {
+
+ public String getProperty(String arg0) {
+ return System.getProperty(arg0);
+ }
+
+ public String getProperty(String arg0, String arg1) {
+ return System.getProperty(arg0, arg1);
+ }
+
+ public String setProperty(String arg0, String arg1, boolean arg2) {
+ throw new UnsupportedOperationException("Not supported! Temporarily out of action");
+ }
+
+ public String setProperty(String arg0, String arg1) {
+ return System.setProperty(arg0, arg1);
+ }
+
+ public String removeProperty(String arg0) {
+ return (String) System.getProperties().remove(arg0);
+ }
+
+ public Properties getProperties() {
+ return System.getProperties();
+ }
+
+ public Enumeration propertyNames() {
+ return System.getProperties().keys();
+ }
+
+ public void load(String arg0, String arg1) throws IOException, ClassNotFoundException, LoadPropertiesException {
+ throw new UnsupportedOperationException("Not supported! Temporarily out of action");
+ }
+
+ public void save(String arg0, String arg1) throws IOException, ClassNotFoundException, SavePropertiesException {
+ throw new UnsupportedOperationException("Not supported! Temporarily out of action");
+ }
+
+ public void addManagementPlugin(PropertyManagementPlugin arg0) throws IOException, ManagementPluginException {
+ throw new UnsupportedOperationException("Not supported! Temporarily out of action");
+ }
+
+ public boolean verbose() {
+ return false;
+ }
+
+ }
+}
\ No newline at end of file
Property changes on: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/Configuration.java
___________________________________________________________________
Name: svn:executable
+ *
Added: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/src/org/jboss/soa/esb/common/Environment.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/Environment.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.common;
+
+import org.jboss.soa.esb.helpers.KeyValuePair;
+
+public class Environment
+{
+ public static final String PROPERTIES_FILE = "org.jboss.soa.esb.propertyFile";
+
+ public static final String SMTP_HOST = "org.jboss.soa.esb.mail.smtp.host";
+ public static final String SMTP_USERNAME = "org.jboss.soa.esb.mail.smtp.user";
+ public static final String SMTP_PASSWORD = "org.jboss.soa.esb.mail.smtp.password";
+ public static final String SMTP_PORT = "org.jboss.soa.esb.mail.smtp.port";
+ public static final String SMTP_FROM = "org.jboss.soa.esb.mail.smtp.from";
+ public static final String SMTP_AUTH = "org.jboss.soa.esb.mail.smtp.auth";
+ public static final String JNDI_SERVER_TYPE = "org.jboss.soa.esb.jndi.server.type";
+ public static final String JNDI_SERVER_URL = "org.jboss.soa.esb.jndi.server.url";
+ public static final String PARAMS_REPOS_IMPL_CLASS = "org.jboss.soa.esb.paramsRepository.class";
+ public static final String OBJECT_STORE_CONFIG_FILE = "org.jboss.soa.esb.objStore.configfile";
+ public static final String ENCRYPT_FACTORY_CLASS = "org.jboss.soa.esb.encryption.factory.class";
+
+ public static final String DEFAULT_PROPERTY_FILE = "jbossesb-properties.xml";
+ public static final String DEFAULT_HOST = "localhost";
+ public static final String DEFAULT_USERNAME = "";
+ public static final String DEFAULT_PASSWORD = "";
+ public static final String DEFAULT_PORT = "25";
+ public static final String DEFAULT_SERVER_TYPE = "jboss";
+
+}
\ No newline at end of file
Deleted: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/SystemProperties.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/src/org/jboss/soa/esb/common/SystemProperties.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/common/SystemProperties.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -1,132 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.soa.esb.common;
-
-import org.jboss.soa.esb.helpers.KeyValuePair;
-
-public class SystemProperties {
-
- public static final String SMTP_HOST = "org.jboss.soa.esb.mail.smtp.host";
-
- public static final String SMTP_USERNAME = "org.jboss.soa.esb.mail.smtp.user";
-
- public static final String SMTP_PASSWORD = "org.jboss.soa.esb.mail.smtp.password";
-
- public static final String SMTP_PORT = "org.jboss.soa.esb.mail.smtp.port";
-
- public static final String SMTP_FROM = "org.jboss.soa.esb.mail.smtp.from";
-
- public static final String SMTP_AUTH = "org.jboss.soa.esb.mail.smtp.auth";
-
- public static final String JNDI_SERVER_TYPE = "org.jboss.soa.esb.jndi.server.type";
-
- public static final String JNDI_SERVER_URL = "org.jboss.soa.esb.jndi.server.url";
-
- public static final String PARAMS_REPOS_IMPL_CLASS = "org.jboss.soa.esb.paramsRepository.class";
-
- public static final String OBJECT_STORE_CONFIG_FILE = "org.jboss.soa.esb.objStore.configfile";
-
- public static final String ENCRYPT_FACTORY_CLASS = "org.jboss.soa.esb.encryption.factory.class";
-
- public static final String DEFAULT_HOST = "localhost";
-
- public static final String DEFAULT_USERNAME = "";
-
- public static final String DEFAULT_PASSWORD = "";
-
- public static final String DEFAULT_PORT = "25";
-
- public static final String DEFAULT_SERVER_TYPE = "jboss";
-
- private static KeyValuePair[] s_oaKV = new KeyValuePair[] {
- new KeyValuePair(SMTP_HOST, getSmtpHost()),
- new KeyValuePair(SMTP_USERNAME, getSmtpUsername()),
- new KeyValuePair(SMTP_PASSWORD, getSmtpPassword()),
- new KeyValuePair(SMTP_PORT, getSmtpPort()),
- new KeyValuePair(SMTP_FROM, getSmtpFrom()),
- new KeyValuePair(SMTP_AUTH, getSmtpAuth()),
- new KeyValuePair(JNDI_SERVER_TYPE, getJndiServerType()),
- new KeyValuePair(JNDI_SERVER_URL, getJndiServerURL()),
- new KeyValuePair(PARAMS_REPOS_IMPL_CLASS,
- getParamRepositoryImplClass()),
- new KeyValuePair(OBJECT_STORE_CONFIG_FILE, getObjStoreConfigFile()),
- new KeyValuePair(ENCRYPT_FACTORY_CLASS, getEncryptionFactoryClass())
-
- };
-
- public static String dump() {
- StringBuilder sb = new StringBuilder("Dump of SystemProperties:\n");
- for (KeyValuePair oCurr : s_oaKV) {
- sb.append(oCurr.getKey()).append("=").append(oCurr.getValue()).append("\n");
- }
- return sb.append("______________________________________").toString();
- } // ________________________________
-
- public static String getSmtpHost() {
- return System.getProperty(SMTP_HOST, SystemProperties.DEFAULT_HOST);
- }
-
- public static String getSmtpUsername() {
- return System.getProperty(SMTP_USERNAME, SystemProperties.DEFAULT_USERNAME);
- }
-
- public static String getSmtpPassword() {
- return System.getProperty(SMTP_PASSWORD, SystemProperties.DEFAULT_PASSWORD);
- }
-
- public static String getSmtpPort() {
- return System.getProperty(SMTP_PORT, SystemProperties.DEFAULT_PORT);
- }
-
- public static String getSmtpFrom() {
- return System.getProperty(SMTP_FROM);
- }
-
- public static String getSmtpAuth() {
- return System.getProperty(SMTP_AUTH);
- }
-
- public static String getJndiServerType() {
- return System.getProperty(JNDI_SERVER_TYPE,
- SystemProperties.DEFAULT_SERVER_TYPE);
- }
-
- public static String getJndiServerURL() {
- return System.getProperty(JNDI_SERVER_URL, SystemProperties.DEFAULT_HOST);
- }
-
- public static String getParamRepositoryImplClass() {
- return System.getProperty(PARAMS_REPOS_IMPL_CLASS);
- }
-
- public static String getObjStoreConfigFile() {
- String property = System.getProperty(OBJECT_STORE_CONFIG_FILE);
- return property;
- }
-
- public static String getEncryptionFactoryClass() {
- return System.getProperty(ENCRYPT_FACTORY_CLASS,
- org.jboss.soa.esb.services.DefaultEncryptionFactory.class
- .getName());
- }
-}
\ No newline at end of file
Added: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/exceptions/ConfigurationException.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/src/org/jboss/soa/esb/exceptions/ConfigurationException.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/exceptions/ConfigurationException.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -0,0 +1,55 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2006, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.soa.esb.exceptions;
+
+/**
+ * Configuration Exception.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @since Version 4.0
+ */
+public class ConfigurationException extends org.jboss.soa.esb.exceptions.BaseException {
+
+ /**
+ * Construct an exception instance.
+ * @param message Exception message.
+ */
+ public ConfigurationException(String message) {
+ super(message);
+ }
+
+ /**
+ * Construct an exception instance.
+ * @param message Exception message.
+ * @param cause Exception cause.
+ */
+ public ConfigurationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Construct an exception instance.
+ * @param cause Exception cause.
+ */
+ public ConfigurationException(Throwable cause) {
+ super(cause);
+ }
+}
Modified: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/helpers/Email.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/src/org/jboss/soa/esb/helpers/Email.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/helpers/Email.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -31,7 +31,8 @@
import javax.mail.internet.*;
import org.apache.log4j.Logger;
-import org.jboss.soa.esb.common.SystemProperties;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.util.*;
/**
@@ -360,25 +361,25 @@
*/
private Session initMailServerSession() {
Authenticator oAuth = null;
- String sSmtpUser = SystemProperties.getSmtpUsername();
+ String sSmtpUser = Configuration.getSmtpUsername();
if (! Util.isNullString(sSmtpUser)) {
- oAuth = new MyAuth(sSmtpUser, SystemProperties.getSmtpPassword());
+ oAuth = new MyAuth(sSmtpUser, Configuration.getSmtpPassword());
}
Properties oMailP = new Properties();
- oMailP.setProperty("mail.smtp.host", SystemProperties.getSmtpHost());
- String sAuth = SystemProperties.getSmtpAuth();
+ oMailP.setProperty("mail.smtp.host", Configuration.getSmtpHost());
+ String sAuth = Configuration.getSmtpAuth();
if(sAuth != null) {
if(!sAuth.trim().equals("")) {
- logger.warn("'" + SystemProperties.SMTP_AUTH + "' set to an empty value.");
+ logger.warn("'" + Environment.SMTP_AUTH + "' set to an empty value.");
}
oMailP.setProperty("mail.smtp.auth", sAuth);
}
try {
- String sPort = SystemProperties.getSmtpPort();
- this.from = SystemProperties.getSmtpFrom();
+ String sPort = Configuration.getSmtpPort();
+ this.from = Configuration.getSmtpFrom();
Integer.parseInt(sPort);
oMailP.setProperty("mail.smtp.port", sPort);
}
Modified: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/helpers/KeyValuePair.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/src/org/jboss/soa/esb/helpers/KeyValuePair.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/helpers/KeyValuePair.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -23,6 +23,7 @@
package org.jboss.soa.esb.helpers;
import java.io.Serializable;
+import java.util.List;
public class KeyValuePair implements Serializable {
private static final long serialVersionUID = 1L;
@@ -53,4 +54,110 @@
public String dump() {
return "KVpair[" + mKey + "=" + mVal + "]";
}
-} // ____________________________________________________________________________
+
+ /**
+ * Get the value associated with the specified key from the supplied list of Key Value Pairs.
+ * <p/>
+ * Returns the value from the first matching key.
+ * @param key The key to search for.
+ * @param list The list of KeyValuePairs to search.
+ * @return The value associated with the supplied key, or null if key not found.
+ */
+ public static String getValue(String key, List<KeyValuePair> list) {
+ if(key == null) {
+ throw new IllegalArgumentException("null 'key' arg in call.");
+ }
+ if(list == null) {
+ throw new IllegalArgumentException("null 'list' arg in call.");
+ }
+
+ for(KeyValuePair kvp : list) {
+ if(kvp.mKey.equals(key)) {
+ return kvp.mVal;
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the value associated with the specified key from the supplied list of Key Value Pairs.
+ * <p/>
+ * Returns the value from the first matching key.
+ * @param key The key to search for.
+ * @param list The list of KeyValuePairs to search.
+ * @param defaultVal The default value to be returned where there's no value available for the specified key.
+ * @return The value associated with the supplied key, or null if key not found.
+ */
+ public static String getValue(String key, List<KeyValuePair> list, String defaultVal) {
+ String value = getValue(key, list);
+
+ if(value == null) {
+ return defaultVal;
+ }
+
+ return value;
+ }
+
+ /**
+ * Get the boolean equivalent value associated with the specified key
+ * from the supplied list of Key Value Pairs.
+ * <p/>
+ * Returns:
+ * <ul>
+ * <li><b><code>true</code></b>: If value equals "true" or "yes" or "y" (ignoring case).</li>
+ * <li><b><code>false</code></b>: If value equals "false" or "no" or "n" (ignoring case).</li>
+ * <li><b><i>defaultVal</i></b>: If none of the above hold true.</li>
+ * </ul>
+ * <p/>
+ * Returns the value from the first matching key.
+ * @param key The key to search for.
+ * @param list The list of KeyValuePairs to search.
+ * @param defaultVal The default value to be returned where the above listed conditions do not hold
+ * for the associated value, or the value is not specified.
+ * @return The boolean equivalent value associated with the specified key according to the above specified
+ * rules, otherwise the <b><i>defaultVal</i></b> is returned.
+ */
+ public static boolean getBooleanValue(String key, List<KeyValuePair> list, boolean defaultVal) {
+ String value = getValue(key, list);
+
+ if(value == null) {
+ return defaultVal;
+ }
+
+ if(value.equalsIgnoreCase("true") || value.equalsIgnoreCase("y") || value.equalsIgnoreCase("yes")) {
+ return true;
+ } else if(value.equalsIgnoreCase("false") || value.equalsIgnoreCase("n") || value.equalsIgnoreCase("no")) {
+ return false;
+ } else {
+ return defaultVal;
+ }
+ }
+
+ /**
+ * Get the numeric <code>double</code> equivalent value associated with the specified key
+ * from the supplied list of Key Value Pairs.
+ * <p/>
+ * Returns the value from the first matching key.
+ * @param key The key to search for.
+ * @param list The list of KeyValuePairs to search.
+ * @param defaultVal The default value to be returned where the value is not found or is non-numeric.
+ * @return The <code>double</code> equivalent value associated with the specified key if the value is found
+ * and is numeric, otherwise the <b><i>defaultVal</i></b> is returned.
+ */
+ public static double getDoubleValue(String key, List<KeyValuePair> list, double defaultVal) {
+ String value = getValue(key, list);
+
+ if(value == null) {
+ return defaultVal;
+ }
+
+ try {
+ return Double.parseDouble(value);
+ } catch(NumberFormatException e) {
+ // return the default...
+ }
+
+ return defaultVal;
+ }
+}
Modified: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/parameters/ParamFileRepository.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/src/org/jboss/soa/esb/parameters/ParamFileRepository.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/parameters/ParamFileRepository.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -11,6 +11,7 @@
import javax.naming.InvalidNameException;
import org.apache.log4j.Logger;
+import org.jboss.soa.esb.common.Configuration;
import org.jboss.soa.esb.util.StreamUtils;
@@ -46,7 +47,7 @@
* Public default constructor.
*/
public ParamFileRepository() {
- String rootDir = System.getProperty(FILE_PARAMS_REPOS_ROOT);
+ String rootDir = Configuration.getPropertyManager().getProperty(FILE_PARAMS_REPOS_ROOT);
// Set the repository root directory.
if(rootDir == null) {
Modified: labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/parameters/ParamRepositoryFactory.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/rosetta/src/org/jboss/soa/esb/parameters/ParamRepositoryFactory.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/rosetta/src/org/jboss/soa/esb/parameters/ParamRepositoryFactory.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -21,7 +21,7 @@
*/
package org.jboss.soa.esb.parameters;
-import org.jboss.soa.esb.common.SystemProperties;
+import org.jboss.soa.esb.common.Configuration;
/**
* Factory class for Singleton {@link ParamRepository} creation.
@@ -65,7 +65,7 @@
// at the start of this synch block will simply fall through and return the
// instance created by the thread that got in ahead of them.
if(instance == null) {
- String runtimeClassName = SystemProperties.getParamRepositoryImplClass();
+ String runtimeClassName = Configuration.getParamRepositoryImplClass();
if(runtimeClassName == null) {
// If there's no repository name configured, return the
Modified: labs/jbossesb/workspace/arvinder/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore/BobjStore.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore/BobjStore.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore/BobjStore.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -22,7 +22,8 @@
package org.jboss.soa.esb.internal.core.objectstore;
-import org.jboss.soa.esb.common.SystemProperties;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.helpers.DomElement;
import org.jboss.soa.esb.util.BaseBusinessObject;
import org.jboss.soa.esb.util.BobjStdDTO;
@@ -144,17 +145,17 @@
* Tries reading it from the classpath. If that fails, tries reading it through a URI stream.
*/
protected static DomElement readConfiguration() throws IOException, MalformedURLException, SAXException {
- String configPath = SystemProperties.getObjStoreConfigFile();
+ String configPath = Configuration.getObjStoreConfigFile();
URI configURI;
InputStream configStream = null;
if(configPath == null) {
- throw new IllegalStateException("ObjStoreConfigFile location not configured (param name: " + SystemProperties.OBJECT_STORE_CONFIG_FILE + ").");
+ throw new IllegalStateException("ObjStoreConfigFile location not configured (param name: " + Environment.OBJECT_STORE_CONFIG_FILE + ").");
}
configURI = URI.create(configPath);
if(!configURI.isAbsolute() || configURI.getPath() == null) {
- throw new IllegalStateException("ObjStoreConfigFile location configuration error (" + SystemProperties.OBJECT_STORE_CONFIG_FILE + "=" + configPath + "). Must be an absolute URI e.g. http://, file:/, classpath:/ etc.");
+ throw new IllegalStateException("ObjStoreConfigFile location configuration error (" + Environment.OBJECT_STORE_CONFIG_FILE + "=" + configPath + "). Must be an absolute URI e.g. http://, file:/, classpath:/ etc.");
}
// Is it on the classpath...
Modified: labs/jbossesb/workspace/arvinder/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore/DaoSnapTable.java
===================================================================
--- labs/jbossesb/workspace/rearchitecture/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore/DaoSnapTable.java 2006-09-27 19:50:01 UTC (rev 6446)
+++ labs/jbossesb/workspace/arvinder/product/core/services/src/org/jboss/soa/esb/internal/core/objectstore/DaoSnapTable.java 2006-09-27 21:16:44 UTC (rev 6451)
@@ -23,7 +23,7 @@
package org.jboss.soa.esb.internal.core.objectstore;
import org.apache.log4j.Logger;
-import org.jboss.soa.esb.common.SystemProperties;
+import org.jboss.soa.esb.common.Configuration;
import org.jboss.soa.esb.helpers.persist.JdbcCleanConn;
import org.jboss.soa.esb.helpers.persist.SqlDbTable;
import org.jboss.soa.esb.services.DefaultEncryptionFactory;
@@ -235,7 +235,7 @@
{ if (null!=m_oMangler)
return m_oMangler;
- String sFactoryName = SystemProperties.getEncryptionFactoryClass();
+ String sFactoryName = Configuration.getEncryptionFactoryClass();
Class oFactClass = DefaultEncryptionFactory.class;
if (null!=sFactoryName)
oFactClass = Class.forName(sFactoryName);
More information about the jboss-svn-commits
mailing list