[jboss-svn-commits] JBL Code SVN: r13981 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/soa/esb and 5 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Aug 3 06:18:29 EDT 2007
Author: tfennelly
Date: 2007-08-03 06:18:29 -0400 (Fri, 03 Aug 2007)
New Revision: 13981
Added:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java
labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java
Modified:
labs/jbossesb/trunk/product/install/build.xml
labs/jbossesb/trunk/product/install/readme.txt
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/ContentBasedWiretap.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/jms/DefaultJMSPropertiesSetter.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/services/routing/MessageRouter.java
Log:
http://jira.jboss.com/jira/browse/JBESB-685
Deprecation of MessageRouter. Replaced uses of this class with the MessageMulticaster class which basicaly allows you to multicast a message to multiple endpoints, caching the ServiceInvokers in the process.
Modified: labs/jbossesb/trunk/product/install/build.xml
===================================================================
--- labs/jbossesb/trunk/product/install/build.xml 2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/install/build.xml 2007-08-03 10:18:29 UTC (rev 13981)
@@ -212,7 +212,11 @@
</copy>
</target>
- <target name="deployIntros" depends="check.deploy.props, dependencies" description="Deploys JAXB Intros to the application server">
+ <target name="deployIntros">
+ <echo message="***** DEPRECATED: Sorry, this target has been deprecated. Please run the 'patch-jbossws' target." />
+ </target>
+
+ <target name="patch-jbossws" depends="check.deploy.props, dependencies" description="Deploys JAXB Intros to the application server">
<property name="jbossws.location" location="${deploy.dir}/jbossws.sar"/>
<property name="jbossws.beans.location" location="${jbossws.location}/jbossws.beans/META-INF/jboss-beans.xml"/>
<property name="jbossws.beans.tmp.location" location="${jbossws.location}/jbossws.beans/META-INF/jboss-beans.new.xml"/>
@@ -222,15 +226,17 @@
<available property="jbossws.exists" file="${jbossws.location}" type="dir"/>
<fail unless="jbossws.exists" message="Please install JBossWS into the Application Server"/>
- <available property="jaxb.intros.exist" file="${jbossws.location}/${jaxb.intros.jar}" type="file"/>
- <fail if="jaxb.intros.exist" message="JAXB Intros appear to be install already"/>
-
- <xslt style="jaxb.xslt" in="${jbossws.beans.location}" out="${jbossws.beans.tmp.location}"/>
- <move file="${jbossws.beans.tmp.location}" tofile="${jbossws.beans.location}"/>
-
<copy todir="${jbossws.location}">
<fileset dir="${org.jboss.esb.dist}/extras/jaxbintros" includes="${jaxb.intros.jar}"/>
<fileset dir="${org.jboss.esb.dist.lib}/soap.esb" includes="${jbossesb.soap.jar}"/>
</copy>
+
+ <available property="jaxb.intros.exist" file="${jbossws.location}/${jaxb.intros.jar}" type="file"/>
+ <antcall target="apply-jbossws-xslt" />
</target>
+
+ <target name="apply-jbossws-xslt" unless="jaxb.intros.exist">
+ <xslt style="jaxb.xslt" in="${jbossws.beans.location}" out="${jbossws.beans.tmp.location}"/>
+ <move file="${jbossws.beans.tmp.location}" tofile="${jbossws.beans.location}"/>
+ </target>
</project>
Modified: labs/jbossesb/trunk/product/install/readme.txt
===================================================================
--- labs/jbossesb/trunk/product/install/readme.txt 2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/install/readme.txt 2007-08-03 10:18:29 UTC (rev 13981)
@@ -23,4 +23,4 @@
directory of the jbossesb installation.
- Follow the 'Deployment into JBoss AS 4.2.0.GA' section above (if not
already installed)
- - execute 'ant deployIntros'
+ - execute 'ant patch-jbossws'
Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb;
+
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+
+/**
+ * Service.
+ * <p/>
+ * Simple immutable compound value object associating a Service "Category" with a
+ * Service "Name".
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class Service {
+
+ private String category;
+ private String name;
+
+ public Service(String category, String name) {
+ AssertArgument.isNotNullAndNotEmpty(category, "category");
+ AssertArgument.isNotNullAndNotEmpty(name, "name");
+ this.category = category.trim();
+ this.name = name.trim();
+ }
+
+ public String getCategory() {
+ return category;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean equals(Object obj) {
+ if(obj instanceof Service) {
+ Service service = (Service) obj;
+ if(service.category.equals(category) && service.name.equals(name)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ public int hashCode() {
+ return (category + name).hashCode();
+ }
+
+ public String toString() {
+ return category + ":" + name;
+ }
+}
Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java 2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -68,7 +68,9 @@
*/
public class Aggregator extends AbstractActionPipelineProcessor
{
+ public final static String AGGEGRATOR_TAG = "aggregatorTag";
public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
+
private ConcurrentHashMap<String,ConcurrentHashMap< String, Message > > _aggregatedMessageMap
= new ConcurrentHashMap< String, ConcurrentHashMap< String, Message > >();
private TimeoutChecker _timeoutChecker=null;
@@ -203,6 +205,11 @@
serviceName = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
serviceCategoryName = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
}
+
+ public static void decorate(Message message) {
+
+ }
+
/**
* Aggregates the messages into 1 new message with an attachment for each message.
*
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/ContentBasedWiretap.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/ContentBasedWiretap.java 2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/ContentBasedWiretap.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -22,21 +22,23 @@
package org.jboss.soa.esb.actions;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ServiceInvoker;
+import org.jboss.soa.esb.listeners.MessageMulticaster;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.mapping.ObjectMapper;
import org.jboss.soa.esb.services.registry.Registry;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.services.registry.RegistryFactory;
-import org.jboss.soa.esb.services.routing.MessageRouter;
import org.jboss.soa.esb.services.routing.MessageRouterException;
import org.jboss.soa.esb.services.routing.cbr.ContentBasedRouterFactory;
@@ -47,6 +49,7 @@
* @author kevin.conner at jboss.com
*/
public class ContentBasedWiretap extends AbstractActionPipelineProcessor {
+
public static final String ROUTE_TO_TAG = "route-to";
public static final String OBJECT_PATH_TAG = "object-path";
@@ -66,9 +69,8 @@
}
public void initialise() {
- if (_destinations.size() < 1) {
- _logger
- .warn("Missing or empty destination list - This action class won't have any effect");
+ if (messageMulticaster.getRecipientCount() == 0) {
+ _logger.warn("Missing or empty destination list - This action class won't have any effect");
}
}
@@ -105,8 +107,13 @@
+ _destinations.keySet()
+ ". Please fix your configuration.");
- MessageRouter.deliverMessage(MessageRouter.INTERNAL_SERVICE_CATEGORY,
- MessageRouter.DEAD_LETTER_SERVICE_NAME, message);
+ try {
+ ServiceInvoker.deliverToDeadLetterChannel(message);
+ } catch (MessageDeliverException e) {
+ throw new MessageRouterException("Failed to deliver message to Dead Letter Channel.", e);
+ } catch (RegistryException e) {
+ throw new MessageRouterException("Failed to lookup Dead Letter Channel.", e);
+ }
}
protected final void routeMessage(Message message)
@@ -115,19 +122,26 @@
_messagePathList);
List<String> destinations = _cbr.route(_ruleSet, _ruleLanguage,
_ruleReload, message, objectList);
- Collection<String[]> outgoingDestinations = new ArrayList<String[]>();
+ List<Service> outgoingDestinations = new ArrayList<Service>();
for (String destination : destinations) {
if (_destinations.containsKey(destination)) {
outgoingDestinations.add(_destinations.get(destination));
}
}
if (outgoingDestinations.size() > 0) {
- MessageRouter.deliverMessages(outgoingDestinations, message);
+ try {
+ messageMulticaster.sendToSubset(message, outgoingDestinations);
+ } catch (RegistryException e) {
+ throw new MessageRouterException(e);
+ } catch (MessageDeliverException e) {
+ throw new MessageRouterException(e);
+ }
} else {
noDestinations();
- if (destinations.size() > 0)
+ if (destinations.size() > 0) {
invalidRuleConfiguration(message, destinations);
+ }
}
}
@@ -163,7 +177,7 @@
_cbrClass = DEFAULT_CBR_CLASS;
}
- _destinations = new HashMap<String, String[]>();
+ _destinations = new HashMap<String, Service>();
ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
if (destList != null) {
for (ConfigTree curr : destList) {
@@ -174,8 +188,9 @@
ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, "");
String name = curr
.getRequiredAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- _destinations.put(key, new String[]
- {category, name});
+ Service service = new Service(category, name);
+ _destinations.put(key, service);
+ messageMulticaster.addRecipient(service);
}
catch (Exception e) {
throw new ConfigurationException(
@@ -202,8 +217,10 @@
protected ConfigTree _config;
- protected Map<String, String[]> _destinations;
+ protected Map<String, Service> _destinations;
+ protected MessageMulticaster messageMulticaster = new MessageMulticaster();
+
protected String _cbrClass;
protected String _ruleSet;
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java 2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -28,17 +28,15 @@
*/
package org.jboss.soa.esb.actions;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.MessageMulticaster;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.services.routing.MessageRouter;
-import org.jboss.soa.esb.services.routing.MessageRouterException;
public class StaticWiretap extends AbstractActionPipelineProcessor
{
@@ -57,13 +55,13 @@
{
try
{
- MessageRouter.deliverMessages(_destinations, message);
+ messageMulticaster.sendToAll(message);
return message;
+ } catch (RegistryException e) {
+ throw new ActionProcessingException(e);
+ } catch (MessageDeliverException e) {
+ throw new ActionProcessingException(e);
}
- catch (MessageRouterException ex)
- {
- throw new ActionProcessingException(ex);
- }
}
/**
@@ -71,7 +69,6 @@
*/
public void initialise() throws ActionLifecycleException
{
- _destinations = new ArrayList<String[]>();
ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
if (null == destList || destList.length < 1)
{
@@ -84,7 +81,8 @@
{
String category = curr.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, "");
String name = curr.getRequiredAttribute(ListenerTagNames.SERVICE_NAME_TAG);
- _destinations.add(new String[] { category, name });
+ Service service = new Service(category, name);
+ messageMulticaster.addRecipient(service);
}
catch (Exception e)
{
@@ -95,7 +93,7 @@
protected ConfigTree _config;
- protected List<String[]> _destinations;
+ protected MessageMulticaster messageMulticaster = new MessageMulticaster();
protected static Logger _logger = Logger.getLogger(StaticRouter.class);
}
Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -0,0 +1,144 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.Service;
+import org.jboss.soa.esb.actions.Aggregator;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+/**
+ * Message Multicaster.
+ * <p/>
+ * Used to send a message to a recipient list, or a subset of that recipient list.
+ * <p/>
+ * Caches a {@link ServiceInvoker} instance for each recipient.
+ *
+ * @author <a href="mailto:tom.fennely at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MessageMulticaster {
+
+ private static Logger logger = Logger.getLogger(MessageMulticaster.class);
+
+ private Map<Service, ServiceInvoker> invokers = new HashMap<Service, ServiceInvoker>();
+
+ /**
+ * Add a message recipient Service.
+ * @param service Recipient service for receipt of messages from this miltcaster instance.
+ * @throws RegistryException Failed to lookup Service endpoint.
+ * @throws MessageDeliverException Failed to deliver message to endpoint.
+ */
+ public void addRecipient(Service service) throws RegistryException, MessageDeliverException {
+ AssertArgument.isNotNull(service, "service");
+ // The ServiceInvoker will be lazilly created in the getInvoker method.
+ // This is effectively a registration of the service as being a potential recipient.
+ invokers.put(service, null);
+ }
+
+ /**
+ * Is the specified service a recipient of this multicaster instance.
+ * @param service The service to check for.
+ * @return True if the supplied service is one of the recipients, otherwise false.
+ */
+ public boolean isRecipient(Service service) {
+ return invokers.containsKey(service);
+ }
+
+ /**
+ * Get the number ot recipients associated with this multicaster instance.
+ * @return The number of recipients.
+ */
+ public int getRecipientCount() {
+ return invokers.size();
+ }
+
+ /**
+ * Send the message to all the recipients associated with this multicaster.
+ * @param message The message.
+ * @throws RegistryException Failed to lookup Service endpoint.
+ * @throws MessageDeliverException Failed to deliver message to endpoint.
+ */
+ public void sendToAll(Message message) throws RegistryException, MessageDeliverException {
+ sendToSubset(message, new ArrayList<Service>(invokers.keySet()));
+ }
+
+ /**
+ * Send the message to all the recipients associated with this multicaster and
+ * also listed in the supplied recipient list.
+ * <p/>
+ * The recipients supplied in the list must have been added through the {@link #addRecipient(org.jboss.soa.esb.Service)}
+ * method, otherwise the message will be delivered to the Dead Letter Channel.
+ *
+ * @param message The message.
+ * @param recipients The recipient subset to which the supplied message is to be sent.
+ * @throws RegistryException Failed to lookup Service endpoint.
+ * @throws MessageDeliverException Failed to deliver message to endpoint.
+ */
+ public void sendToSubset(Message message, List<Service> recipients) throws RegistryException, MessageDeliverException {
+ String uuId = UUID.randomUUID().toString();
+ ArrayList<String> aggregatorTags = new ArrayList<String>();
+ int recipientCount = recipients.size();
+ long timestamp = System.currentTimeMillis();
+
+ for(int i = 0; i < recipientCount; i++) {
+ Service recipient = recipients.get(i);
+ ServiceInvoker invoker = getInvoker(recipient);
+ String tag = uuId + ":" + (i + 1) + ":" + recipientCount + ":" + timestamp;
+
+ aggregatorTags.add(tag);
+ message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
+ if (logger.isDebugEnabled()) {
+ logger.debug(Aggregator. AGGEGRATOR_TAG + "=" + tag);
+ }
+
+ if(invoker == null) {
+ logger.error("Service '" + recipient + "' is not in recipient list. Delivering message to Dead Letter Channel.");
+ ServiceInvoker.deliverToDeadLetterChannel(message);
+ } else {
+ try {
+ invoker.deliverAsync(message);
+ } catch(MessageDeliverException e) {
+ logger.error("Failed to deliver message to Service '" + recipient + "'. Delivering message to Dead Letter Channel.");
+ ServiceInvoker.deliverToDeadLetterChannel(message);
+ }
+ }
+ }
+ }
+
+ private ServiceInvoker getInvoker(Service recipient) throws RegistryException, MessageDeliverException {
+ ServiceInvoker invoker = invokers.get(recipient);
+
+ // We lazilly create the invokers...
+ if(invoker == null) {
+ if(!invokers.containsKey(recipient)) {
+ return null;
+ }
+ invoker = new ServiceInvoker(recipient);
+ invokers.put(recipient, invoker);
+ }
+
+ return invoker;
+ }
+}
Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java 2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -28,7 +28,7 @@
import org.jboss.internal.soa.esb.assertion.AssertArgument;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.Call;
+import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.util.DefaultReplyTo;
@@ -39,7 +39,6 @@
import org.jboss.soa.esb.couriers.CourierUtil;
import org.jboss.soa.esb.couriers.FaultMessageException;
import org.jboss.soa.esb.couriers.TwoWayCourier;
-import org.jboss.soa.esb.listeners.RegistryUtil;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.listeners.ha.LoadBalancePolicy;
import org.jboss.soa.esb.listeners.ha.ServiceClusterInfo;
@@ -59,27 +58,24 @@
*/
public class ServiceInvoker {
+ public static final String INTERNAL_SERVICE_CATEGORY = "JBossESB-Internal";
+
+ public static final String DEAD_LETTER_SERVICE_NAME = "DeadLetterService";
+
/**
* Class logger.
*/
private static Logger logger = Logger.getLogger(ServiceInvoker.class);
-
/**
- * The <b>category name</b> of the Service to which this instance will
- * deliver messages.
+ * The target service.
*/
- private String serviceCategory;
+ private Service service;
/**
- * The <b>name</b> of the Service to which this instance will
- * deliver messages.
+ * Load balancer.
*/
- private String serviceName;
- /**
- *
- */
private LoadBalancePolicy loadBalancer;
/**
- *
+ * Cluster info.
*/
private ServiceClusterInfo serviceClusterInfo;
/**
@@ -87,23 +83,24 @@
*/
private ThreadLocal<Long> syncPickupDeliveryTimeout = new ThreadLocal<Long>();
/**
- *
+ *
*/
private Date expirationDate;
/**
+ * Dead letter channel Service invoker.
+ */
+ private static ServiceInvoker dlQueueInvoker;
+
+ /**
* Public constructor.
*
- * @param serviceCategory The <b>category name</b> of the Service to which this instance will
- * deliver messages.
- * @param serviceName The <b>name</b> of the Service to which this instance will
- * deliver messages.
- * @throws RegistryException Failed to lookup EPRs for the specified Service.
+ * @param service The Service to which this instance will deliver messages.
+ * @throws RegistryException Failed to lookup Service endpoint.
+ * @throws MessageDeliverException Failed to deliver message to endpoint.
*/
- public ServiceInvoker(String serviceCategory, String serviceName) throws RegistryException, MessageDeliverException {
- AssertArgument.isNotNullAndNotEmpty(serviceCategory, "serviceCategory");
- AssertArgument.isNotNullAndNotEmpty(serviceName, "serviceName");
- this.serviceCategory = serviceCategory;
- this.serviceName = serviceName;
+ public ServiceInvoker(Service service) throws RegistryException, MessageDeliverException {
+ AssertArgument.isNotNull(service, "service");
+ this.service = service;
String lbClass = Configuration.getLoadBalancerPolicy();
try {
Class c = ClassUtil.forName(lbClass, this.getClass());
@@ -120,6 +117,20 @@
throw new MessageDeliverException( iae.getMessage(), iae);
}
}
+
+ /**
+ * Public constructor.
+ *
+ * @param serviceCategory The <b>category name</b> of the Service to which this instance will
+ * deliver messages.
+ * @param serviceName The <b>name</b> of the Service to which this instance will
+ * deliver messages.
+ * @throws RegistryException Failed to lookup Service endpoint.
+ * @throws MessageDeliverException Failed to deliver message to endpoint.
+ */
+ public ServiceInvoker(String serviceCategory, String serviceName) throws RegistryException, MessageDeliverException {
+ this(new Service(serviceCategory, serviceName));
+ }
/**
* Method to deliver *one* message, which allocates resources before sending the message and
@@ -127,8 +138,8 @@
*
* @param message Message to be send
* @return message (if the invo
- * @throws MessageDeliverException
- * @throws RegistryException
+ * @throws RegistryException Failed to lookup Service endpoint.
+ * @throws MessageDeliverException Failed to deliver message to endpoint.
*/
public Message deliverOne (Message message) throws MessageDeliverException, RegistryException, FaultMessageException
{
@@ -160,12 +171,11 @@
* @param timeoutMillis Number of milliseconds before synchronous reply pickup should timeout.
* @return Returns the reply message if the message was delivered
* without error, otherwise an exception is thrown.
- * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+ * @throws RegistryException Failed to lookup Service endpoint.
+ * @throws MessageDeliverException Failed to deliver message to endpoint.
*/
public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException, FaultMessageException {
- if (message == null)
- throw new IllegalArgumentException();
-
+ AssertArgument.isNotNull(message, "message");
syncPickupDeliveryTimeout.set(timeoutMillis);
return post(message, true);
}
@@ -177,9 +187,7 @@
* @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
*/
public void deliverAsync(Message message) throws MessageDeliverException {
- if (message == null)
- throw new IllegalArgumentException();
-
+ AssertArgument.isNotNull(message, "message");
// Not interested in a reply
try
@@ -193,6 +201,19 @@
}
/**
+ * Deliver a message to the Dead Letter Channel Service.
+ * @param message The message to be delivered to the dead letter chennel.
+ * @throws RegistryException Service endpoint lookup failure.
+ * @throws MessageDeliverException Message delivery failure.
+ */
+ public static synchronized void deliverToDeadLetterChannel(Message message) throws RegistryException, MessageDeliverException {
+ if(dlQueueInvoker == null) {
+ dlQueueInvoker = new ServiceInvoker(INTERNAL_SERVICE_CATEGORY, DEAD_LETTER_SERVICE_NAME);
+ }
+ dlQueueInvoker.deliverAsync(message);
+ }
+
+ /**
* Deliver the supplied message to the target service associated with this invoker instance.
*
* @param message The message to be delivered.
@@ -208,8 +229,8 @@
if ((serviceClusterInfo.getEPRs().size()==0) || (new Date().after(expirationDate)) ) {
loadServiceClusterInfo();
}
- Message replyMessage = null;
- EPR epr = null;
+ Message replyMessage;
+ EPR epr;
// Iterate over all the EPRs in the list until delivered
while ((epr = loadBalancer.chooseEPR(serviceClusterInfo))!=null) {
replyMessage = attemptDelivery(message, epr, synchronous);
@@ -224,16 +245,24 @@
}
// Throw exception if delivery failed...
- throw new MessageDeliverException("Failed to deliver message to Service [" + serviceCategory + ":" + serviceName + "]. Check for errors.");
+ throw new MessageDeliverException("Failed to deliver message to Service [" + service + "]. Check for errors.");
}
/**
+ * Get the details of Service to which this invoker instance is delivering messages.
+ * @return The Service details.
+ */
+ public Service getService() {
+ return service;
+ }
+
+ /**
* Get the Service category name for the Service for which this instance is delivering messages.
*
* @return Service Category.
*/
public String getServiceCategory() {
- return serviceCategory;
+ return service.getCategory();
}
/**
@@ -242,7 +271,7 @@
* @return Service name.
*/
public String getServiceName() {
- return serviceName;
+ return service.getName();
}
/**
@@ -261,11 +290,11 @@
try {
courier = getCourier(epr);
} catch (CourierException e) {
- logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", e);
+ logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "].", e);
} catch (MalformedEPRException e) {
- logger.warn("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+ logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
} catch (Throwable t) {
- logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+ logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "].", t);
}
// Try delivering the message using the courier we just looked up....
@@ -278,7 +307,7 @@
replyToEPR = getReplyToAddress(epr);
if(replyToEPR == null) {
- logger.debug("Not using epr [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. No reply-to address available for synchronous response.");
+ logger.debug("Not using epr [" + epr + "] for Service [" + service + "]. No reply-to address available for synchronous response.");
return null;
}
message.getHeader().getCall().setReplyTo(replyToEPR);
@@ -296,12 +325,12 @@
} catch (FaultMessageException e) {
throw e;
} catch (CourierException e) {
- logger.debug("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+ logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
} catch (MalformedEPRException e) {
// Hmmmm???... Can this really happen? The Courier has already been created. Haven't we already validated the EPR during the Courier lookup (above)??
- logger.warn("Unexpected error. Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. But the EPR has already been validated!!");
+ logger.warn("Unexpected error. Badly formed EPR [" + epr + "] for Service [" + service + "]. But the EPR has already been validated!!");
} catch (Throwable t) {
- logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+ logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "].", t);
} finally {
// TODO: So does this mean that Couriers are stateful? If so, do we need to synchronize on using them??
CourierUtil.cleanCourier(courier);
@@ -316,7 +345,7 @@
*
* @param toEpr The to address.
* @return The replyTo address.
- * @throws ConfigurationException
+ * @throws ConfigurationException Unable to support synchronous reply on 'to' address.
*/
protected EPR getReplyToAddress(EPR toEpr) throws ConfigurationException {
// This method just allows us to override Courier lookup during unit testing.
@@ -346,12 +375,13 @@
* Loads the EPRs fresh from the Registry. Right now we will do this every minute
* until we can expect to get updates from the registry. For now this should work
* just fine.
+ * @throws MessageDeliverException Registry lookup failure.
*/
public void loadServiceClusterInfo() throws MessageDeliverException
{
try {
- List<EPR> serviceEprs = RegistryUtil.getEprs(serviceCategory, serviceName);
- serviceClusterInfo = new ServiceClusterInfoImpl(serviceName, serviceEprs);
+ List<EPR> serviceEprs = RegistryUtil.getEprs(service.getCategory(), service.getName());
+ serviceClusterInfo = new ServiceClusterInfoImpl(service.getName(), serviceEprs);
expirationDate = new Date(java.lang.System.currentTimeMillis() + 60000);
} catch (RegistryException e) {
throw new MessageDeliverException(e.getMessage(), e);
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/jms/DefaultJMSPropertiesSetter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/jms/DefaultJMSPropertiesSetter.java 2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/jms/DefaultJMSPropertiesSetter.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -24,6 +24,8 @@
import java.net.URI;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.MessageFormatException;
+
import org.apache.log4j.Logger;
import org.jboss.soa.esb.message.Properties;
@@ -92,9 +94,14 @@
{
log.debug( "Property name : " + key );
Object property = properties.getProperty( key );
- if ( property != null )
- toJMSMessage.setObjectProperty( key, property );
- }
+ if ( property != null) {
+ try {
+ toJMSMessage.setObjectProperty( key, property );
+ } catch(Exception e) {
+ log.error("Failed to set object property '" + key + "' to '" + property + " (" + property.getClass().getName() + ")': " + e.getMessage());
+ }
+ }
+ }
}
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/services/routing/MessageRouter.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -37,6 +37,7 @@
* Generic Message Router Interface.
*
* @author kurt.stam at redhat.com
+ * @deprecated Use {@link org.jboss.soa.esb.listeners.ServiceInvoker}.
*/
public abstract class MessageRouter {
private static Logger logger = Logger.getLogger(MessageRouter.class);
@@ -65,6 +66,7 @@
* the service name
* @param message -
* the message that needs routing and delivery
+ * @deprecated Use the {@link org.jboss.soa.esb.listeners.ServiceInvoker}
*/
@SuppressWarnings("unchecked")
public synchronized static void deliverMessage(String serviceCategory, String serviceName, Message message) throws MessageRouterException {
@@ -86,10 +88,10 @@
* Collection with the name of the destination services.
* @param message -
* the message that needs routing and delivery
+ * @deprecated Use the {@link org.jboss.soa.esb.listeners.MessageMulticaster}.
*/
@SuppressWarnings("unchecked")
- public synchronized static void deliverMessages(
- Collection<String[]> destinations, Message message) throws MessageRouterException {
+ public synchronized static void deliverMessages(Collection<String[]> destinations, Message message) throws MessageRouterException {
String uuId = UUID.randomUUID().toString();
int counter = 0;
Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java 2007-08-03 10:18:29 UTC (rev 13981)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb;
+
+import junit.framework.TestCase;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class ServiceUnitTest extends TestCase {
+
+ public void test_equals() {
+ List<Service> list = new ArrayList<Service>();
+
+ list.add(new Service("a", "b"));
+ list.add(new Service("c", "d"));
+ assertTrue(list.contains(new Service("a", "b")));
+ assertTrue(list.contains(new Service("c", "d")));
+ assertFalse(list.contains(new Service("e", "f")));
+ }
+
+ public void test_hashcode() {
+ Map<Service, String> map = new HashMap<Service, String>();
+
+ map.put(new Service("a", "b"), "1");
+ map.put(new Service("c", "d"), "2");
+ map.put(new Service("x", "y"), null);
+ assertEquals("1", map.get(new Service("a", "b")));
+ assertEquals("2", map.get(new Service("c", "d")));
+ assertEquals(null, map.get(new Service("e", "f")));
+ assertTrue(map.containsKey(new Service("x", "y")));
+ }
+}
Property changes on: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java
___________________________________________________________________
Name: svn:eol-style
+ native
More information about the jboss-svn-commits
mailing list