[jboss-svn-commits] JBL Code SVN: r10613 - in labs/jbossesb/trunk/product: core/listeners/src/org/jboss/soa/esb/listeners/config and 11 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Mar 29 10:48:48 EDT 2007
Author: tfennelly
Date: 2007-03-29 10:48:47 -0400 (Thu, 29 Mar 2007)
New Revision: 10613
Added:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/AbstractMessageComposer.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageComposer.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliverException.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerConfigUtil.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java
labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java
labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourierFactory.java
Modified:
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/ESBAwareGenerator.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/GatewayGenerator.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/Generator.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java
labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/lifecycle/AbstractThreadedManagedLifecycle.java
labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/RegistryUtilUnitTest.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java
labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java
labs/jbossesb/trunk/product/core/services/tests/src/org/jboss/internal/soa/esb/services/registry/MockRegistry.java
labs/jbossesb/trunk/product/tools/configeditor/editor/dist/jbossesb-config-editor.war
Log:
Merged JBossRemoting code from a workspace branch.
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/actions/ActionUtils.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -82,7 +82,21 @@
*/
public static Object setTaskObject(Message message, Object obj)
{
- Object oRet = message.getBody().remove(CURRENT_OBJECT);
+
+ //
+ // There really needs to be an option on the body to set its contents
+ // as an unnamed Object, in the same way you can set its contents as a byte[].
+ // I think this is a major usability issue. This utility should not be required.
+ //
+ // My suggestion would be...
+ // public void setContents(Object object);
+ // public Object getContents();
+ // public byte[] toByteArray();
+ //
+ // Not totally sure of the value of toByteArray??
+ // Continue to allow setting of named attachments.
+
+ Object oRet = message.getBody().remove(CURRENT_OBJECT);
if (null!=obj) {
message.getBody().add(CURRENT_OBJECT,obj);
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/ESBAwareGenerator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/ESBAwareGenerator.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/ESBAwareGenerator.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -48,7 +48,7 @@
*
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
*/
-class ESBAwareGenerator {
+public class ESBAwareGenerator {
/**
* XMLBeans based configuration model instance.
@@ -60,7 +60,7 @@
* Public constructor.
* @param model XMLBeans based configuration model instance.
*/
- protected ESBAwareGenerator(XMLBeansModel model) {
+ public ESBAwareGenerator(XMLBeansModel model) {
this.model = model;
}
@@ -69,7 +69,7 @@
* @return The configuration DOM.
* @throws ConfigurationException Error creating configuration.
*/
- protected Document generate() throws ConfigurationException {
+ public Document generate() throws ConfigurationException {
Document doc = YADOMUtil.createDocument();
Element root;
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/GatewayGenerator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/GatewayGenerator.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/GatewayGenerator.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -45,7 +45,7 @@
*
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
*/
-class GatewayGenerator {
+public class GatewayGenerator {
/**
* XMLBeans based configuration model instance.
@@ -57,7 +57,7 @@
* Public constructor.
* @param model XMLBeans based configuration model instance.
*/
- protected GatewayGenerator(XMLBeansModel model) {
+ public GatewayGenerator(XMLBeansModel model) {
this.model = model;
}
@@ -66,7 +66,7 @@
* @return The configuration DOM.
* @throws ConfigurationException Error creating configuration.
*/
- protected Document generate() throws ConfigurationException {
+ public Document generate() throws ConfigurationException {
Document doc = YADOMUtil.createDocument();
Element root;
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/Generator.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/Generator.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/config/Generator.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -87,19 +87,30 @@
}
// Parse the config into our internal model instance...
- try {
- JbossesbDocument doc = JbossesbDocument.Factory.parse(config);
-
- // TODO: Get validation working from here - would save
- // doc.validate();
-
- model = new XMLBeansModel(doc);
- } catch (XmlException e) {
- throw new ConfigurationException("Error while processing ESB Listener configuration stream.", e);
- }
- }
-
+ model = parseConfig(config);
+ }
+
/**
+ * Parse the supplied ESB listener configuration stream into a config model.
+ * @param config The input configuration stream.
+ * @return Config model wrapping the parsed XMLBeans document.
+ * @throws ConfigurationException The stream could not be parsed (wraps the underlying XmlException).
+ * @throws IOException Unable to read the ESB listener configuration.
+ */
+ public static XMLBeansModel parseConfig(InputStream config) throws IOException, ConfigurationException {
+ try {
+ JbossesbDocument doc = JbossesbDocument.Factory.parse(config);
+
+ // TODO: Get validation working from here (the validate call below is disabled for now).
+ // doc.validate();
+
+ return new XMLBeansModel(doc);
+ } catch (XmlException e) {
+ throw new ConfigurationException("Error while processing ESB Listener configuration stream.", e);
+ }
+ }
+
+ /**
* Generate the configuration set in the supplied output directory and store it in the member outputstreams.
* @throws ConfigurationException Failed to generate configuration set.
*/
@@ -169,17 +180,16 @@
*
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
*/
- public class XMLBeansModel {
+ public static class XMLBeansModel {
/**
* XMLBeans config model instance.
*/
- @SuppressWarnings("unused")
private Jbossesb jbossesb;
/**
* Constructor.
- * @param jbossesb XMLBeans config model.
+ * @param xmlBeansDoc XMLBeans config model.
*/
private XMLBeansModel(JbossesbDocument xmlBeansDoc) {
this.jbossesb = xmlBeansDoc.getJbossesb();
@@ -266,6 +276,7 @@
/**
* Get the list of ESB Listeners based on their Gateway flag.
* @return The list of ESB Aware or Gateway Listeners from the configuration.
+ * @param isGateway Is the listener a gateway or ESB aware listener.
*/
private List<Listener> getListeners(boolean isGateway) {
List<Listener> gateways = new ArrayList<Listener>();
@@ -286,7 +297,7 @@
/**
* Gets the setting for the number of seconds between reloads.
*
- * @return
+ * @return The param reload seconds config value.
*/
public String getParameterReloadSecs() {
return jbossesb.getParameterReloadSecs().getStringValue();
Copied: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListener.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,407 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners.gateway;
+
+import org.jboss.soa.esb.listeners.lifecycle.AbstractManagedLifecycle;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.listeners.message.UncomposedMessageDeliveryAdapter;
+import org.jboss.soa.esb.listeners.message.MessageComposer;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.listeners.message.AbstractMessageComposer;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.helpers.KeyValuePair;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.remoting.ServerInvocationHandler;
+import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.InvokerLocator;
+import org.jboss.remoting.InvocationRequest;
+import org.jboss.remoting.transport.Connector;
+import org.jboss.remoting.callback.InvokerCallbackHandler;
+import org.apache.log4j.Logger;
+
+import javax.management.MBeanServer;
+import java.util.*;
+import java.net.URI;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.net.URISyntaxException;
+
+/**
+ * JBoss Remoting listener implementation for receiving ESB unaware messages
+ * over a JBoss Remoting channel.
+ * <p/>
+ * This class implements the JBoss Remoting interface {@link org.jboss.remoting.ServerInvocationHandler}.
+ * Messages are pushed in through the {@link #invoke(org.jboss.remoting.InvocationRequest)} method,
+ * which is an implementation of the {@link org.jboss.remoting.ServerInvocationHandler#invoke(org.jboss.remoting.InvocationRequest)}
+ * method.
+ * <p/>
+ * The JBoss Remoting {@link org.jboss.remoting.transport.Connector}
+ * configuration is populated by the {@link #initaliseJBRConnectorConfiguration(java.util.Map)}
+ * method. The remoting server locator URI is constructed by the
+ * {@link #getJbrServerLocatorURI()} method.
+ * <p/>
+ * The default {@link org.jboss.soa.esb.listeners.message.MessageComposer} used by this listener is the
+ * {@link org.jboss.soa.esb.listeners.gateway.JBossRemotingGatewayListener.JBossRemotingMessageComposer}. All
+ * message composer implementations configured for this listener are passed an instance of
+ * {@link org.jboss.remoting.InvocationRequest}, from which they must compose an ESB aware
+ * {@link Message} instance.
+ * <p/>
+ * See the <a href="http://labs.jboss.com/portal/jbossremoting">JBoss Remoting</a> docs
+ * for details on configuring specific remoting server protocols.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JBossRemotingGatewayListener extends AbstractManagedLifecycle implements ServerInvocationHandler {
+
+ /**
+ * JBoss Remoting connector config attribute name prefix.
+ */
+ public static final String JBR_PREFIX = "jbr-";
+ /**
+ * Server Protocol config attribute name.
+ */
+ public static final String JBR_SERVER_PROTOCOL = JBR_PREFIX + "serverProtocol";
+ /**
+ * Server Host config attribute name.
+ */
+ public static final String JBR_SERVER_HOST = JBR_PREFIX + ServerInvoker.SERVER_BIND_ADDRESS_KEY;
+ /**
+ * Server port config attribute name.
+ */
+ public static final String JBR_SERVER_PORT = JBR_PREFIX + ServerInvoker.SERVER_BIND_PORT_KEY;
+
+ /**
+ * Class Logger instance.
+ */
+ private static Logger logger = Logger.getLogger(JBossRemotingGatewayListener.class);
+ /**
+ * JBoss Remoting connector.
+ */
+ private Connector connector;
+ /**
+ * Connector configuration.
+ */
+ private Map<String, String> connectorConfig = new HashMap<String, String>();
+ /**
+ * Server URI.
+ */
+ private String jbrServerLocatorURI;
+ /**
+ * Delivery adapter.
+ */
+ private UncomposedMessageDeliveryAdapter messageDeliveryAdapter;
+ /**
+ * Simple flag marking the listener instance as being initialised or un-initialised.
+ */
+ private boolean initialised;
+
+ /**
+ * Construct the JBoss Remoting gateway listener (a non-threaded managed lifecycle).
+ *
+ * @param config The configuration associated with this instance.
+ * @throws org.jboss.soa.esb.ConfigurationException
+ * for configuration errors during initialisation.
+ */
+ protected JBossRemotingGatewayListener(ConfigTree config) throws ConfigurationException {
+ super(config);
+ }
+
+ /**
+ * Is this listener instance initialised, i.e. has {@link #doInitialise()} completed successfully.
+ *
+ * @return True if this listener is initialised, otherwise false.
+ */
+ public boolean isInitialised() {
+ return initialised;
+ }
+
+ /**
+ * Is this listener instance started.
+ * <p/>
+ * The listener counts as started while it holds a JBoss Remoting Connector (set in doStart, cleared in doStop).
+ *
+ * @return True if this listener is started, otherwise false.
+ */
+ public boolean isStarted() {
+ return (connector != null);
+ }
+
+ /*
+ * ***************************************************************************
+ *
+ * AbstractManagedLifecycle methods...
+ *
+ * ****************************************************************************
+ */
+
+ protected void doInitialise() throws ManagedLifecycleException {
+ if(isInitialised()) {
+ throw new ManagedLifecycleException("Unexpected request to initialise JBoss Remoting Gateway listener '" + getConfig().getName() + "'. Gateway already initialised.");
+ }
+ // Build the delivery adapter first, then the connector URI and config; either step may reject the listener config.
+ try {
+ initialiseESBDeliveryAdapter();
+ initaliseJBRConnectorConfiguration(connectorConfig);
+ } catch (ConfigurationException e) {
+ throw new ManagedLifecycleException("Remoting Listener configuration failed.", e);
+ }
+ // Only flag the listener as initialised once both configuration steps have succeeded.
+ initialised = true;
+ }
+
+ protected void doStart() throws ManagedLifecycleException {
+ if(!isInitialised()) {
+ throw new ManagedLifecycleException("Unexpected request to start JBoss Remoting Gateway listener '" + getConfig().getName() + "'. Gateway not initialised.");
+ }
+ if(isStarted()) {
+ throw new ManagedLifecycleException("Unexpected request to start JBoss Remoting Gateway listener '" + getConfig().getName() + "'. Gateway already started.");
+ }
+ // On any failure the connector reference is cleared again, so isStarted() keeps returning false.
+ try {
+ InvokerLocator locator = new InvokerLocator(jbrServerLocatorURI);
+ // NOTE(review): a connector that fails after create() is dropped without stop() - confirm nothing leaks.
+ connector = new Connector(locator, connectorConfig);
+ connector.create();
+ connector.addInvocationHandler(getConfig().getAttribute("name", this.toString()), this);
+ connector.start();
+
+ logger.info("JBoss Remoting Gateway listener '" + getConfig().getName() + "' started.");
+ } catch (Throwable throwable) {
+ connector = null;
+ throw new ManagedLifecycleException("Unable to start Remoting Listener instsance " + getClass().getName(), throwable);
+ }
+ }
+
+ protected void doStop() throws ManagedLifecycleException {
+ if(!isStarted()) {
+ throw new ManagedLifecycleException("Unexpected request to stop JBoss Remoting Gateway listener '" + getConfig().getName() + "'. Gateway not running.");
+ }
+ // The connector reference is cleared in the finally block even when stop() fails, so isStarted() is false afterwards.
+ try {
+ connector.stop();
+ logger.info("JBoss Remoting Gateway listener '" + getConfig().getName() + "' stopped.");
+ } catch (Throwable throwable) {
+ throw new ManagedLifecycleException("Unable to stop Remoting Listener instsance " + getClass().getName(), throwable);
+ } finally {
+ connector = null;
+ }
+ }
+
+ protected void doDestroy() throws ManagedLifecycleException { // No-op: this listener does no work on destroy.
+ }
+
+ /*
+ * ***************************************************************************
+ *
+ * JBoss Remoting ServerInvocationHandler methods...
+ *
+ * ****************************************************************************
+ */
+
+ /**
+ * Process a remoting invocation message.
+ * <p/>
+ * This method uses an {@link org.jboss.soa.esb.listeners.message.UncomposedMessageDeliveryAdapter}
+ * to carry out the delivery. This delivery adapter is constructed with a
+ * {@link org.jboss.soa.esb.listeners.message.MessageComposer} instance supplied through
+ * configuration, otherwise it uses the
+ * {@link org.jboss.soa.esb.listeners.gateway.JBossRemotingGatewayListener.JBossRemotingMessageComposer}.
+ * <p/>
+ * The message composer is responsible for mapping the remoting {@link org.jboss.remoting.InvocationRequest}
+ * into an ESB aware {@link org.jboss.soa.esb.message.Message}, while the
+ * {@link org.jboss.soa.esb.listeners.message.UncomposedMessageDeliveryAdapter} is responsible for its
+ * delivery to the target service.
+ *
+ * @param invocationRequest JBoss Remoting request.
+ * @return Message delivery acknowledgment response (currently always the fixed string returned below).
+ * @throws Throwable Message processing failure; the failure is logged and then rethrown unchanged.
+ */
+ public Object invoke(InvocationRequest invocationRequest) throws Throwable {
+ try {
+ messageDeliveryAdapter.deliver(invocationRequest);
+ } catch (Throwable t) {
+ logger.error("JBoss Remoting Gateway failed to deliver message to target service [" +
+ messageDeliveryAdapter.getServiceCategory() + ":" +
+ messageDeliveryAdapter.getServiceName() + "].", t);
+
+ throw t;
+ }
+
+ // TODO: We need a proper/configurable acknowledgment method!!
+ return "<ack/>";
+ }
+
+ public void setMBeanServer(MBeanServer mBeanServer) { // No-op: the MBean server is not used by this listener.
+ }
+
+ public void setInvoker(ServerInvoker serverInvoker) { // No-op: the invoker reference is not retained.
+ }
+
+ public void addListener(InvokerCallbackHandler invokerCallbackHandler) { // No-op: callback handlers are not tracked.
+ }
+
+ public void removeListener(InvokerCallbackHandler invokerCallbackHandler) { // No-op: callback handlers are not tracked.
+ }
+
+ /**
+ * Initialise the JBossRemoting connector configuration.
+ * <p/>
+ * Constructs the JBR {@link org.jboss.remoting.InvokerLocator} URI
+ * through a call to {@link #getJbrServerLocatorURI()}. Also
+ * populates the server connector properties.
+ * <p/>
+ * Default behavior for population of the connector configuration is to load
+ * all listener configuration properties whose name is prefixed with "jbr-",
+ * stripping off the "jbr-" prefix from the name before adding.
+ * So, to set the Server "timeout" configuration property
+ * on the connector, you set the property name to "jbr-timeout".
+ *
+ * @param connectorConfig The configuration map instance to be populated.
+ * @throws ConfigurationException Problem populating the configuration.
+ */
+ protected void initaliseJBRConnectorConfiguration(Map<String, String> connectorConfig) throws ConfigurationException {
+ // Initialse the JBR connector URI...
+ jbrServerLocatorURI = getJbrServerLocatorURI().toString();
+
+ // Populate the connector config...
+ List<KeyValuePair> attributes = getConfig().attributesAsList();
+ for (KeyValuePair attribute : attributes) {
+ String attributeName = attribute.getKey();
+
+ if (attributeName.startsWith(JBR_PREFIX)) {
+ connectorConfig.put(attributeName.substring(JBR_PREFIX.length()), attribute.getValue());
+ }
+ }
+ }
+
+    /**
+     * Get the Server Locator URI for this remoting based listener.
+     * <p/>
+     * Uses the listener config to extract the {@link #JBR_SERVER_PROTOCOL protocol},
+     * {@link #JBR_SERVER_HOST host} and {@link #JBR_SERVER_PORT port}
+     * parameters for the server. The host defaults to the host name of
+     * {@link java.net.InetAddress#getLocalHost()} when it is not configured.
+     *
+     * @return The Server Locator URI, in the form <code>protocol://host:port</code>.
+     * @throws ConfigurationException The protocol or port is missing, the local host
+     * name cannot be determined, or the assembled URI is malformed.
+     */
+    protected URI getJbrServerLocatorURI() throws ConfigurationException {
+        String protocol = getConfig().getAttribute(JBR_SERVER_PROTOCOL);
+        String host = getConfig().getAttribute(JBR_SERVER_HOST);
+        String port = getConfig().getAttribute(JBR_SERVER_PORT);
+
+        if (protocol == null || protocol.trim().equals("")) {
+            throw new ConfigurationException("Invalid JBoss Remoting Gateway configuration [" + getConfig().getName() + "]. 'protocol' configuration attribute not specified.");
+        }
+        if (host == null || host.trim().equals("")) {
+            try {
+                host = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                throw new ConfigurationException("Invalid JBoss Remoting Gateway configuration [" + getConfig().getName() + "]. 'host' configuration attribute not specified and unablee to determine local hostname.", e);
+            }
+        }
+        if (port == null || port.trim().equals("")) {
+            throw new ConfigurationException("Invalid JBoss Remoting Gateway configuration [" + getConfig().getName() + "]. 'port' configuration attribute not specified.");
+        }
+        // Assemble the locator; a syntax failure is wrapped with the original exception kept as the cause.
+        String uriString = protocol + "://" + host + ":" + port;
+        try {
+            return new URI(uriString);
+        } catch (URISyntaxException e) {
+            throw new ConfigurationException("Invalid JBoss Remoting Gateway configuration [" + getConfig().getName() + "]. Invalid server locator URI '" + uriString + "'.", e);
+        }
+    }
+
+
+    /**
+     * Create the delivery adapter that routes composed messages to the target service.
+     *
+     * @throws ConfigurationException Target service not configured, or the adapter could not be created.
+     */
+    private void initialiseESBDeliveryAdapter() throws ConfigurationException {
+        // Pull the routing and composer settings from the listener config.
+        String category = getConfig().getAttribute(ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG);
+        String service = getConfig().getAttribute(ListenerTagNames.TARGET_SERVICE_NAME_TAG);
+        String composerClassName = getConfig().getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+
+        // Both halves of the target service identity are mandatory.
+        if (category == null || category.trim().length() == 0) {
+            throw new ConfigurationException("Invalid gateway configuration. '" + ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG + "' not specified.");
+        }
+        if (service == null || service.trim().length() == 0) {
+            throw new ConfigurationException("Invalid gateway configuration. '" + ListenerTagNames.TARGET_SERVICE_NAME_TAG + "' not specified.");
+        }
+
+        try {
+            // A configured composer class wins; otherwise fall back to the built-in remoting composer.
+            MessageComposer composer = (composerClassName == null)
+                    ? new JBossRemotingMessageComposer()
+                    : MessageComposer.Factory.getInstance(composerClassName, getConfig());
+
+            messageDeliveryAdapter = new UncomposedMessageDeliveryAdapter(category, service, composer);
+        } catch (RegistryException e) {
+            throw new ConfigurationException("Remoting Listener configuration failed.", e);
+        } catch (MessageDeliverException e) {
+            throw new ConfigurationException("Remoting Listener configuration failed.", e);
+        }
+    }
+
+    /**
+     * Message composer for a JBoss Remoting {@link org.jboss.remoting.InvocationRequest} instance.
+     * The request parameter becomes the task object; request payload entries become message properties.
+     */
+    public static class JBossRemotingMessageComposer extends AbstractMessageComposer {
+        /** This composer needs no configuration, so the supplied tree is ignored. */
+        public void setConfiguration(ConfigTree config) {
+        }
+
+        /**
+         * Populate the ESB message from the remoting invocation request.
+         *
+         * @param message The ESB message being composed.
+         * @param invocationRequestObj The {@link org.jboss.remoting.InvocationRequest} to map; cast without a type check.
+         * @throws MessageDeliverException Not raised explicitly by this implementation.
+         */
+        protected void populateMessage(Message message, Object invocationRequestObj) throws MessageDeliverException {
+            InvocationRequest invocationRequest = (InvocationRequest) invocationRequestObj;
+
+            // There really needs to be an option on the body to set its contents as an unnamed
+            // Object, in the same way its contents can be set as a byte[]; until then the
+            // payload is stored through ActionUtils. Suggested body API: setContents(Object),
+            // getContents() and toByteArray() (was "byte[] getContents()"), while continuing
+            // to allow setting of named attachments.
+            ActionUtils.setTaskObject(message, invocationRequest.getParameter());
+
+            // Copy the request properties onto the message, named by each entry's key
+            // (Map.Entry.toString() typically renders as "key=value", not the property name).
+            Map<?, ?> properties = invocationRequest.getRequestPayload();
+            if (properties != null) {
+                for (Map.Entry<?, ?> entry : properties.entrySet()) {
+                    message.getProperties().setProperty(String.valueOf(entry.getKey()), entry.getValue());
+                }
+            }
+        }
+    }
+}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -22,16 +22,34 @@
package org.jboss.soa.esb.listeners.gateway;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Enumeration;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.NamingException;
+
import org.apache.log4j.Logger;
-import org.jboss.internal.soa.esb.rosetta.pooling.ConnectionException;
-import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
-import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
-import org.jboss.soa.esb.couriers.CourierCollection;
+import org.jboss.soa.esb.couriers.Courier;
import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.NamingContext;
import org.jboss.soa.esb.listeners.ListenerTagNames;
@@ -43,481 +61,464 @@
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.util.ClassUtil;
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueSession;
-import javax.jms.TextMessage;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Collection;
-import java.util.Enumeration;
-
public class JmsGatewayListener extends AbstractThreadedManagedLifecycle
{
- /**
- * serial version uid for this class
- */
- private static final long serialVersionUID = 5070422864110923930L;
+ /**
+ * serial version uid for this class
+ */
+ private static final long serialVersionUID = 5070422864110923930L;
+
+ public JmsGatewayListener (ConfigTree listenerConfig) throws ConfigurationException
+ {
+ super(listenerConfig) ;
+ _config = listenerConfig;
+ _sleepForRetries = 3000; // milliseconds TODO magic number
+ checkMyParms();
+ } // __________________________________
- public JmsGatewayListener(ConfigTree listenerConfig) throws ConfigurationException
- {
- super(listenerConfig);
- _config = listenerConfig;
- _sleepForRetries = 3000; // milliseconds TODO magic number
- checkMyParms();
- } // __________________________________
+
+ /**
+ * Handle the initialisation of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while initialisation.
+ */
+ protected void doInitialise()
+ throws ManagedLifecycleException
+ {
+ try
+ {
+ _targetEprs = RegistryUtil.getEprs(_targetServiceCategory,_targetServiceName);
+ if (null == _targetEprs || _targetEprs.size() < 1)
+ throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry") ;
+ }
+ catch (final RegistryException re)
+ {
+ throw new ManagedLifecycleException("Unexpected registry exception", re) ;
+ }
+
+ if (_serviceName != null)
+ {
+ try
+ {
+ RegistryUtil.register(_config, _myEpr);
+ }
+ catch (final RegistryException re)
+ {
+ throw new ManagedLifecycleException("Unexpected error during registration for epr " + _myEpr, re);
+ }
+ }
+
+ try
+ {
+ prepareMessageReceiver();
+ }
+ catch (final JMSException jmse)
+ {
+ if (_serviceName != null)
+ {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr) ;
+ }
+ throw new ManagedLifecycleException("Unexpected JMS error from prepareMessageReceiver", jmse);
+ }
+ catch (final ConfigurationException ce)
+ {
+ if (_serviceName != null)
+ {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr) ;
+ }
+ throw new ManagedLifecycleException("Unexpected configuration exception from prepareMessageReceiver", ce);
+ }
+ }
+
+ /**
+ * Execute on the thread.
+ */
+ protected void doRun()
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("run() method of " + this.getClass().getSimpleName() +
+ " started on thread " + Thread.currentThread().getName());
+ }
+ while (isRunning())
+ {
+ javax.jms.Message msgIn = receiveOne();
+ if (null != msgIn) {
+ try {
+ Object obj = _processMethod.invoke(_composer,
+ new Object[]{msgIn});
+ if (null == obj) {
+ _logger.warn("Action class method <" + _processMethod
+ .getName() + "> returned a null object");
+ continue;
+ }
+ // try to deliver the composed message, using the
+ // appropriate courier
+ // to the target service
+ try {
+ boolean bSent = false;
+ for (EPR current : _targetEprs) {
+ _courier = CourierFactory.getCourier(current);
+ try {
+ if (_courier
+ .deliver((Message) obj)) {
+ bSent = true;
+ break;
+ }
+ }
+ finally {
+ CourierUtil.cleanCourier(_courier);
+ }
+ }
+ if (!bSent) {
+ String text = "Target service <" + _targetServiceCategory + "," + _targetServiceName + "> is not registered";
+ throw new Exception(text);
+ }
+ }
+ catch (ClassCastException e) {
+ _logger.error("Action class method <" + _processMethod
+ .getName() + "> returned a non Message object",
+ e);
+ continue;
+ }
+ catch (CourierException e) {
+ String text = (null != _courier) ? "Courier <" + _courier
+ .getClass().getName() + ".deliver(Message) FAILED" : "NULL courier can't deliver Message";
+ _logger.error(text, e);
+ continue;
+ }
+ continue;
+ }
+ catch (InvocationTargetException e) {
+ _logger.error("Problems invoking method <" + _processMethod
+ .getName() + ">", e);
+ }
+ catch (IllegalAccessException e) {
+ _logger.error("Problems invoking method <" + _processMethod
+ .getName() + ">", e);
+ }
+ catch (Exception e) {
+ _logger.error("Unexpected problem", e);
+ }
+ }
+ }
- /**
- * Handle the initialisation of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while initialisation.
- */
- protected void doInitialise()
- throws ManagedLifecycleException
- {
- final Collection<EPR> targetEprs;
- try
- {
- targetEprs = RegistryUtil.getEprs(_targetServiceCategory, _targetServiceName);
- if (null == targetEprs || targetEprs.size() < 1)
- throw new ManagedLifecycleException("EPR <" + _targetServiceName + "> not found in registry");
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException("Unexpected registry exception", re);
- }
-
- try
- {
- couriers = new CourierCollection(targetEprs) ;
- }
- catch (final CourierException ce)
- {
- throw new ManagedLifecycleException("Unexpected courier exception", ce);
- }
- catch (final MalformedEPRException mepre)
- {
- throw new ManagedLifecycleException("Invalid EPR specified", mepre);
- }
+ _logger
+ .debug("run() method of " + this.getClass().getSimpleName() + " finished on thread " + Thread
+ .currentThread().getName());
+ } // ________________________________
- if (_serviceName != null)
- {
- try
- {
- RegistryUtil.register(_config, _myEpr);
- }
- catch (final RegistryException re)
- {
- throw new ManagedLifecycleException("Unexpected error during registration for epr " + _myEpr, re);
- }
- }
-
- try
- {
- prepareMessageReceiver();
- }
- catch (final ConnectionException ce)
- {
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
- }
- throw new ManagedLifecycleException("Unexpected JMS error from prepareMessageReceiver", ce);
- }
- catch (final JMSException jmse)
- {
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
- }
- throw new ManagedLifecycleException("Unexpected JMS error from prepareMessageReceiver", jmse);
- }
- catch (final ConfigurationException ce)
- {
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
- }
- throw new ManagedLifecycleException("Unexpected configuration exception from prepareMessageReceiver", ce);
- }
- }
-
- /**
- * Execute on the thread.
- */
- protected void doRun()
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("run() method of " + this.getClass().getSimpleName() +
- " started on thread " + Thread.currentThread().getName());
- }
-
- while (isRunning())
- {
- javax.jms.Message msgIn = receiveOne();
- if (null != msgIn)
- try
+ /**
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while destroying.
+ */
+ protected void doDestroy()
+ throws ManagedLifecycleException
+ {
+ if (_messageReceiver != null)
{
- final Object obj ;
- try
- {
- obj = _processMethod.invoke(_composer,
- new Object[]{msgIn});
- }
- catch (InvocationTargetException e)
- {
- _logger.error("Problems invoking method <" + _processMethod
- .getName() + ">", e);
- continue ;
- }
- catch (IllegalAccessException e)
- {
- _logger.error("Problems invoking method <" + _processMethod
- .getName() + ">", e);
- continue ;
- }
-
- if (null == obj)
- {
- _logger.warn("Action class method <" + _processMethod
- .getName() + "> returned a null object");
- continue;
- }
- if (!(obj instanceof Message))
- {
- _logger.error("Action class method <" + _processMethod
- .getName() + "> returned a non Message object" +
- obj.getClass().getName()) ;
- continue;
- }
-
- couriers.deliver((Message)obj) ;
+ try
+ {
+ _messageReceiver.close();
+ }
+ catch (final JMSException jmse) {} // ignore
}
- catch (Exception e)
+
+ if (_queueSession != null)
{
- _logger.error("Unexpected problem", e);
+ try
+ {
+ _queueSession.close();
+ }
+ catch (final JMSException jmse) {} // ignore
}
- }
+
+ if (_queueConnection != null)
+ {
+ try
+ {
+ _queueConnection.close();
+ }
+ catch (final JMSException jmse) {} // ignore
+ }
+
+ if (_serviceName != null)
+ {
+ RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr) ;
+ }
+
+ }
- _logger
- .debug("run() method of " + this.getClass().getSimpleName() + " finished on thread " + Thread
- .currentThread().getName());
- } // ________________________________
+ /**
+ * Check for mandatory and optional attributes in parameter tree
+ *
+ * @throws ConfigurationException -
+ * if mandatory atts are not right or actionClass not in
+ * classpath
+ */
+ protected void checkMyParms () throws ConfigurationException
+ {
+ // Third arg is null - Exception will be thrown if attribute is not
+ // found
+ _targetServiceCategory = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
+ _targetServiceName = ListenerUtil.obtainAtt(_config,
+ ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
- /**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
- */
- protected void doDestroy()
- throws ManagedLifecycleException
- {
- if (_messageReceiver != null)
- {
- try
- {
- _messageReceiver.close();
- }
- catch (final JMSException jmse)
- {
- } // ignore
- }
+ _queueName = ListenerUtil.obtainAtt(_config,
+ JMSEpr.DESTINATION_NAME_TAG, null);
- if (_queueSession != null)
- {
- _pool.closeSession(_queueSession);
- }
+ resolveComposerClass();
- if (_serviceName != null)
- {
- RegistryUtil.unregister(_serviceCategory, _serviceName, _myEpr);
- }
+ // No problem if selector is null - everything in queue will be returned
+ _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
+ _logger
+ .debug("No value specified for: " + JMSEpr.MESSAGE_SELECTOR_TAG + " - All messages in queue will be received by this listener");
+ } // ________________________________
- }
+ protected void resolveComposerClass () throws ConfigurationException
+ {
+ try
+ {
+ String sProcessMethod = null;
+ _composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+ if (null != _composerName)
+ { // class attribute
+ _composerClass = Class.forName(_composerName);
+ Constructor oConst = _composerClass.getConstructor(new Class[]
+ { ConfigTree.class });
+ _composer = oConst.newInstance(_config);
+ sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
+ }
+ else
+ {
+ _composerName = PackageJmsMessageContents.class.getName();
+ _composerClass = PackageJmsMessageContents.class;
+ _composer = new PackageJmsMessageContents();
+ sProcessMethod = "process";
+ _logger
+ .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG + "> element found in configuration" + " - Using default composer class : " + _composerName);
+ }
+
+ _processMethod = _composerClass.getMethod(sProcessMethod,
+ new Class[] { Object.class });
+ }
+ catch (Exception ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+ } // ________________________________
- /**
- * Check for mandatory and optional attributes in parameter tree
- *
- * @throws ConfigurationException -
- * if mandatory atts are not right or actionClass not in
- * classpath
- */
- protected void checkMyParms() throws ConfigurationException
- {
- // Third arg is null - Exception will be thrown if attribute is not
- // found
- _targetServiceCategory = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
- _targetServiceName = ListenerUtil.obtainAtt(_config,
- ListenerTagNames.TARGET_SERVICE_NAME_TAG, null);
+ private void prepareMessageReceiver () throws ConfigurationException, JMSException
+ {
+ _queueConnection = null;
+ _queueSession = null;
+ _queue = null;
- _queueName = ListenerUtil.obtainAtt(_config,
- JMSEpr.DESTINATION_NAME_TAG, null);
+ String sJndiURL = ListenerUtil.obtainAtt(_config, JMSEpr.JNDI_URL_TAG,
+ NamingContext.JBOSS_PROVIDER_URL);
+ if (null == _config.getAttribute(JMSEpr.JNDI_URL_TAG))
+ _logger
+ .debug("No value specified for " + JMSEpr.JNDI_URL_TAG + " attribute" + " - Using default of: '" + sJndiURL + "'");
+ String sJndiContextFactory = ListenerUtil.obtainAtt(_config,
+ JMSEpr.JNDI_CONTEXT_FACTORY_TAG,
+ NamingContext.JBOSS_INITIAL_CONTEXT_FACTORY);
+ if (null == _config.getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG))
+ _logger
+ .debug("No value specified for " + JMSEpr.JNDI_CONTEXT_FACTORY_TAG + " attribute" + " - Using default of: '" + sJndiContextFactory + "'");
+ String sJndiPkgPrefix = ListenerUtil.obtainAtt(_config,
+ JMSEpr.JNDI_PKG_PREFIX_TAG, NamingContext.JBOSS_PROVIDER_URL);
+ if (null == _config.getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG))
+ _logger
+ .debug("No value specified for " + JMSEpr.JNDI_PKG_PREFIX_TAG + " attribute" + " - Using default of: '" + sJndiPkgPrefix + "'");
+ Context oJndiCtx = NamingContext.getServerContext(sJndiURL,
+ sJndiContextFactory, sJndiPkgPrefix);
+ if (null == oJndiCtx)
+ throw new ConfigurationException(
+ "Unable fo obtain jndi context <" + sJndiURL + "," + sJndiContextFactory + "," + sJndiPkgPrefix + ">");
- resolveComposerClass();
+ String sFactClass = ListenerUtil.obtainAtt(_config,
+ JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
+ if (null == _config.getAttribute(JMSEpr.CONNECTION_FACTORY_TAG))
+ _logger
+ .debug("No value specified for " + JMSEpr.CONNECTION_FACTORY_TAG + " attribute" + " - Using default of: '" + sFactClass + "'");
+ _serviceCategory = _config
+ .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
+ _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
+ _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
+ _queueName, sFactClass, sJndiURL, sJndiContextFactory,
+ sJndiPkgPrefix, _messageSelector);
+
+ Object tmp = null;
+
+ try
+ {
+ tmp = oJndiCtx.lookup(sFactClass);
+ }
+ catch (NamingException ex)
+ {
+ throw new ConfigurationException(ex);
+ }
+
+ QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;
- // No problem if selector is null - everything in queue will be returned
- _messageSelector = _config.getAttribute(JMSEpr.MESSAGE_SELECTOR_TAG);
- _logger
- .debug("No value specified for: " + JMSEpr.MESSAGE_SELECTOR_TAG + " - All messages in queue will be received by this listener");
- } // ________________________________
+ _queueConnection = qcf.createQueueConnection();
+ _queueSession = _queueConnection.createQueueSession(false,
+ QueueSession.AUTO_ACKNOWLEDGE);
+ try
+ {
+ _queue = (Queue) oJndiCtx.lookup(_queueName);
+ }
+ catch (NamingException ne)
+ {
+ _queue = _queueSession.createQueue(_queueName);
+ }
+ _queueConnection.start();
- protected void resolveComposerClass() throws ConfigurationException
- {
- try
- {
- String sProcessMethod = null;
- _composerName = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
- if (null != _composerName)
- { // class attribute
- _composerClass = ClassUtil.forName(_composerName, getClass());
- Constructor oConst = _composerClass.getConstructor(new Class[]
- {ConfigTree.class});
- _composer = oConst.newInstance(_config);
- sProcessMethod = _config.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_METHOD_TAG, "process");
+ _messageReceiver = _queueSession.createReceiver(_queue,
+ _messageSelector);
+ if (null != oJndiCtx) try
+ {
+ oJndiCtx.close();
}
- else
+ catch (NamingException ne)
{
- _composerName = PackageJmsMessageContents.class.getName();
- _composerClass = PackageJmsMessageContents.class;
- _composer = new PackageJmsMessageContents();
- sProcessMethod = "process";
- _logger
- .debug("No <" + ListenerTagNames.ACTION_ELEMENT_TAG + "> element found in configuration" + " - Using default composer class : " + _composerName);
+ _logger.error(ne.getMessage(), ne);
}
- _processMethod = _composerClass.getMethod(sProcessMethod,
- new Class[]{Object.class});
- }
- catch (Exception ex)
- {
- throw new ConfigurationException(ex);
- }
- } // ________________________________
+ } // ________________________________
- private void prepareMessageReceiver() throws ConfigurationException, JMSException, ConnectionException
- {
- _queueSession = null;
- _queue = null;
+ /**
+ * Receive one message and retry if connection
+ *
+ * @return javax.jms.Message - One input message, or null
+ */
+ protected javax.jms.Message receiveOne ()
+ {
+ while (isRunning())
+ try
+ {
+ javax.jms.Message ret = _messageReceiver.receive(200);
+ if (null != ret) return ret;
+ }
+ catch (JMSException oJ)
+ {
+ _logger.error("JMS error on receive. Attempting JMS Destination reconnect.",oJ);
+ try
+ {
+ prepareMessageReceiver();
+ }
+ // try to reconnect to the queue
+ catch (Exception e)
+ {
+ _logger.error("Reconnecting to Queue", e);
+ waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _sleepForRetries) ;
+ }
+ }
+ return null;
+ } // ________________________________
- String sJndiURL = ListenerUtil.obtainAtt(_config, JMSEpr.JNDI_URL_TAG, "");
- if (null == _config.getAttribute(JMSEpr.JNDI_URL_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.JNDI_URL_TAG + " attribute" + " - Using default of: '" + sJndiURL + "'");
- String sJndiContextFactory = ListenerUtil.obtainAtt(_config,
- JMSEpr.JNDI_CONTEXT_FACTORY_TAG, "");
- if (null == _config.getAttribute(JMSEpr.JNDI_CONTEXT_FACTORY_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.JNDI_CONTEXT_FACTORY_TAG + " attribute" + " - Using default of: '" + sJndiContextFactory + "'");
- String sJndiPkgPrefix = ListenerUtil.obtainAtt(_config,
- JMSEpr.JNDI_PKG_PREFIX_TAG, "");
- if (null == _config.getAttribute(JMSEpr.JNDI_PKG_PREFIX_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.JNDI_PKG_PREFIX_TAG + " attribute" + " - Using default of: '" + sJndiPkgPrefix + "'");
- Context oJndiCtx = NamingContext.getServerContext(sJndiURL,
- sJndiContextFactory, sJndiPkgPrefix);
- if (null == oJndiCtx)
- throw new ConfigurationException(
- "Unable fo obtain jndi context <" + sJndiURL + "," + sJndiContextFactory + "," + sJndiPkgPrefix + ">");
+ /**
+ * Default gateway action for plain jms messages <p/>It will just drop the
+ * jms message contents into a esb Message
+ *
+ * @author <a
+ * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
+ * @since Version 4.0
+ *
+ */
+ private static class PackageJmsMessageContents
+ {
+ @SuppressWarnings("unchecked")
+ public Message process (Object obj) throws JMSException, IOException
+ {
+ if (!(obj instanceof javax.jms.Message))
+ throw new IllegalArgumentException(
+ "Object must be instance of javax.jms.Message");
+ byte[] bytes = getMessageContent((javax.jms.Message) obj);
+ if (null == bytes) return null;
- String sFactClass = ListenerUtil.obtainAtt(_config,
- JMSEpr.CONNECTION_FACTORY_TAG, "ConnectionFactory");
- if (null == _config.getAttribute(JMSEpr.CONNECTION_FACTORY_TAG))
- _logger
- .debug("No value specified for " + JMSEpr.CONNECTION_FACTORY_TAG + " attribute" + " - Using default of: '" + sFactClass + "'");
- _serviceCategory = _config
- .getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
- _serviceName = _config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- _myEpr = (null == _serviceName) ? null : new JMSEpr(JMSEpr.QUEUE_TYPE,
- _queueName, sFactClass, sJndiURL, sJndiContextFactory,
- sJndiPkgPrefix, _messageSelector);
+ javax.jms.Message jmsMsg = (javax.jms.Message) obj;
+ Message message = MessageFactory.getInstance().getMessage();
+ message.getBody().setContents(getMessageContent(jmsMsg));
+ Enumeration<String> EE = jmsMsg.getPropertyNames();
+ if (null != EE)
+ {
+ while (EE.hasMoreElements())
+ {
+ String name = EE.nextElement();
+ Object value = jmsMsg.getObjectProperty(name);
+ if (null != value)
+ message.getProperties().setProperty(name, value);
+ }
+ }
+ return message;
+ }
- _pool = JmsConnectionPoolContainer.getPool(sJndiURL, sJndiContextFactory, sJndiPkgPrefix, sFactClass, JMSEpr.QUEUE_TYPE);
-
- try
- {
- _queueSession = _pool.getQueueSession();
- }
- catch (final NamingException ne)
- {
- throw new ConfigurationException("Naming exception while obtaining Queue session", ne) ;
- }
-
- try
- {
- _queue = (Queue) oJndiCtx.lookup(_queueName);
- }
- catch (NamingException ne)
- {
- _queue = _queueSession.createQueue(_queueName);
- }
+ private byte[] getMessageContent (javax.jms.Message jMess) throws JMSException, IOException
+ {
+ if (jMess instanceof TextMessage)
+ return ((TextMessage) jMess).getText().getBytes();
- _messageReceiver = _queueSession.createReceiver(_queue,
- _messageSelector);
- if (null != oJndiCtx) try
- {
- oJndiCtx.close();
- }
- catch (NamingException ne)
- {
- _logger.error(ne.getMessage(), ne);
- }
+ if (jMess instanceof BytesMessage)
+ {
+ BytesMessage jBytes = (BytesMessage) jMess;
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] ba = new byte[1000];
+ int iQread;
+ while (-1 != (iQread = jBytes.readBytes(ba)))
+ if (iQread > 0) out.write(ba, 0, iQread);
+ out.close();
+ return out.toByteArray();
+ }
- } // ________________________________
+ if (jMess instanceof ObjectMessage)
+ return ((ObjectMessage) jMess).getObject().toString()
+ .getBytes();
+ _logger
+ .warn("Message type " + jMess.getClass().getSimpleName() + " not supported - Message is ignored");
+ return null;
+ }
+ } // ____________________________________________________
- /**
- * Receive one message and retry if connection
- *
- * @return javax.jms.Message - One input message, or null
- */
- protected javax.jms.Message receiveOne()
- {
- javax.jms.Message ret = null ;
- try
- {
- if (_messageReceiver != null)
- {
- ret = _messageReceiver.receive(200);
- }
- }
- catch (JMSException oJ)
- {
- _logger.error("JMS error on receive. Attempting JMS Destination reconnect.", oJ);
- try
- {
- _messageReceiver.close() ;
- _pool.releaseSession(_queueSession) ;
- }
- // try to reconnect to the queue
- catch (Exception e)
- {
- _logger.error("Error cleaning up", e);
- }
-
- try
- {
- prepareMessageReceiver();
- }
- // try to reconnect to the queue
- catch (Exception e)
- {
- _logger.error("Reconnecting to Queue", e);
- waitForRunningStateChange(ManagedLifecycleThreadState.STOPPING, _sleepForRetries);
- }
- }
- return ret ;
- } // ________________________________
+ protected final static Logger _logger = Logger
+ .getLogger(JmsGatewayListener.class);
- /**
- * Default gateway action for plain jms messages <p/>It will just drop the
- * jms message contents into a esb Message
- *
- * @author <a
- * href="mailto:schifest at heuristica.com.ar">schifest at heuristica.com.ar</a>
- * @since Version 4.0
- */
- private static class PackageJmsMessageContents
- {
- @SuppressWarnings("unchecked")
- public Message process(Object obj) throws JMSException, IOException
- {
- if (!(obj instanceof javax.jms.Message))
- throw new IllegalArgumentException(
- "Object must be instance of javax.jms.Message");
- byte[] bytes = getMessageContent((javax.jms.Message) obj);
- if (null == bytes) return null;
+ protected String _queueName;
- javax.jms.Message jmsMsg = (javax.jms.Message) obj;
- Message message = MessageFactory.getInstance().getMessage();
- message.getBody().setContents(getMessageContent(jmsMsg));
- if (jmsMsg.getJMSMessageID()!=null) {
- message.getBody().add("MessageId", jmsMsg.getJMSMessageID());
- }
- Enumeration<String> EE = jmsMsg.getPropertyNames();
- if (null != EE)
- {
- while (EE.hasMoreElements())
- {
- String name = EE.nextElement();
- Object value = jmsMsg.getObjectProperty(name);
- if (null != value)
- message.getProperties().setProperty(name, value);
- }
- }
- return message;
- }
+ protected QueueConnection _queueConnection;
- private byte[] getMessageContent(javax.jms.Message jMess) throws JMSException, IOException
- {
- if (jMess instanceof TextMessage)
- return ((TextMessage) jMess).getText().getBytes();
+ protected QueueSession _queueSession;
- if (jMess instanceof BytesMessage)
- {
- BytesMessage jBytes = (BytesMessage) jMess;
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] ba = new byte[1000];
- int iQread;
- while (-1 != (iQread = jBytes.readBytes(ba)))
- if (iQread > 0) out.write(ba, 0, iQread);
- out.close();
- return out.toByteArray();
- }
+ protected Queue _queue;
- if (jMess instanceof ObjectMessage)
- return ((ObjectMessage) jMess).getObject().toString()
- .getBytes();
- _logger
- .warn("Message type " + jMess.getClass().getSimpleName() + " not supported - Message is ignored");
- return null;
- }
- } // ____________________________________________________
+ protected MessageConsumer _messageReceiver;
- protected final static Logger _logger = Logger
- .getLogger(JmsGatewayListener.class);
+ protected String _messageSelector;
- protected String _queueName;
+ protected ConfigTree _config;
- protected QueueSession _queueSession;
+ protected final long _sleepForRetries; // milliseconds
- protected Queue _queue;
+ protected String _serviceCategory, _serviceName;
- protected MessageConsumer _messageReceiver;
+ protected String _targetServiceCategory, _targetServiceName;
- protected String _messageSelector;
+ protected EPR _myEpr;
- protected ConfigTree _config;
+ protected Collection<EPR> _targetEprs;
- protected final long _sleepForRetries; // milliseconds
+ protected String _composerName;
- protected String _serviceCategory, _serviceName;
+ protected Class _composerClass;
- protected String _targetServiceCategory, _targetServiceName;
+ protected Object _composer;
- protected EPR _myEpr;
+ protected Method _processMethod;
- protected CourierCollection couriers ;
-
- protected String _composerName;
-
- protected Class _composerClass;
-
- protected Object _composer;
-
- protected Method _processMethod;
-
- protected JmsConnectionPool _pool;
+ protected Courier _courier;
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/lifecycle/AbstractManagedLifecycle.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -69,11 +69,14 @@
* The maximum amount of time to wait for termination.
*/
private long terminationPeriod = 60000 ;
-
/**
* The list of listeners associated with this managed instance.
*/
private Set<ManagedLifecycleEventListener> listeners = new CopyOnWriteArraySet<ManagedLifecycleEventListener>() ;
+ /**
+ * Instance configuration. Supplied through constructor.
+ */
+ private ConfigTree config;
/**
* Construct the managed lifecycle.
@@ -101,6 +104,8 @@
{
logger.debug(PARAM_TERMINATION_PERIOD + " value " + terminationPeriod) ;
}
+
+ this.config = config;
}
/**
@@ -451,4 +456,8 @@
in.defaultReadObject() ;
state = ManagedLifecycleState.CONSTRUCTED ;
}
+
+ public ConfigTree getConfig() {
+ return config;
+ }
}
Modified: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/lifecycle/AbstractThreadedManagedLifecycle.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/lifecycle/AbstractThreadedManagedLifecycle.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/lifecycle/AbstractThreadedManagedLifecycle.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -21,10 +21,6 @@
*/
package org.jboss.soa.esb.listeners.lifecycle;
-import org.apache.log4j.Logger;
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.helpers.ConfigTree;
-
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Set;
@@ -34,338 +30,328 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.log4j.Logger;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+
/**
* This class provides threaded support for a managed instance.
- *
+ *
* @author kevin
*/
public abstract class AbstractThreadedManagedLifecycle extends AbstractManagedLifecycle implements Runnable
{
- /**
- * The logger for this class.
- */
- private static final Logger logger = Logger.getLogger(AbstractThreadedManagedLifecycle.class);
+ /**
+ * The logger for this class.
+ */
+ private static final Logger logger = Logger.getLogger(AbstractThreadedManagedLifecycle.class) ;
+
+ /**
+ * The lock used for managing the running state.
+ */
+ private final Lock runningLock = new ReentrantLock() ;
+ /**
+ * The condition used for running state changes.
+ */
+ private final Condition runningChanged = runningLock.newCondition() ;
+
+ /**
+ * The running state.
+ */
+ private transient ManagedLifecycleThreadState state = ManagedLifecycleThreadState.STOPPED ;
+
+ /**
+ * The list of listeners associated with this managed instance.
+ */
+ private Set<ManagedLifecycleThreadEventListener> listeners = new CopyOnWriteArraySet<ManagedLifecycleThreadEventListener>() ;
+
- /**
- * The lock used for managing the running state.
- */
- private final Lock runningLock = new ReentrantLock();
- /**
- * The condition used for running state changes.
- */
- private final Condition runningChanged = runningLock.newCondition();
-
- /**
- * The running state.
- */
- private transient ManagedLifecycleThreadState state = ManagedLifecycleThreadState.STOPPED;
-
- /**
- * The list of listeners associated with this managed instance.
- */
- private Set<ManagedLifecycleThreadEventListener> listeners = new CopyOnWriteArraySet<ManagedLifecycleThreadEventListener>();
-
-
- /**
- * Construct the threaded managed lifecycle.
- *
- * @param config The configuration associated with this instance.
- * @throws ConfigurationException for configuration errors during initialisation.
- */
- protected AbstractThreadedManagedLifecycle(final ConfigTree config)
- throws ConfigurationException
- {
- super(config);
- }
-
-
- /**
- * Handle the start of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while starting.
- */
- protected void doStart()
- throws ManagedLifecycleException
- {
- runningLock.lock();
- try
- {
- if (!waitUntilStopped())
- {
- throw new ManagedLifecycleException("Thread still active from previous start");
- }
- setRunning(ManagedLifecycleThreadState.RUNNING);
- }
- finally
- {
- runningLock.unlock();
- }
- final Thread thread = new Thread(this);
- thread.start();
- }
-
- /**
- * The thread execution method.
- */
- public final void run()
- {
- waitUntilNotState(ManagedLifecycleState.STARTING, getTerminationPeriod());
- try
- {
- changeState(ManagedLifecycleState.RUNNING);
- doRun();
- }
- catch (final ManagedLifecycleException mle)
- {
- // State change was not allowed, we are already stopping.
- }
- catch (final Throwable th)
- {
- logger.warn("Unexpected error from doRun()", th);
- }
- finally
- {
- setRunning(ManagedLifecycleThreadState.STOPPED);
- }
- }
-
- /**
- * Execute on the thread.
- */
- protected abstract void doRun();
-
- /**
- * Handle the stop of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while stopping.
- */
- protected void doStop()
- throws ManagedLifecycleException
- {
- runningLock.lock();
- try
- {
- if (!isRunning())
- {
- throw new ManagedLifecycleException("Thread is not running");
- }
- setRunning(ManagedLifecycleThreadState.STOPPING);
- }
- finally
- {
- runningLock.unlock();
- }
- }
-
- /**
- * Handle the destroy of the managed instance.
- *
- * @throws ManagedLifecycleException for errors while destroying.
- */
- protected void doDestroy()
- throws ManagedLifecycleException
- {
- if (!waitUntilStopped())
- {
- throw new ManagedLifecycleException("Thread still active");
- }
- }
-
- /**
- * Is the associated thread still running?
- *
- * @return true if the thread is still running, false otherwise.
- */
- public boolean isRunning()
- {
- return checkState(ManagedLifecycleThreadState.RUNNING);
- }
-
- /**
- * Is the associated thread stopped?
- *
- * @return true if the thread is stopped, false otherwise.
- */
- public boolean isStopped()
- {
- return checkState(ManagedLifecycleThreadState.STOPPED);
- }
-
- /**
- * Is the associated thread stopping?
- *
- * @return true if the thread is stopped, false otherwise.
- */
- public boolean isStopping()
- {
- return checkState(ManagedLifecycleThreadState.STOPPING);
- }
-
- /**
- * Check the state against the specified value.
- *
- * @param state The expected state.
- * @return True if the thread is in the expected state, false otherwise.
- */
- private boolean checkState(final ManagedLifecycleThreadState state)
- {
- runningLock.lock();
- try
- {
- return (this.state == state);
- }
- finally
- {
- runningLock.unlock();
- }
- }
-
- /**
- * Set the running state.
- *
- * @param newState The new running state.
- */
- protected void setRunning(final ManagedLifecycleThreadState newState)
- {
- final ManagedLifecycleThreadState origState;
- runningLock.lock();
- try
- {
- origState = state;
- state = newState;
- runningChanged.signalAll();
- }
- finally
- {
- runningLock.unlock();
- }
- fireStateChangedEvent(origState, newState);
- }
-
- /**
- * Wait until the associated thread has stopped.
- *
- * @return true if the thread stops within the expected period, false otherwise.
- */
- public boolean waitUntilStopped()
- {
- return waitUntilStopped(getTerminationPeriod());
- }
-
- /**
- * Wait until the associated thread has stopped.
- *
- * @param terminationPeriod The maximum delay expected for the termination, specified in milliseconds.
- * @return true if the thread stops within the expected period, false otherwise.
- */
- public boolean waitUntilStopped(final long terminationPeriod)
- {
- return waitForRunningStateChange(ManagedLifecycleThreadState.STOPPED, terminationPeriod);
- }
-
- /**
- * Wait until the running state has the specified value.
- *
- * @param state The expected running state value.
- * @param terminationPeriod The maximum delay expected for the termination, specified in milliseconds.
- * @return true if the state has the specified value within the expected period, false otherwise.
- */
- protected boolean waitForRunningStateChange(final ManagedLifecycleThreadState state, final long terminationPeriod)
- {
- try
- {
- runningLock.lock();
- try
- {
- if (this.state != state)
+ /**
+ * Construct the threaded managed lifecycle.
+ * @param config The configuration associated with this instance.
+ * @throws ConfigurationException for configuration errors during initialisation.
+ */
+ protected AbstractThreadedManagedLifecycle(final ConfigTree config)
+ throws ConfigurationException
+ {
+ super(config) ;
+ }
+
+ /**
+ * Handle the start of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while starting.
+ */
+ protected void doStart()
+ throws ManagedLifecycleException
+ {
+ runningLock.lock() ;
+ try
+ {
+ if (!waitUntilStopped())
{
- final long end = System.currentTimeMillis() + terminationPeriod;
- while (this.state != state)
- {
- final long delay = end - System.currentTimeMillis();
- if (delay <= 0)
- {
- break;
- }
- runningChanged.await(delay, TimeUnit.MILLISECONDS);
- }
+ throw new ManagedLifecycleException("Thread still active from previous start") ;
}
- return (this.state == state);
- }
- finally
- {
- runningLock.unlock();
- }
- }
- catch (final InterruptedException ie)
- {
- if (logger.isInfoEnabled())
- {
- logger.info("Interrupted while waiting for running state change");
- }
+ setRunning(ManagedLifecycleThreadState.RUNNING) ;
+ }
+ finally
+ {
+ runningLock.unlock() ;
+ }
+ final Thread thread = new Thread(this) ;
+ thread.start() ;
+ }
+
+ /**
+ * The thread execution method.
+ */
+ public final void run()
+ {
+ waitUntilNotState(ManagedLifecycleState.STARTING, getTerminationPeriod()) ;
+ try
+ {
+ changeState(ManagedLifecycleState.RUNNING) ;
+ doRun() ;
+ }
+ catch (final ManagedLifecycleException mle)
+ {
+ // State change was not allowed, we are already stopping.
+ }
+ catch (final Throwable th)
+ {
+ logger.warn("Unexpected error from doRun()", th) ;
+ }
+ finally
+ {
+ setRunning(ManagedLifecycleThreadState.STOPPED) ;
+ }
+ }
+
+ /**
+ * Execute on the thread.
+ */
+ protected abstract void doRun() ;
+
+ /**
+ * Handle the stop of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while stopping.
+ */
+ protected void doStop()
+ throws ManagedLifecycleException
+ {
+ runningLock.lock() ;
+ try
+ {
+ if (!isRunning())
+ {
+ throw new ManagedLifecycleException("Thread is not running") ;
+ }
+ setRunning(ManagedLifecycleThreadState.STOPPING) ;
+ }
+ finally
+ {
+ runningLock.unlock() ;
+ }
+ }
- runningLock.lock();
- try
- {
- return (this.state == state);
- }
- finally
- {
- runningLock.unlock();
- }
- }
- }
-
- /**
- * Add a managed lifecycle thread event listener.
- *
- * @param listener The listener.
- */
- public void addManagedLifecycleThreadEventListener(final ManagedLifecycleThreadEventListener listener)
- {
- listeners.add(listener);
- }
-
- /**
- * Remove a managed lifecycle thread event listener.
- *
- * @param listener The listener.
- */
- public void removeManagedLifecycleThreadEventListener(final ManagedLifecycleThreadEventListener listener)
- {
- listeners.remove(listener);
- }
-
- /**
- * Fire the state changed event.
- *
- * @param origState The original state, prior to transition
- * @param newState The new state after transition
- */
- private void fireStateChangedEvent(final ManagedLifecycleThreadState origState, final ManagedLifecycleThreadState newState)
- {
- if (listeners.size() > 0)
- {
- final ManagedLifecycleThreadStateEvent event = new ManagedLifecycleThreadStateEvent(this, origState, newState);
- for (ManagedLifecycleThreadEventListener listener : listeners)
- {
- listener.stateChanged(event);
- }
- }
- }
-
- /**
- * Deserialise this managed lifecycle.
- *
- * @param in The input stream.
- * @throws IOException for errors generated by the input stream.
- * @throws ClassNotFoundException For classpath errors.
- */
- private void readObject(final ObjectInputStream in)
- throws IOException, ClassNotFoundException
- {
- in.defaultReadObject();
- state = ManagedLifecycleThreadState.STOPPED;
- }
+ /**
+ * Handle the destroy of the managed instance.
+ *
+ * @throws ManagedLifecycleException for errors while destroying.
+ */
+ protected void doDestroy()
+ throws ManagedLifecycleException
+ {
+ if (!waitUntilStopped())
+ {
+ throw new ManagedLifecycleException("Thread still active") ;
+ }
+ }
+
+ /**
+ * Is the associated thread still running?
+ * @return true if the thread is still running, false otherwise.
+ */
+ public boolean isRunning()
+ {
+ return checkState(ManagedLifecycleThreadState.RUNNING) ;
+ }
+
+ /**
+ * Is the associated thread stopped?
+ * @return true if the thread is stopped, false otherwise.
+ */
+ public boolean isStopped()
+ {
+ return checkState(ManagedLifecycleThreadState.STOPPED) ;
+ }
+
+ /**
+ * Is the associated thread stopping?
+ * @return true if the thread is stopping, false otherwise.
+ */
+ public boolean isStopping()
+ {
+ return checkState(ManagedLifecycleThreadState.STOPPING) ;
+ }
+
+ /**
+ * Check the state against the specified value.
+ * @param state The expected state.
+ * @return True if the thread is in the expected state, false otherwise.
+ */
+ private boolean checkState(final ManagedLifecycleThreadState state)
+ {
+ runningLock.lock() ;
+ try
+ {
+ return (this.state == state) ;
+ }
+ finally
+ {
+ runningLock.unlock() ;
+ }
+ }
+
+ /**
+ * Set the running state.
+ * @param newState The new running state.
+ */
+ protected void setRunning(final ManagedLifecycleThreadState newState)
+ {
+ final ManagedLifecycleThreadState origState ;
+ runningLock.lock() ;
+ try
+ {
+ origState = state ;
+ state = newState ;
+ runningChanged.signalAll() ;
+ }
+ finally
+ {
+ runningLock.unlock() ;
+ }
+ fireStateChangedEvent(origState, newState) ;
+ }
+
+ /**
+ * Wait until the associated thread has stopped.
+ * @return true if the thread stops within the expected period, false otherwise.
+ */
+ public boolean waitUntilStopped()
+ {
+ return waitUntilStopped(getTerminationPeriod()) ;
+ }
+
+ /**
+ * Wait until the associated thread has stopped.
+ * @param terminationPeriod The maximum delay expected for the termination, specified in milliseconds.
+ * @return true if the thread stops within the expected period, false otherwise.
+ */
+ public boolean waitUntilStopped(final long terminationPeriod)
+ {
+ return waitForRunningStateChange(ManagedLifecycleThreadState.STOPPED, terminationPeriod) ;
+ }
+
+ /**
+ * Wait until the running state has the specified value.
+ * @param state The expected running state value.
+ * @param terminationPeriod The maximum delay expected for the termination, specified in milliseconds.
+ * @return true if the state has the specified value within the expected period, false otherwise.
+ */
+ protected boolean waitForRunningStateChange(final ManagedLifecycleThreadState state, final long terminationPeriod)
+ {
+ try
+ {
+ runningLock.lock() ;
+ try
+ {
+ if (this.state != state)
+ {
+ final long end = System.currentTimeMillis() + terminationPeriod ;
+ while(this.state != state)
+ {
+ final long delay = end - System.currentTimeMillis() ;
+ if (delay <= 0)
+ {
+ break ;
+ }
+ runningChanged.await(delay, TimeUnit.MILLISECONDS) ;
+ }
+ }
+ return (this.state == state) ;
+ }
+ finally
+ {
+ runningLock.unlock() ;
+ }
+ }
+ catch (final InterruptedException ie)
+ {
+ if (logger.isInfoEnabled())
+ {
+ logger.info("Interrupted while waiting for running state change") ;
+ }
+
+ runningLock.lock() ;
+ try
+ {
+ return (this.state == state) ;
+ }
+ finally
+ {
+ runningLock.unlock() ;
+ }
+ }
+ }
+
+ /**
+ * Add a managed lifecycle thread event listener.
+ * @param listener The listener.
+ */
+ public void addManagedLifecycleThreadEventListener(final ManagedLifecycleThreadEventListener listener)
+ {
+ listeners.add(listener) ;
+ }
+
+ /**
+ * Remove a managed lifecycle thread event listener.
+ * @param listener The listener.
+ */
+ public void removeManagedLifecycleThreadEventListener(final ManagedLifecycleThreadEventListener listener)
+ {
+ listeners.remove(listener) ;
+ }
+
+ /**
+ * Fire the state changed event.
+ * @param origState The original state, prior to transition
+ * @param newState The new state after transition
+ */
+ private void fireStateChangedEvent(final ManagedLifecycleThreadState origState, final ManagedLifecycleThreadState newState)
+ {
+ if (listeners.size() > 0)
+ {
+ final ManagedLifecycleThreadStateEvent event = new ManagedLifecycleThreadStateEvent(this, origState, newState) ;
+ for(ManagedLifecycleThreadEventListener listener: listeners)
+ {
+ listener.stateChanged(event) ;
+ }
+ }
+ }
+
+ /**
+ * Deserialise this managed lifecycle.
+ * @param in The input stream.
+ * @throws IOException for errors generated by the input stream.
+ * @throws ClassNotFoundException For classpath errors.
+ */
+ private void readObject(final ObjectInputStream in)
+ throws IOException, ClassNotFoundException
+ {
+ in.defaultReadObject() ;
+ state = ManagedLifecycleThreadState.STOPPED ;
+ }
}
Copied: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/AbstractMessageComposer.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/src/org/jboss/soa/esb/listeners/message/AbstractMessageComposer.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/AbstractMessageComposer.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/AbstractMessageComposer.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners.message;
+
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.message.format.MessageFactory;
+
+/**
+ * An abstract {@link MessageComposer} implementation, containing a useful
+ * default implementation.
+ * <p/>
+ * Implementations should be threadsafe (stateless) and must contain
+ * a public default constructor.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public abstract class AbstractMessageComposer implements MessageComposer {
+
+ /**
+ * Compose the message.
+ * <p/>
+ * Override to implement alternative {@link Message} construction strategy.
+ *
+ * @param messagePayload Message payload to be packaged, or a channel specific
+ * container class for the message payload (e.g. a JMS message).
+ * @return ESB aware message instance.
+ * @throws MessageDeliverException Failed to compose message payload for delivery.
+ */
+ public Message compose(Object messagePayload) throws MessageDeliverException {
+ Message message = MessageFactory.getInstance().getMessage();
+
+ populateMessage(message, messagePayload);
+
+ return message;
+ }
+
+ /**
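+ * Populate the supplied message from the message payload.
+ *
+ * @param message The newly created ESB message to be populated.
+ * @param messagePayload The message payload passed to {@link #compose(Object)}.
+ * @throws MessageDeliverException Failed to populate the message from the payload.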
+ */
+ protected abstract void populateMessage(Message message, Object messagePayload) throws MessageDeliverException;
+}
Copied: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageComposer.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageComposer.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageComposer.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageComposer.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,88 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners.message;
+
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+
+/**
+ * Message composer.
+ * <p/>
+ * A "Composer" is basically a "Builder" ala the GoF patterns.
+ * <p/>
+ * Implementations should be threadsafe (stateless) and must contain
+ * a public default constructor.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public interface MessageComposer {
+
+ /**
+ * Set the composer's configuration.
+ *
+ * @param config Composer configuration.
+ */
+ public void setConfiguration(ConfigTree config);
+
+ /**
+ * Compose an ESB "aware" message from the supplied message payload.
+ * <p/>
+ * Implementations need to construct and populate an ESB Message from the
+ * messagePayload instance.
+ *
+ * @param messagePayload Message payload to be packaged, or a channel specific
+ * container class for the message payload (e.g. a JMS message).
+ * @return ESB aware message instance.
+ * @throws MessageDeliverException Failed to compose message payload for delivery.
+ */
+ public Message compose(Object messagePayload) throws MessageDeliverException;
+
+ /**
+ * Utility factory class for reflective {@link MessageComposer} construction.
+ */
+ public static class Factory {
+
+ /**
+ * Factory method.
+ *
+ * @param className Class name.
+ * @param config The composer configuration.
+ * @return Composer instance.
+ * @throws MessageDeliverException Unable to construct composer.
+ */
+ public static MessageComposer getInstance(String className, ConfigTree config) throws MessageDeliverException {
+ AssertArgument.isNotNullAndNotEmpty(className, "className");
+ AssertArgument.isNotNull(config, "config");
+
+ try {
+ MessageComposer instance = (MessageComposer) Class.forName(className).newInstance();
+ instance.setConfiguration(config);
+ return instance;
+ } catch (ClassNotFoundException e) {
+ throw new MessageDeliverException("Composer class [" + className + "] not found in classpath.", e);
+ } catch (InstantiationException e) {
+ throw new MessageDeliverException("Failed to instantiate composer class [" + className + "].", e);
+ } catch (IllegalAccessException e) {
+ throw new MessageDeliverException("Failed to instantiate composer class [" + className + "].", e);
+ }
+ }
+ }
+}
Copied: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliverException.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliverException.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliverException.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliverException.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,37 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners.message;
+
+import org.jboss.soa.esb.BaseException;
+
+/**
+ * Message delivery failure.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MessageDeliverException extends BaseException {
+
+ public MessageDeliverException(String message) {
+ super(message);
+ }
+
+ public MessageDeliverException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
Copied: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,201 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners.message;
+
+import org.jboss.soa.esb.listeners.RegistryUtil;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.apache.log4j.Logger;
+
+import java.util.Collection;
+
+/**
+ * Adapter class for managing {@link Message} delivery to a specified Service.
+ * <p/>
+ * Manages loading of {@link EPR EPRs}, {@link Courier} selection and
+ * message delivery. Provides a unified/simplified interface for message
+ * delivery.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MessageDeliveryAdapter {
+
+ /**
+ * Class logger.
+ */
+ private static Logger logger = Logger.getLogger(MessageDeliveryAdapter.class);
+
+ /**
+ * The <b>category name</b> of the Service to which this instance will
+ * deliver messages.
+ */
+ private String serviceCategory;
+ /**
+ * The <b>name</b> of the Service to which this instance will
+ * deliver messages.
+ */
+ private String serviceName;
+ /**
+ * The list of installed EPRs for the target Service.
+ */
+ private Collection<EPR> serviceEprs;
+
+ /**
+ * A cached instance of the last EPR that "worked" on this instance. Just saves iterating over
+ * all the EPRs again. If it fails, we iterate as normal.
+ */
+ private EPR lastSuccessfulEPR;
+
+ /**
+ * Public constructor.
+ *
+ * @param serviceCategory The <b>category name</b> of the Service to which this instance will
+ * deliver messages.
+ * @param serviceName The <b>name</b> of the Service to which this instance will
+ * deliver messages.
+ * @throws RegistryException Failed to lookup EPRs for the specified Service.
+ */
+ public MessageDeliveryAdapter(String serviceCategory, String serviceName) throws RegistryException {
+ AssertArgument.isNotNullAndNotEmpty(serviceCategory, "serviceCategory");
+ AssertArgument.isNotNullAndNotEmpty(serviceName, "serviceName");
+
+ this.serviceCategory = serviceCategory;
+ this.serviceName = serviceName;
+ serviceEprs = RegistryUtil.getEprs(serviceCategory, serviceName);
+ }
+
+ /**
+ * Deliver the supplied message to this instance's target service.
+ *
+ * @param message The message to be delivered.
+ * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+ */
+ public void deliver(Message message) throws MessageDeliverException {
+
+ // If we have a cached version of an EPR that previously worked, try it first...
+ if (lastSuccessfulEPR != null) {
+ if (attemptDelivery(message, lastSuccessfulEPR)) {
+ // We've successfully delivered the message using the same EPR that worked
+ // the last time. We're done!...
+ return;
+ }
+ }
+
+ // Iterate over all the EPRs in the list...
+ for (EPR epr : serviceEprs) {
+ if (attemptDelivery(message, epr)) {
+ // We've delivered it, we're done!
+ lastSuccessfulEPR = epr;
+ return;
+ }
+ }
+
+ // Every EPR failed: actually throw, otherwise callers would see a silent "success"...
+ throw new MessageDeliverException("Failed to deliver message to Service [" + serviceCategory + ":" + serviceName + "]. Check for errors.");
+ }
+
+ /**
+ * Get the Service category name for the Service for which this instance is delivering messages.
+ *
+ * @return Service Category.
+ */
+ public String getServiceCategory() {
+ return serviceCategory;
+ }
+
+ /**
+ * Get the Service name for the Service for which this instance is delivering messages.
+ *
+ * @return Service name.
+ */
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ /**
+ * Attempt to deliver the supplied message using the supplied EPR.
+ *
+ * @param message The message to be delivered.
+ * @param epr The EPR to be used in the delivery attempt.
+ * @return True if the message was delivered without error, otherwise false.
+ */
+ private boolean attemptDelivery(Message message, EPR epr) {
+ Courier courier = null;
+
+ // Get a courier for the EPR...
+ try {
+ courier = getCourier(epr);
+ } catch (CourierException e) {
+ logger.warn("Courier lookup failed for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", e);
+ } catch (MalformedEPRException e) {
+ logger.warn("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+ } catch (Throwable t) {
+ logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+ }
+
+ // Try delivering the message using the courier we just looked up....
+ if (courier != null) {
+ try {
+ return courier.deliver(message);
+ } catch (CourierException e) {
+ logger.warn("Courier delivery failed for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", e);
+ } catch (MalformedEPRException e) {
+ // Hmmmm???... Can this really happen? The Courier has already been created. Haven't we already validated the EPR during the Courier lookup (above)??
+ logger.warn("Unexpected error. Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. But the EPR has already been validated!!", e);
+ } catch (Throwable t) {
+ logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+ } finally {
+ // TODO: So does this mean that Couriers are stateful? If so, do we need to synchronize on using them??
+ CourierUtil.cleanCourier(courier);
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Get the last EPR that "worked" on this instance.
+ * @return Last successfully used EPR, or null if there was none.
+ */
+ protected EPR getLastSuccessfulEPR() {
+ return lastSuccessfulEPR;
+ }
+
+ /**
+ * Get a {@link org.jboss.soa.esb.couriers.Courier} for the supplied EPR.
+ *
+ * @param epr The EPR for which a {@link org.jboss.soa.esb.couriers.Courier}
+ * is being sought.
+ * @return The courier for the EPR.
+ * @throws CourierException A courier implementation cannot be created.
+ * @throws MalformedEPRException Bad EPR.
+ */
+ protected Courier getCourier(EPR epr) throws CourierException, MalformedEPRException {
+ // This method just allows us to override Courier lookup during unit testing.
+ return CourierFactory.getInstance().getMessageCourier(epr);
+ }
+}
Copied: labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners.message;
+
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+
+/**
+ * Adapter class for delivering uncomposed (ESB unaware) message payloads to a target service.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ * @see org.jboss.soa.esb.listeners.message.MessageDeliveryAdapter
+ */
+public class UncomposedMessageDeliveryAdapter extends MessageDeliveryAdapter {
+
+ /**
+ * Message composer.
+ */
+ private MessageComposer composer;
+
+ /**
+ * Public constructor.
+ *
+ * @param serviceCategory The <b>category name</b> of the Service to which this instance will
+ * deliver messages.
+ * @param serviceName The <b>name</b> of the Service to which this instance will
+ * deliver messages.
+ * @param composer The message composer for this delivery instance.
+ * @throws org.jboss.soa.esb.services.registry.RegistryException
+ * Failed to lookup EPRs for the specified Service.
+ */
+ public UncomposedMessageDeliveryAdapter(String serviceCategory, String serviceName, MessageComposer composer) throws RegistryException {
+ super(serviceCategory, serviceName);
+ AssertArgument.isNotNull(composer, "composer");
+ this.composer = composer;
+ }
+
+
+ /**
+ * Deliver the supplied message to this instance's target service.
+ *
+ * @param messagePayload Message payload to be packaged, or a channel specific
+ * container class for the message payload (e.g. a JMS message).
+ * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+ */
+ public void deliver(Object messagePayload) throws MessageDeliverException {
+ AssertArgument.isNotNull(messagePayload, "messagePayload");
+ Message message = composer.compose(messagePayload);
+
+ super.deliver(message);
+ }
+}
Copied: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerConfigUtil.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerConfigUtil.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerConfigUtil.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/ListenerConfigUtil.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.config.Generator;
+import org.jboss.soa.esb.listeners.config.ESBAwareGenerator;
+import org.jboss.soa.esb.listeners.config.GatewayGenerator;
+import org.jboss.soa.esb.ConfigurationException;
+import org.w3c.dom.Document;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+/**
+ * Test utility class for generating a listener configuration from an xsd based config.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public abstract class ListenerConfigUtil {
+
+ /**
+ * Generate an ESB aware listener config from the supplied XSD based configuration.
+ *
+ * @param config The XSD based config stream.
+ * @param listenerName The name of the listener configuration. This is the name supplied in
+ * the "name" attribute on the listener config.
+ * @return The listener ConfigTree config.
+ * @throws ConfigurationException Bad listener ESB configuration.
+ * @throws IOException Unable to read the ESB listener configuration.
+ */
+ public static ConfigTree getESBAwareListenerConfig(InputStream config, String listenerName) throws ConfigurationException, IOException {
+ Generator.XMLBeansModel model = Generator.parseConfig(config);
+
+ ESBAwareGenerator awareGenerator = new ESBAwareGenerator(model);
+ Document awareConfig = awareGenerator.generate();
+
+ return getConfigTree(awareConfig, listenerName);
+ }
+
+ /**
+ * Generate a Gateway listener config from the supplied XSD based configuration.
+ *
+ * @param config The XSD based config stream.
+ * @param listenerName The name of the listener configuration. This is the name supplied in
+ * the "name" attribute on the listener config.
+ * @return The listener ConfigTree config.
+ * @throws ConfigurationException Bad listener ESB configuration.
+ * @throws IOException Unable to read the ESB listener configuration.
+ */
+ public static ConfigTree getGatewayListenerConfig(InputStream config, String listenerName) throws ConfigurationException, IOException {
+ Generator.XMLBeansModel model = Generator.parseConfig(config);
+
+ GatewayGenerator gatewayGenerator = new GatewayGenerator(model);
+ Document gatewayConfig = gatewayGenerator.generate();
+
+ return getConfigTree(gatewayConfig, listenerName);
+ }
+
+ private static ConfigTree getConfigTree(Document configDoc, String listenerName) {
+ ConfigTree config = ConfigTree.fromElement(configDoc.getDocumentElement());
+
+ return config.getFirstChild(listenerName);
+ }
+}
Modified: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/RegistryUtilUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/RegistryUtilUnitTest.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/RegistryUtilUnitTest.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -49,15 +49,12 @@
// case difference is deliberate!
RegistryUtil.getEprs("eprmanager", "test");
- }
+ fail("Expected RegistryException");
+ }
catch (RegistryException ex)
{
- exception = true;
}
- if (!exception)
- fail();
-
RegistryUtil.register(tree, epr);
try
Copied: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,223 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners.gateway;
+
+import junit.framework.TestCase;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.actions.ActionUtils;
+import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.couriers.MockCourier;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.remoting.Client;
+import org.jboss.remoting.InvokerLocator;
+import org.xml.sax.SAXException;
+
+import java.net.InetAddress;
+
+/**
+ * JBossRemotingGatewayListener unit tests.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class JBossRemotingGatewayListenerUnitTest extends TestCase {
+
+ private JBossRemotingGatewayListener listener;
+
+ private MockCourier courier1;
+ private MockCourier courier2;
+
+ protected void setUp() throws Exception {
+ MockCourierFactory.install();
+ MockRegistry.install();
+
+ courier1 = new MockCourier(false); // Will fail
+ courier2 = new MockCourier(true); // Will work
+ MockRegistry.register("cat", "servicex", courier1);
+ MockRegistry.register("cat", "servicex", courier2);
+ }
+
+ protected void tearDown() throws Exception {
+ MockRegistry.uninstall();
+ MockCourierFactory.uninstall();
+ stopServer();
+ }
+
+ public void test_config() throws SAXException {
+ test_config(null, "servicex", "http", "8888", "");
+ test_config("", "servicex", "http", "8888", "");
+ test_config("cat", null, "http", "8888", "");
+ test_config("cat", "", "http", "8888", "");
+ test_config("cat", "servicex", null, "8888", "");
+ test_config("cat", "servicex", "", "8888", "");
+ test_config("cat", "servicex", "http", null, "");
+ test_config("cat", "servicex", "http", "", "");
+ }
+
+ public void test_lifecycle() throws SAXException, ConfigurationException, ManagedLifecycleException {
+ ConfigTree config = getConfig("cat", "servicex", "http", "8888");
+
+ listener = new JBossRemotingGatewayListener(config);
+
+ // try starting without initialising...
+ try {
+ listener.start();
+ fail("Expected ManagedLifecycleException");
+ } catch(ManagedLifecycleException e) {}
+
+ // Initialise...
+ listener.doInitialise();
+
+ // try initialising again...
+ try {
+ listener.doInitialise();
+ fail("Expected ManagedLifecycleException");
+ } catch(ManagedLifecycleException e) {}
+
+ // try stopping without starting...
+ try {
+ listener.stop();
+ fail("Expected ManagedLifecycleException");
+ } catch(ManagedLifecycleException e) {}
+
+ // Start...
+ listener.doStart();
+
+ // try starting again...
+ try {
+ listener.start();
+ fail("Expected ManagedLifecycleException");
+ } catch(ManagedLifecycleException e) {}
+
+ // Stop...
+ listener.doStop();
+
+ // try stopping again...
+ try {
+ listener.doStop();
+ fail("Expected ManagedLifecycleException");
+ } catch(ManagedLifecycleException e) {}
+
+ // Start and stop again...
+ listener.doStart();
+ assertTrue(listener.isStarted());
+ listener.doStop();
+ assertTrue(!listener.isStarted());
+ }
+
+ public void test_http() throws Throwable {
+ test_delivery("http");
+ }
+
+ public void test_socket() throws Throwable {
+ test_delivery("socket");
+ }
+
+ public void test_delivery(String protocol) throws Throwable {
+ startServer(protocol);
+ sendMessageToServer(protocol, protocol + "_payload");
+ stopServer();
+ }
+
+ public void x_test_profile() throws Throwable {
+ startServer("http");
+ for(int i = 0; i < 10000; i++) {
+ sendMessageToServer("http", "xxx_payload");
+ MockCourierFactory.resetCouriers();
+ Thread.sleep(50);
+ }
+ stopServer();
+ }
+
+ private void startServer(String protocol) throws SAXException, ConfigurationException, ManagedLifecycleException {
+ ConfigTree config = getConfig("cat", "servicex", protocol, "8888");
+
+ listener = new JBossRemotingGatewayListener(config);
+
+ listener.doInitialise();
+ listener.doStart();
+ }
+
+ private void sendMessageToServer(String protocol, Object messagePayload) throws Throwable {
+ String locatorURI = protocol + "://" + InetAddress.getLocalHost().getHostName() + ":8888";
+ InvokerLocator locator = new InvokerLocator(locatorURI);
+ System.out.println("Calling remoting server with locator uri of: " + locatorURI);
+
+ Client remotingClient = null;
+ try {
+ remotingClient = new Client(locator);
+ remotingClient.connect();
+
+ // Make sure the courier doesn't have a payload beforehand...
+ assertEquals(null, courier2.message);
+
+ // Deliver the message to the listener...
+ Object response = remotingClient.invoke(messagePayload);
+ assertEquals("<ack/>", response);
+
+ // Make sure the courier payload after matches the input...
+ Object courierPayloadAfter = ActionUtils.getTaskObject(courier2.message);
+ assertEquals(messagePayload, courierPayloadAfter);
+ } finally {
+ if(remotingClient != null) {
+ remotingClient.disconnect();
+ }
+ }
+ }
+
+ private void stopServer() throws ManagedLifecycleException {
+ if(listener != null && listener.isStarted()) {
+ try {
+ listener.doStop();
+ } finally {
+ listener.doDestroy();
+ }
+ }
+ }
+
+ private ConfigTree getConfig(String category, String service, String protocol, String port) throws SAXException {
+ ConfigTree config = ConfigTree.fromXml("<listener/>");
+
+ config.setAttribute(ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, category);
+ config.setAttribute(ListenerTagNames.TARGET_SERVICE_NAME_TAG, service);
+ config.setAttribute(JBossRemotingGatewayListener.JBR_SERVER_PROTOCOL, protocol);
+ config.setAttribute(JBossRemotingGatewayListener.JBR_SERVER_PORT, port);
+
+ return config;
+ }
+
+ private void test_config(String category, String service, String protocol, String port, String exception) throws SAXException {
+ ConfigTree config = getConfig(category, service, protocol, port);
+
+ try {
+ listener = new JBossRemotingGatewayListener(config);
+ listener.doInitialise();
+ fail("Expected a ManagedLifecycleException.");
+ } catch (ConfigurationException e) {
+ fail("Unexpected ConfigurationException.");
+ } catch (ManagedLifecycleException e) {
+ Throwable cause = e.getCause();
+
+ assertTrue(cause instanceof ConfigurationException);
+ assertTrue(cause.getMessage().startsWith(exception));
+ }
+ }
+}
Copied: labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java)
===================================================================
--- labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java (rev 0)
+++ labs/jbossesb/trunk/product/core/listeners/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,140 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners.message;
+
+import junit.framework.TestCase;
+import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.couriers.MockCourier;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.actions.ActionUtils;
+
+/**
+ * Tests for the MessageDeliveryAdapter and UncomposedMessageDeliveryAdapter
+ * classes.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MessageDeliveryAdapterUnitTest extends TestCase {
+
+ private EPR epr1;
+ private EPR epr2;
+ private EPR epr3;
+ private EPR epr4;
+ private EPR epr5;
+ private MockCourier courier1;
+ private MockCourier courier2;
+ private MockCourier courier3;
+ private MockCourier courier4;
+ private MockCourier courier5;
+ private UncomposedMessageDeliveryAdapter mediator;
+ private String payload = "*XX*";
+
+ protected void setUp() throws Exception {
+ MockCourierFactory.install();
+ MockRegistry.install();
+
+ epr1 = new EPR();
+ epr2 = new EPR();
+ epr3 = new EPR();
+ epr4 = new EPR();
+ epr5 = new EPR();
+ courier1 = new MockCourier(false);
+ courier2 = new MockCourier(new CourierException(""));
+ courier3 = new MockCourier(new MalformedEPRException(""));
+ courier4 = new MockCourier(true);
+ courier5 = new MockCourier(true);
+
+ MockRegistry.register("cat", "service", epr1, courier1);
+ MockRegistry.register("cat", "service", epr2, courier2);
+ MockRegistry.register("cat", "service", epr3, courier3);
+ MockRegistry.register("cat", "service", epr4, courier4);
+ MockRegistry.register("cat", "service", epr5, courier5);
+
+ mediator = new UncomposedMessageDeliveryAdapter("cat", "service", new MockMessageComposer());
+ }
+
+ protected void tearDown() throws Exception {
+ MockRegistry.uninstall();
+ MockCourierFactory.uninstall();
+ }
+
+ public void test_getCourier_CourierException() throws MessageDeliverException {
+ // Get the courier factory to throw a CourierException
+ MockCourierFactory.courierException = new CourierException("");
+ mediator.deliver(payload);
+ assertNoDeliveryAttempted();
+ }
+
+ public void test_getCourier_MalformedEPRException() throws MessageDeliverException {
+ // Get the courier factory to throw a MalformedEPRException
+ MockCourierFactory.malformedEPRException = new MalformedEPRException("");
+ mediator.deliver(payload);
+ assertNoDeliveryAttempted();
+ }
+
+ public void test_No_EPRs() throws RegistryException, MessageDeliverException {
+ // Make sure there's no attempt to make a delivery when there are no
+ // EPRs for the service.
+ mediator = new UncomposedMessageDeliveryAdapter("x", "y", new MockMessageComposer());
+ mediator.deliver(payload);
+ assertNoDeliveryAttempted();
+ }
+
+ public void test_deliver() throws MessageDeliverException {
+ // Make sure the delivery happens as expected...
+
+ mediator.deliver(payload);
+ assertEquals(null, courier1.message);
+ assertEquals(null, courier2.message);
+ assertEquals(null, courier3.message);
+ assertEquals(payload, ActionUtils.getTaskObject(courier4.message));
+ assertEquals(null, courier5.message);
+
+ assertEquals(epr4, mediator.getLastSuccessfulEPR());
+ String payload2 = "*YYY*";
+ mediator.deliver(payload2);
+ assertEquals(payload2, ActionUtils.getTaskObject(courier4.message));
+ assertEquals(epr4, mediator.getLastSuccessfulEPR());
+ }
+
+ private void assertNoDeliveryAttempted() {
+ if(courier1.deliveryAttempted ||
+ courier2.deliveryAttempted ||
+ courier3.deliveryAttempted ||
+ courier4.deliveryAttempted ||
+ courier5.deliveryAttempted) {
+ fail("A deliver attempt was made on one of the couriers.");
+ }
+ }
+
+ public class MockMessageComposer extends AbstractMessageComposer {
+ public void setConfiguration(ConfigTree config) {
+ }
+ protected void populateMessage(Message message, Object messagePayload) throws MessageDeliverException {
+ ActionUtils.setTaskObject(message, messagePayload);
+ }
+ }
+}
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/couriers/CourierFactory.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -37,25 +37,63 @@
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.lifecycle.LifecycleIdentity;
-public class CourierFactory
+public class CourierFactory
{
- /**
- * The logger for this instance.
- */
- private static final Logger LOGGER = Logger.getLogger(CourierFactory.class) ;
+ /**
+ * The logger for this instance.
+ */
+ private static final Logger LOGGER = Logger.getLogger(CourierFactory.class) ;
- /**
- * Track lifecycle couriers.
- */
- private static final Map<String, Map<TwoWayCourier, Exception>> lifecycleCouriers = new HashMap<String, Map<TwoWayCourier, Exception>>() ;
-
- static
- {
- lifecycleCouriers.put(null, new HashMap<TwoWayCourier, Exception>()) ;
- }
+ /**
+ * Track lifecycle couriers.
+ */
+ private static final Map<String, Map<TwoWayCourier, Exception>> lifecycleCouriers = new HashMap<String, Map<TwoWayCourier, Exception>>() ;
+
+ /**
+ * Factory singleton instance. Created eagerly so that getInstance() never returns null; unit tests may swap it via setInstance().
+ */
+ private static CourierFactory instance = new CourierFactory();
+
+ static
+ {
+ lifecycleCouriers.put(null, new HashMap<TwoWayCourier, Exception>()) ;
+ }
- /**
+ // protected default constructor
+ protected CourierFactory() {}
+
+ /**
+ * Get the CourierFactory instance.
+ * @return The Courier factory instance.
+ */
+ public static CourierFactory getInstance() {
+ return instance;
+ }
+
+ /**
+ * Set the CourierFactory instance.
+ * <p/>
+ * Supports unit testing.
+ *
+ * @param instance New factory instance.
+ */
+ protected static void setInstance(CourierFactory instance) {
+ CourierFactory.instance = instance;
+ }
+
+ /**
* Obtain a courier which can perform deliveries only and prime it with the "to address" (toEPR).
+ *
+ * @param toEPR - the 'to address', the address on where to deliver a Message.
+ * @return The delivery courier for the supplied EPR.
+ * @throws CourierException if the specific courier implementation cannot be created.
+ */
+ public Courier getMessageCourier(EPR toEPR) throws CourierException, MalformedEPRException {
+ return getCourier(toEPR);
+ }
+
+ /**
+ * Obtain a courier which can perform deliveries only and prime it with the "to address" (toEPR).
*
* @param toEPR - the 'to address', the address on where to deliver a Message.
* @return
@@ -69,7 +107,6 @@
* Obtain a courier which can perform pickups only and prime it with the "to address" (toEPR)
* and the "replyTo address" (replyToEPR).
*
- * @param toEPR - the 'to address', the address on where to deliver a Message.
* @parem replyToEPR - the 'replyTo address', the address where to deliver the Message we pickup.
* @return
* @throws CourierException if the specific courier implementation cannot be created.
Modified: labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/rosetta/src/org/jboss/soa/esb/helpers/ConfigTree.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -483,7 +483,7 @@
} // __________________________________
- private static ConfigTree fromElement(Element elem)
+ public static ConfigTree fromElement(Element elem)
{
ConfigTree tree = new ConfigTree(elem.getNodeName());
NamedNodeMap NM = elem.getAttributes();
Copied: labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java)
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java (rev 0)
+++ labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.couriers;
+
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+
+/**
+ * Mock Courier impl.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockCourier implements Courier {
+
+ public boolean deliveryResult = true;
+ public Message message;
+ public boolean deliveryAttempted = false;
+ public CourierException courierException;
+ public MalformedEPRException malformedEPRException;
+
+ public MockCourier(boolean deliveryResult) {
+ this.deliveryResult = deliveryResult;
+ }
+
+ public MockCourier(CourierException courierException) {
+ this.courierException = courierException;
+ }
+
+ public MockCourier(MalformedEPRException malformedEPRException) {
+ this.malformedEPRException = malformedEPRException;
+ }
+
+ public boolean deliver(Message message) throws CourierException, MalformedEPRException {
+ deliveryAttempted = true;
+
+ if(courierException != null) {
+ throw courierException;
+ } else if(malformedEPRException != null) {
+ throw malformedEPRException;
+ }
+
+ if(deliveryResult) {
+ // Only set the message ref if "successful"...
+ this.message = message;
+ }
+
+ return deliveryResult;
+ }
+
+ public void cleanup() {
+ }
+
+ public void reset() {
+ message = null;
+ deliveryAttempted = false;
+ }
+}
Copied: labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourierFactory.java (from rev 10610, labs/jbossesb/workspace/jboss-remoting/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourierFactory.java)
===================================================================
--- labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourierFactory.java (rev 0)
+++ labs/jbossesb/trunk/product/core/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourierFactory.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -0,0 +1,74 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.internal.soa.esb.couriers;
+
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Mock CourierFactory.
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MockCourierFactory extends CourierFactory {
+
+ public static CourierException courierException;
+ public static MalformedEPRException malformedEPRException;
+ public static Hashtable<EPR, Courier> couriers = new Hashtable<EPR, Courier>();
+ private static CourierFactory originalInstance;
+
+ public Courier getMessageCourier(EPR toEPR) throws CourierException, MalformedEPRException {
+ if(courierException != null) {
+ throw courierException;
+ } else if(malformedEPRException != null) {
+ throw malformedEPRException;
+ }
+
+ return couriers.get(toEPR);
+ }
+
+ public static void install() {
+ originalInstance = CourierFactory.getInstance();
+ CourierFactory.setInstance(new MockCourierFactory());
+ }
+
+ public static void uninstall() {
+ CourierFactory.setInstance(originalInstance);
+ reset();
+ }
+
+ public static void reset() {
+ couriers.clear();
+ courierException = null;
+ malformedEPRException = null;
+ }
+
+ public static void resetCouriers() {
+ for (Map.Entry<EPR, Courier> entry : couriers.entrySet()) {
+ ((MockCourier)entry.getValue()).reset();
+ }
+ }
+}
Modified: labs/jbossesb/trunk/product/core/services/tests/src/org/jboss/internal/soa/esb/services/registry/MockRegistry.java
===================================================================
--- labs/jbossesb/trunk/product/core/services/tests/src/org/jboss/internal/soa/esb/services/registry/MockRegistry.java 2007-03-29 12:45:49 UTC (rev 10612)
+++ labs/jbossesb/trunk/product/core/services/tests/src/org/jboss/internal/soa/esb/services/registry/MockRegistry.java 2007-03-29 14:48:47 UTC (rev 10613)
@@ -32,6 +32,8 @@
import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.services.registry.Registry;
import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
+import org.jboss.internal.soa.esb.couriers.MockCourier;
import com.arjuna.common.util.propertyservice.PropertyManager;
@@ -42,169 +44,195 @@
* <p/>
* Just call {@link #install()} and {@link #uninstall()} from inside your test setUp and tearDown
* methods respectively.
- *
+ *
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
*/
public class MockRegistry implements Registry {
-
- private static final String REGISTRY_IMPEMENTATION_CLASS_ORIGINAL = Environment.REGISTRY_IMPEMENTATION_CLASS + "#Original";
- public static PropertyManager regPropManager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.REGISTRY_MODULE);
- public List<RepositoryEntry> repository = new ArrayList<RepositoryEntry>();
-
- /**
- * Install this Mock Registry impl as the registry implementation to be used.
- * <p/>
- * Call this method in the test setUp.
- */
- public static void install() {
- if(regPropManager == null) {
- TestCase.fail("Failed to locate PropertyManager for [" + ModulePropertyManager.REGISTRY_MODULE + "].");
- }
- String currentRegImpl = regPropManager.getProperty(Environment.REGISTRY_IMPEMENTATION_CLASS);
-
- if(currentRegImpl != null) {
- // Save the current/original.
- regPropManager.setProperty(REGISTRY_IMPEMENTATION_CLASS_ORIGINAL, currentRegImpl);
- }
- regPropManager.setProperty(Environment.REGISTRY_IMPEMENTATION_CLASS, MockRegistry.class.getName());
- }
- /**
- * Uninstall this Mock Registry impl as the registry implementation to be used. Reinstate the reg impl that was
- * specified prior to the install.
- * <p/>
- * Call this method in the test tearDown.
- */
- public static void uninstall() {
- if(regPropManager == null) {
- TestCase.fail("Failed to locate PropertyManager for [" + ModulePropertyManager.REGISTRY_MODULE + "].");
- }
- String originalRegImpl = regPropManager.getProperty(REGISTRY_IMPEMENTATION_CLASS_ORIGINAL);
+ private static final String REGISTRY_IMPEMENTATION_CLASS_ORIGINAL = Environment.REGISTRY_IMPEMENTATION_CLASS + "#Original";
+ public static PropertyManager regPropManager = ModulePropertyManager.getPropertyManager(ModulePropertyManager.REGISTRY_MODULE);
+ public static List<RepositoryEntry> repository = new ArrayList<RepositoryEntry>();
- if(originalRegImpl != null) {
- // Reset the original.
- regPropManager.setProperty(Environment.REGISTRY_IMPEMENTATION_CLASS, originalRegImpl);
- regPropManager.removeProperty(REGISTRY_IMPEMENTATION_CLASS_ORIGINAL);
- } else {
- // It wasn't set in the first place, so just unset the impl
- regPropManager.removeProperty(Environment.REGISTRY_IMPEMENTATION_CLASS);
- }
- }
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.services.registry.Registry#registerEPR(java.lang.String, java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR, java.lang.String)
- */
- public void registerEPR(String serviceCategoryName, String serviceName,
- String serviceDescription, EPR epr, String eprDescription)
- throws RegistryException {
-
- repository.add(new RepositoryEntry(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription));
- }
-
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.services.registry.Registry#unRegisterService(java.lang.String, java.lang.String)
- */
- public void unRegisterService(String category, String serviceName) throws RegistryException {
- unRegisterEPR(category, serviceName, null);
- }
+ /**
+ * Install this Mock Registry impl as the registry implementation to be used.
+ * <p/>
+ * Call this method in the test setUp.
+ */
+ public static void install() {
+ if (regPropManager == null) {
+ TestCase.fail("Failed to locate PropertyManager for [" + ModulePropertyManager.REGISTRY_MODULE + "].");
+ }
+ String currentRegImpl = regPropManager.getProperty(Environment.REGISTRY_IMPEMENTATION_CLASS);
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.services.registry.Registry#unRegisterEPR(java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR)
- */
- public void unRegisterEPR(String serviceCategoryName, String serviceName, EPR epr) throws RegistryException {
- int indexOf = repository.indexOf(new RepositoryEntry(serviceCategoryName, serviceName, null, epr, null));
-
- if(indexOf == -1) {
- throw new RegistryException("Registry entry [" + serviceCategoryName + "][" + serviceName + "] not found.");
- }
- repository.remove(indexOf);
- }
+ if (currentRegImpl != null) {
+ // Save the current/original.
+ regPropManager.setProperty(REGISTRY_IMPEMENTATION_CLASS_ORIGINAL, currentRegImpl);
+ }
+ regPropManager.setProperty(Environment.REGISTRY_IMPEMENTATION_CLASS, MockRegistry.class.getName());
+ }
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.services.registry.Registry#findAllServices()
- */
- public Collection<String> findAllServices() throws RegistryException {
- Collection<String> services = new ArrayList<String>();
- for(RepositoryEntry entry : repository) {
- services.add(entry.serviceName);
- }
- return services;
- }
+ /**
+ * Uninstall this Mock Registry impl as the registry implementation to be used. Reinstate the reg impl that was
+ * specified prior to the install.
+ * <p/>
+ * Call this method in the test tearDown.
+ */
+ public static void uninstall() {
+ if (regPropManager == null) {
+ TestCase.fail("Failed to locate PropertyManager for [" + ModulePropertyManager.REGISTRY_MODULE + "].");
+ }
+ String originalRegImpl = regPropManager.getProperty(REGISTRY_IMPEMENTATION_CLASS_ORIGINAL);
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.services.registry.Registry#findServices(java.lang.String)
- */
- public Collection<String> findServices(String serviceCategoryName)
- throws RegistryException {
- Collection<String> services = new ArrayList<String>();
- for(RepositoryEntry entry : repository) {
- if(serviceCategoryName.equals(entry.serviceCategoryName)) {
- services.add(entry.serviceName);
- }
- }
- return services;
- }
+ if (originalRegImpl != null) {
+ // Reset the original.
+ regPropManager.setProperty(Environment.REGISTRY_IMPEMENTATION_CLASS, originalRegImpl);
+ regPropManager.removeProperty(REGISTRY_IMPEMENTATION_CLASS_ORIGINAL);
+ } else {
+ // It wasn't set in the first place, so just unset the impl
+ regPropManager.removeProperty(Environment.REGISTRY_IMPEMENTATION_CLASS);
+ }
+ }
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.services.registry.Registry#findEPRs(java.lang.String, java.lang.String)
- */
- public Collection<EPR> findEPRs(String serviceCategoryName,
- String serviceName) throws RegistryException {
- Collection<EPR> services = new ArrayList<EPR>();
- for(RepositoryEntry entry : repository) {
- if(serviceCategoryName.equals(entry.serviceCategoryName) && serviceName.equals(entry.serviceName)) {
- services.add(entry.epr);
- }
- }
- return services;
- }
+    /**
+     * Utility method for registering a courier for a service when the caller has no EPR.
+     * <p/>
+     * A new, empty {@link EPR} is created and the call is delegated to
+     * {@link #register(String, String, EPR, MockCourier)}, so the service and the courier
+     * are both registered against that EPR.
+     *
+     * @param category Service category.
+     * @param service Service name.
+     * @param courier Courier to associate with the newly created EPR.
+     */
+    public static void register(String category, String service, MockCourier courier) {
+        register(category, service, new EPR(), courier);
+    }
- /* (non-Javadoc)
- * @see org.jboss.soa.esb.services.registry.Registry#findEPR(java.lang.String, java.lang.String)
- */
- public EPR findEPR(String serviceCategoryName, String serviceName)
- throws RegistryException {
- for(RepositoryEntry entry : repository) {
- if(serviceCategoryName.equals(entry.serviceCategoryName) && serviceName.equals(entry.serviceName)) {
- return entry.epr;
- }
- }
- return null;
- }
-
- public class RepositoryEntry {
- public String serviceCategoryName;
- public String serviceName;
- public String serviceDescription;
- public EPR epr;
- public String eprDescription;
+    /**
+     * Utility method for registering a service EPR together with its courier.
+     * <p/>
+     * A repository entry with empty service and EPR descriptions is appended to
+     * {@link #repository}, and the courier is stored in {@link MockCourierFactory#couriers}
+     * keyed by the EPR.
+     *
+     * @param category Service category.
+     * @param service Service name.
+     * @param epr Service EPR.
+     * @param courier EPR Courier.
+     */
+    public static void register(String category, String service, EPR epr, MockCourier courier) {
+        RepositoryEntry serviceEntry = new RepositoryEntry(category, service, "", epr, "");
+
+        repository.add(serviceEntry);
+        MockCourierFactory.couriers.put(epr, courier);
+    }
- public RepositoryEntry(String serviceCategoryName, String serviceName, String serviceDescription, EPR epr, String eprDescription) {
- this.serviceCategoryName = serviceCategoryName;
- this.serviceName = serviceName;
- this.serviceDescription = serviceDescription;
- this.epr = epr;
- this.eprDescription = eprDescription;
- }
-
- public boolean equals(Object obj) {
- if(obj instanceof RepositoryEntry) {
- RepositoryEntry entry = (RepositoryEntry)obj;
-
- if(serviceCategoryName != null && !serviceCategoryName.equalsIgnoreCase(entry.serviceCategoryName)) {
- return false;
- }
- if(serviceName != null && !serviceName.equalsIgnoreCase(entry.serviceName)) {
- return false;
- }
- if(epr != null && epr != entry.epr) {
- return false;
- }
-
- return true;
- }
-
- return false;
- }
- }
+    /**
+     * Record a service EPR by appending a new entry to the mock repository.
+     * No duplicate check is made; every call adds another entry.
+     *
+     * @see org.jboss.soa.esb.services.registry.Registry#registerEPR(java.lang.String, java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR, java.lang.String)
+     */
+    public void registerEPR(String serviceCategoryName, String serviceName,
+            String serviceDescription, EPR epr, String eprDescription)
+            throws RegistryException {
+        RepositoryEntry newEntry = new RepositoryEntry(
+                serviceCategoryName, serviceName, serviceDescription, epr, eprDescription);
+        repository.add(newEntry);
+    }
+
+    /**
+     * Remove a service entry regardless of its EPR, by delegating to
+     * {@link #unRegisterEPR(String, String, EPR)} with a null EPR.
+     *
+     * @see org.jboss.soa.esb.services.registry.Registry#unRegisterService(java.lang.String, java.lang.String)
+     */
+    public void unRegisterService(String category, String serviceName) throws RegistryException {
+        final EPR anyEpr = null;
+        unRegisterEPR(category, serviceName, anyEpr);
+    }
+
+    /**
+     * Remove the first repository entry that matches the supplied category, service name and EPR.
+     * <p/>
+     * Matching is done by looking up a probe entry with {@link List#indexOf(Object)}; fields
+     * of the probe that are null are ignored by {@link RepositoryEntry#equals(Object)}.
+     *
+     * @throws RegistryException If no matching entry is found.
+     * @see org.jboss.soa.esb.services.registry.Registry#unRegisterEPR(java.lang.String, java.lang.String, org.jboss.soa.esb.addressing.EPR)
+     */
+    public void unRegisterEPR(String serviceCategoryName, String serviceName, EPR epr) throws RegistryException {
+        RepositoryEntry probe = new RepositoryEntry(serviceCategoryName, serviceName, null, epr, null);
+        int position = repository.indexOf(probe);
+
+        if (position < 0) {
+            throw new RegistryException("Registry entry [" + serviceCategoryName + "][" + serviceName + "] not found.");
+        }
+        repository.remove(position);
+    }
+
+    /**
+     * List the service name of every repository entry, in repository order.
+     * Names are not de-duplicated.
+     *
+     * @see org.jboss.soa.esb.services.registry.Registry#findAllServices()
+     */
+    public Collection<String> findAllServices() throws RegistryException {
+        Collection<String> serviceNames = new ArrayList<String>();
+
+        for (RepositoryEntry registered : repository) {
+            serviceNames.add(registered.serviceName);
+        }
+
+        return serviceNames;
+    }
+
+    /**
+     * List the service names of all repository entries in the given category.
+     * The category comparison is case-sensitive.
+     *
+     * @see org.jboss.soa.esb.services.registry.Registry#findServices(java.lang.String)
+     */
+    public Collection<String> findServices(String serviceCategoryName)
+            throws RegistryException {
+        Collection<String> serviceNames = new ArrayList<String>();
+
+        for (RepositoryEntry registered : repository) {
+            if (!serviceCategoryName.equals(registered.serviceCategoryName)) {
+                continue;
+            }
+            serviceNames.add(registered.serviceName);
+        }
+
+        return serviceNames;
+    }
+
+    /**
+     * Collect the EPRs of all repository entries whose category and service name both equal
+     * the supplied values (case-sensitive), in repository order.
+     *
+     * @see org.jboss.soa.esb.services.registry.Registry#findEPRs(java.lang.String, java.lang.String)
+     */
+    public Collection<EPR> findEPRs(String serviceCategoryName,
+            String serviceName) throws RegistryException {
+        Collection<EPR> matchingEprs = new ArrayList<EPR>();
+
+        for (RepositoryEntry registered : repository) {
+            if (!serviceCategoryName.equals(registered.serviceCategoryName)) {
+                continue;
+            }
+            if (!serviceName.equals(registered.serviceName)) {
+                continue;
+            }
+            matchingEprs.add(registered.epr);
+        }
+
+        return matchingEprs;
+    }
+
+    /**
+     * Return the EPR of the first repository entry whose category and service name both equal
+     * the supplied values (case-sensitive), or null when there is no such entry.
+     *
+     * @see org.jboss.soa.esb.services.registry.Registry#findEPR(java.lang.String, java.lang.String)
+     */
+    public EPR findEPR(String serviceCategoryName, String serviceName)
+            throws RegistryException {
+        for (RepositoryEntry registered : repository) {
+            boolean sameService = serviceCategoryName.equals(registered.serviceCategoryName)
+                    && serviceName.equals(registered.serviceName);
+            if (sameService) {
+                return registered.epr;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * A single service registration held by the mock registry.
+     */
+    public static class RepositoryEntry {
+        public String serviceCategoryName;
+        public String serviceName;
+        public String serviceDescription;
+        public EPR epr;
+        public String eprDescription;
+
+        public RepositoryEntry(String serviceCategoryName, String serviceName, String serviceDescription, EPR epr, String eprDescription) {
+            this.serviceCategoryName = serviceCategoryName;
+            this.serviceName = serviceName;
+            this.serviceDescription = serviceDescription;
+            this.epr = epr;
+            this.eprDescription = eprDescription;
+        }
+
+        /**
+         * Probe-style match used for repository lookups.
+         * <p/>
+         * Only the category, service name and EPR of <em>this</em> entry are considered, and
+         * each of them is skipped when it is null on this entry. Category and service name are
+         * compared ignoring case; the EPR is compared by reference. The descriptions never
+         * take part. The relation is therefore not symmetric, and no matching hashCode is
+         * defined.
+         */
+        public boolean equals(Object obj) {
+            if (!(obj instanceof RepositoryEntry)) {
+                return false;
+            }
+            RepositoryEntry other = (RepositoryEntry) obj;
+
+            boolean categoryMatches = serviceCategoryName == null
+                    || serviceCategoryName.equalsIgnoreCase(other.serviceCategoryName);
+            boolean nameMatches = serviceName == null
+                    || serviceName.equalsIgnoreCase(other.serviceName);
+            boolean eprMatches = epr == null || epr == other.epr;
+
+            return categoryMatches && nameMatches && eprMatches;
+        }
+    }
}
Modified: labs/jbossesb/trunk/product/tools/configeditor/editor/dist/jbossesb-config-editor.war
===================================================================
(Binary files differ)
More information about the jboss-svn-commits
mailing list