[jboss-svn-commits] JBL Code SVN: r13981 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/soa/esb and 5 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Aug 3 06:18:29 EDT 2007


Author: tfennelly
Date: 2007-08-03 06:18:29 -0400 (Fri, 03 Aug 2007)
New Revision: 13981

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java
   labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java
Modified:
   labs/jbossesb/trunk/product/install/build.xml
   labs/jbossesb/trunk/product/install/readme.txt
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/ContentBasedWiretap.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/jms/DefaultJMSPropertiesSetter.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/services/routing/MessageRouter.java
Log:
http://jira.jboss.com/jira/browse/JBESB-685

Deprecation of MessageRouter.  Replaced uses of this class with the MessageMulticaster class, which basically allows you to multicast a message to multiple endpoints, caching the ServiceInvokers in the process.


Modified: labs/jbossesb/trunk/product/install/build.xml
===================================================================
--- labs/jbossesb/trunk/product/install/build.xml	2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/install/build.xml	2007-08-03 10:18:29 UTC (rev 13981)
@@ -212,7 +212,11 @@
 		</copy>
 	</target>
 
-	<target name="deployIntros" depends="check.deploy.props, dependencies" description="Deploys JAXB Intros to the application server">
+	<target name="deployIntros">
+		<echo message="***** DEPRECATED: Sorry, this target has been deprecated.  Please run the 'patch-jbossws' target." />
+	</target>
+	
+	<target name="patch-jbossws" depends="check.deploy.props, dependencies" description="Deploys JAXB Intros to the application server">
 		<property name="jbossws.location" location="${deploy.dir}/jbossws.sar"/>
 		<property name="jbossws.beans.location" location="${jbossws.location}/jbossws.beans/META-INF/jboss-beans.xml"/>
 		<property name="jbossws.beans.tmp.location" location="${jbossws.location}/jbossws.beans/META-INF/jboss-beans.new.xml"/>
@@ -222,15 +226,17 @@
 		<available property="jbossws.exists" file="${jbossws.location}" type="dir"/>
 		<fail unless="jbossws.exists" message="Please install JBossWS into the Application Server"/>
 		
-		<available property="jaxb.intros.exist" file="${jbossws.location}/${jaxb.intros.jar}" type="file"/>
-		<fail if="jaxb.intros.exist" message="JAXB Intros appear to be install already"/>
-		
-		<xslt style="jaxb.xslt" in="${jbossws.beans.location}" out="${jbossws.beans.tmp.location}"/>
-		<move file="${jbossws.beans.tmp.location}" tofile="${jbossws.beans.location}"/>
-		
 		<copy todir="${jbossws.location}">
 			<fileset dir="${org.jboss.esb.dist}/extras/jaxbintros" includes="${jaxb.intros.jar}"/>
 			<fileset dir="${org.jboss.esb.dist.lib}/soap.esb" includes="${jbossesb.soap.jar}"/>
 		</copy>
+		
+		<available property="jaxb.intros.exist" file="${jbossws.location}/${jaxb.intros.jar}" type="file"/>
+		<antcall target="apply-jbossws-xslt" />
 	</target>
+
+	<target name="apply-jbossws-xslt" unless="jaxb.intros.exist">
+		<xslt style="jaxb.xslt" in="${jbossws.beans.location}" out="${jbossws.beans.tmp.location}"/>
+		<move file="${jbossws.beans.tmp.location}" tofile="${jbossws.beans.location}"/>
+	</target>
 </project>

Modified: labs/jbossesb/trunk/product/install/readme.txt
===================================================================
--- labs/jbossesb/trunk/product/install/readme.txt	2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/install/readme.txt	2007-08-03 10:18:29 UTC (rev 13981)
@@ -23,4 +23,4 @@
 directory of the jbossesb installation.
  - Follow the 'Deployment into JBoss AS 4.2.0.GA' section above (if not
    already installed)
- - execute 'ant deployIntros'
+ - execute 'ant patch-jbossws'

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb;
+
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+
+/**
+ * Service.
+ * <p/>
+ * Simple immutable compound value object associating a Service "Category" with a
+ * Service "Name".
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class Service {
+
+    private String category;
+    private String name;
+
+    public Service(String category, String name) {
+        AssertArgument.isNotNullAndNotEmpty(category, "category");
+        AssertArgument.isNotNullAndNotEmpty(name, "name");
+        this.category = category.trim();
+        this.name = name.trim();
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public boolean equals(Object obj) {
+        if(obj instanceof Service) {
+            Service service = (Service) obj;
+            if(service.category.equals(category) && service.name.equals(name)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    public int hashCode() {
+        return (category + name).hashCode();
+    }
+
+    public String toString() {
+        return category + ":" + name;
+    }
+}


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/Service.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java	2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/Aggregator.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -68,7 +68,9 @@
  */
 public class Aggregator extends AbstractActionPipelineProcessor
 {
+    public final static String AGGEGRATOR_TAG = "aggregatorTag";
     public final static String SPLITTER_TIME_STAMP = "splitterTimeStamp";
+    
     private ConcurrentHashMap<String,ConcurrentHashMap< String, Message > > _aggregatedMessageMap
         = new ConcurrentHashMap< String, ConcurrentHashMap< String, Message > >();
     private TimeoutChecker _timeoutChecker=null;
@@ -203,6 +205,11 @@
         serviceName          = config.getAttribute(ListenerTagNames.SERVICE_NAME_TAG);
         serviceCategoryName  = config.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG);
     }
+
+    public static void decorate(Message message) {
+
+    }
+
     /**
      * Aggregates the messages into 1 new message with an attachment for each message.
      * 

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/ContentBasedWiretap.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/ContentBasedWiretap.java	2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/ContentBasedWiretap.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -22,21 +22,23 @@
 package org.jboss.soa.esb.actions;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.Service;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.ServiceInvoker;
+import org.jboss.soa.esb.listeners.MessageMulticaster;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.message.mapping.ObjectMapper;
 import org.jboss.soa.esb.services.registry.Registry;
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.services.registry.RegistryFactory;
-import org.jboss.soa.esb.services.routing.MessageRouter;
 import org.jboss.soa.esb.services.routing.MessageRouterException;
 import org.jboss.soa.esb.services.routing.cbr.ContentBasedRouterFactory;
 
@@ -47,6 +49,7 @@
  * @author kevin.conner at jboss.com
  */
 public class ContentBasedWiretap extends AbstractActionPipelineProcessor {
+
     public static final String ROUTE_TO_TAG = "route-to";
 
     public static final String OBJECT_PATH_TAG = "object-path";
@@ -66,9 +69,8 @@
     }
 
     public void initialise() {
-        if (_destinations.size() < 1) {
-            _logger
-                    .warn("Missing or empty destination list - This action class won't have any effect");
+        if (messageMulticaster.getRecipientCount() == 0) {
+            _logger.warn("Missing or empty destination list - This action class won't have any effect");
         }
     }
 
@@ -105,8 +107,13 @@
                         + _destinations.keySet()
                         + ". Please fix your configuration.");
 
-        MessageRouter.deliverMessage(MessageRouter.INTERNAL_SERVICE_CATEGORY,
-                MessageRouter.DEAD_LETTER_SERVICE_NAME, message);
+        try {
+            ServiceInvoker.deliverToDeadLetterChannel(message);
+        } catch (MessageDeliverException e) {
+            throw new MessageRouterException("Failed to deliver message to Dead Letter Channel.", e);
+        } catch (RegistryException e) {
+            throw new MessageRouterException("Failed to lookup Dead Letter Channel.", e);
+        }
     }
 
     protected final void routeMessage(Message message)
@@ -115,19 +122,26 @@
                 _messagePathList);
         List<String> destinations = _cbr.route(_ruleSet, _ruleLanguage,
                 _ruleReload, message, objectList);
-        Collection<String[]> outgoingDestinations = new ArrayList<String[]>();
+        List<Service> outgoingDestinations = new ArrayList<Service>();
         for (String destination : destinations) {
             if (_destinations.containsKey(destination)) {
                 outgoingDestinations.add(_destinations.get(destination));
             }
         }
         if (outgoingDestinations.size() > 0) {
-            MessageRouter.deliverMessages(outgoingDestinations, message);
+            try {
+                messageMulticaster.sendToSubset(message, outgoingDestinations);
+            } catch (RegistryException e) {
+                throw new MessageRouterException(e);
+            } catch (MessageDeliverException e) {
+                throw new MessageRouterException(e);
+            }
         } else {
             noDestinations();
 
-            if (destinations.size() > 0)
+            if (destinations.size() > 0) {
                 invalidRuleConfiguration(message, destinations);
+            }
         }
     }
 
@@ -163,7 +177,7 @@
             _cbrClass = DEFAULT_CBR_CLASS;
         }
 
-        _destinations = new HashMap<String, String[]>();
+        _destinations = new HashMap<String, Service>();
         ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
         if (destList != null) {
             for (ConfigTree curr : destList) {
@@ -174,8 +188,9 @@
                             ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, "");
                     String name = curr
                             .getRequiredAttribute(ListenerTagNames.SERVICE_NAME_TAG);
-                    _destinations.put(key, new String[]
-                            {category, name});
+                    Service service = new Service(category, name);
+                    _destinations.put(key, service);
+                    messageMulticaster.addRecipient(service);
                 }
                 catch (Exception e) {
                     throw new ConfigurationException(
@@ -202,8 +217,10 @@
 
     protected ConfigTree _config;
 
-    protected Map<String, String[]> _destinations;
+    protected Map<String, Service> _destinations;
 
+    protected MessageMulticaster messageMulticaster = new MessageMulticaster();
+
     protected String _cbrClass;
 
     protected String _ruleSet;

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java	2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/actions/StaticWiretap.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -28,17 +28,15 @@
  */
 package org.jboss.soa.esb.actions;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.Service;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.MessageMulticaster;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.services.routing.MessageRouter;
-import org.jboss.soa.esb.services.routing.MessageRouterException;
 
 public class StaticWiretap extends AbstractActionPipelineProcessor
 {
@@ -57,13 +55,13 @@
     {
         try
         {
-            MessageRouter.deliverMessages(_destinations, message);
+            messageMulticaster.sendToAll(message);
             return message;
+        } catch (RegistryException e) {
+            throw new ActionProcessingException(e);
+        } catch (MessageDeliverException e) {
+            throw new ActionProcessingException(e);
         }
-        catch (MessageRouterException ex)
-        {
-            throw new ActionProcessingException(ex);
-        }
     }
 
     /**
@@ -71,7 +69,6 @@
      */
     public void initialise() throws ActionLifecycleException
     {
-        _destinations = new ArrayList<String[]>();
         ConfigTree[] destList = _config.getChildren(ROUTE_TO_TAG);
         if (null == destList || destList.length < 1)
         {
@@ -84,7 +81,8 @@
             {
                 String category = curr.getAttribute(ListenerTagNames.SERVICE_CATEGORY_NAME_TAG, "");
                 String name = curr.getRequiredAttribute(ListenerTagNames.SERVICE_NAME_TAG);
-                _destinations.add(new String[] { category, name });
+                Service service = new Service(category, name);
+                messageMulticaster.addRecipient(service);
             }
             catch (Exception e)
             {
@@ -95,7 +93,7 @@
 
     protected ConfigTree _config;
 
-    protected List<String[]> _destinations;
+    protected MessageMulticaster messageMulticaster = new MessageMulticaster();
 
     protected static Logger _logger = Logger.getLogger(StaticRouter.class);
 }

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -0,0 +1,144 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners;
+
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.Service;
+import org.jboss.soa.esb.actions.Aggregator;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.apache.log4j.Logger;
+
+import java.util.*;
+
+/**
+ * Message Multicaster.
+ * <p/>
+ * Used to send a message to a recipient list, or a subset of that recipient list.
+ * <p/>
+ * Caches a {@link ServiceInvoker} instance for each recipient.
+ *
+ * @author <a href="mailto:tom.fennely at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MessageMulticaster {
+
+    private static Logger logger = Logger.getLogger(MessageMulticaster.class);
+
+    private Map<Service, ServiceInvoker> invokers = new HashMap<Service, ServiceInvoker>();
+
+    /**
+     * Add a message recipient Service.
+     * @param service Recipient service for receipt of messages from this multicaster instance.
+     * @throws RegistryException Failed to lookup Service endpoint.
+     * @throws MessageDeliverException Failed to deliver message to endpoint.
+     */
+    public void addRecipient(Service service) throws RegistryException, MessageDeliverException {
+        AssertArgument.isNotNull(service, "service");
+        // The ServiceInvoker will be lazilly created in the getInvoker method.
+        // This is effectively a registration of the service as being a potential recipient.
+        invokers.put(service, null);
+    }
+
+    /**
+     * Is the specified service a recipient of this multicaster instance.
+     * @param service The service to check for.
+     * @return True if the supplied service is one of the recipients, otherwise false.
+     */
+    public boolean isRecipient(Service service) {
+        return invokers.containsKey(service);
+    }
+
+    /**
+     * Get the number of recipients associated with this multicaster instance.
+     * @return The number of recipients.
+     */
+    public int getRecipientCount() {
+        return invokers.size();
+    }
+
+    /**
+     * Send the message to all the recipients associated with this multicaster.
+     * @param message The message.
+     * @throws RegistryException Failed to lookup Service endpoint.
+     * @throws MessageDeliverException Failed to deliver message to endpoint.
+     */
+    public void sendToAll(Message message) throws RegistryException, MessageDeliverException {
+        sendToSubset(message, new ArrayList<Service>(invokers.keySet()));
+    }
+
+    /**
+     * Send the message to all the recipients associated with this multicaster and
+     * also listed in the supplied recipient list.
+     * <p/>
+     * The recipients supplied in the list must have been added through the {@link #addRecipient(org.jboss.soa.esb.Service)}
+     * method, otherwise the message will be delivered to the Dead Letter Channel.
+     *
+     * @param message The message.
+     * @param recipients The recipient subset to which the supplied message is to be sent.
+     * @throws RegistryException Failed to lookup Service endpoint.
+     * @throws MessageDeliverException Failed to deliver message to endpoint.
+     */
+    public void sendToSubset(Message message, List<Service> recipients) throws RegistryException, MessageDeliverException {
+        String uuId = UUID.randomUUID().toString();
+        ArrayList<String> aggregatorTags = new ArrayList<String>();
+        int recipientCount = recipients.size();
+        long timestamp = System.currentTimeMillis();
+
+        for(int i = 0; i < recipientCount; i++) {
+            Service recipient = recipients.get(i);
+            ServiceInvoker invoker = getInvoker(recipient);
+            String tag = uuId + ":" + (i + 1) + ":" + recipientCount + ":" + timestamp;
+
+            aggregatorTags.add(tag);
+            message.getProperties().setProperty(Aggregator.AGGEGRATOR_TAG, aggregatorTags);
+            if (logger.isDebugEnabled()) {
+                logger.debug(Aggregator. AGGEGRATOR_TAG + "=" + tag);
+            }
+
+            if(invoker == null) {
+                logger.error("Service '" + recipient + "' is not in recipient list.  Delivering message to Dead Letter Channel.");
+                ServiceInvoker.deliverToDeadLetterChannel(message);
+            } else {
+                try {
+                    invoker.deliverAsync(message);
+                } catch(MessageDeliverException e) {
+                    logger.error("Failed to deliver message to Service '" + recipient + "'.  Delivering message to Dead Letter Channel.");
+                    ServiceInvoker.deliverToDeadLetterChannel(message);
+                }
+            }
+        }
+    }
+
+    private ServiceInvoker getInvoker(Service recipient) throws RegistryException, MessageDeliverException {
+        ServiceInvoker invoker = invokers.get(recipient);
+
+        // We lazilly create the invokers...
+        if(invoker == null) {
+            if(!invokers.containsKey(recipient)) {
+                return null;
+            }
+            invoker = new ServiceInvoker(recipient);
+            invokers.put(recipient, invoker);
+        }
+
+        return invoker;
+    }
+}


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageMulticaster.java
___________________________________________________________________
Name: svn:eol-style
   + native

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java	2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -28,7 +28,7 @@
 import org.jboss.internal.soa.esb.assertion.AssertArgument;
 import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
 import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.Call;
+import org.jboss.soa.esb.Service;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.addressing.util.DefaultReplyTo;
@@ -39,7 +39,6 @@
 import org.jboss.soa.esb.couriers.CourierUtil;
 import org.jboss.soa.esb.couriers.FaultMessageException;
 import org.jboss.soa.esb.couriers.TwoWayCourier;
-import org.jboss.soa.esb.listeners.RegistryUtil;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.listeners.ha.LoadBalancePolicy;
 import org.jboss.soa.esb.listeners.ha.ServiceClusterInfo;
@@ -59,27 +58,24 @@
  */
 public class ServiceInvoker {
 
+    public static final String INTERNAL_SERVICE_CATEGORY = "JBossESB-Internal";
+
+    public static final String DEAD_LETTER_SERVICE_NAME = "DeadLetterService";
+
     /**
      * Class logger.
      */
     private static Logger logger = Logger.getLogger(ServiceInvoker.class);
-
     /**
-     * The <b>category name</b> of the Service to which this instance will
-     * deliver messages.
+     * The target service.
      */
-    private String serviceCategory;
+    private Service service;
     /**
-     * The <b>name</b> of the Service to which this instance will
-     * deliver messages.
+     * Load balancer. 
      */
-    private String serviceName;
-    /**
-     * 
-     */
     private LoadBalancePolicy loadBalancer;
     /**
-     * 
+     * Cluster info.
      */
     private ServiceClusterInfo serviceClusterInfo;
     /**
@@ -87,23 +83,24 @@
      */
     private ThreadLocal<Long> syncPickupDeliveryTimeout = new ThreadLocal<Long>();
     /**
-     * 
+     *
      */
     private Date expirationDate;
     /**
+     * Dead letter channel Service invoker.
+     */
+    private static ServiceInvoker dlQueueInvoker;
+
+    /**
      * Public constructor.
      *
-     * @param serviceCategory The <b>category name</b> of the Service to which this instance will
-     *                        deliver messages.
-     * @param serviceName     The <b>name</b> of the Service to which this instance will
-     *                        deliver messages.
-     * @throws RegistryException Failed to lookup EPRs for the specified Service.
+     * @param service The Service to which this instance will deliver messages.
+     * @throws RegistryException Failed to lookup Service endpoint.
+     * @throws MessageDeliverException Failed to deliver message to endpoint.
      */
-    public ServiceInvoker(String serviceCategory, String serviceName) throws RegistryException, MessageDeliverException {
-        AssertArgument.isNotNullAndNotEmpty(serviceCategory, "serviceCategory");
-        AssertArgument.isNotNullAndNotEmpty(serviceName, "serviceName");
-        this.serviceCategory = serviceCategory;
-        this.serviceName = serviceName;
+    public ServiceInvoker(Service service) throws RegistryException, MessageDeliverException {
+        AssertArgument.isNotNull(service, "service");
+        this.service = service;
         String lbClass = Configuration.getLoadBalancerPolicy();
         try {
             Class c = ClassUtil.forName(lbClass, this.getClass());
@@ -120,6 +117,20 @@
             throw new MessageDeliverException( iae.getMessage(), iae);
         }
     }
+
+    /**
+     * Public constructor.
+     *
+     * @param serviceCategory The <b>category name</b> of the Service to which this instance will
+     *                        deliver messages.
+     * @param serviceName     The <b>name</b> of the Service to which this instance will
+     *                        deliver messages.
+     * @throws RegistryException Failed to lookup Service endpoint.
+     * @throws MessageDeliverException Failed to deliver message to endpoint.
+     */
+    public ServiceInvoker(String serviceCategory, String serviceName) throws RegistryException, MessageDeliverException {
+        this(new Service(serviceCategory, serviceName));
+    }
     
     /**
      * Method to deliver *one* message, which allocates resources before sending the message and
@@ -127,8 +138,8 @@
      * 
      * @param message Message to be send
      * @return message (if the invo
-     * @throws MessageDeliverException
-     * @throws RegistryException
+     * @throws RegistryException Failed to lookup Service endpoint.
+     * @throws MessageDeliverException Failed to deliver message to endpoint.
      */
     public Message deliverOne (Message message) throws MessageDeliverException, RegistryException, FaultMessageException
     {
@@ -160,12 +171,11 @@
      * @param timeoutMillis Number of milliseconds before synchronous reply pickup should timeout.
      * @return Returns the reply message if the message was delivered
      *         without error, otherwise an exception is thrown.
-     * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+     * @throws RegistryException Failed to lookup Service endpoint.
+     * @throws MessageDeliverException Failed to deliver message to endpoint.
      */
     public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException, FaultMessageException {
-    	if (message == null)
-    	    throw new IllegalArgumentException();
-    	
+        AssertArgument.isNotNull(message, "message");
         syncPickupDeliveryTimeout.set(timeoutMillis);
         return post(message, true);
     }
@@ -177,9 +187,7 @@
      * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
      */
     public void deliverAsync(Message message) throws MessageDeliverException {
-    	if (message == null)
-    	    throw new IllegalArgumentException();
-    	
+        AssertArgument.isNotNull(message, "message");
         // Not interested in a reply
     	
     	try
@@ -193,6 +201,19 @@
     }
 
     /**
+     * Deliver a message to the Dead Letter Channel Service.
+     * @param message The message to be delivered to the dead letter channel.
+     * @throws RegistryException  Service endpoint lookup failure.
+     * @throws MessageDeliverException Message delivery failure.
+     */
+    public static synchronized void deliverToDeadLetterChannel(Message message) throws RegistryException, MessageDeliverException {
+        if(dlQueueInvoker == null) {
+            dlQueueInvoker = new ServiceInvoker(INTERNAL_SERVICE_CATEGORY, DEAD_LETTER_SERVICE_NAME);
+        }
+        dlQueueInvoker.deliverAsync(message);
+    }
+
+    /**
      * Deliver the supplied message to the target service associated with this invoker instance.
      *
      * @param message     The message to be delivered.
@@ -208,8 +229,8 @@
             if ((serviceClusterInfo.getEPRs().size()==0) || (new Date().after(expirationDate)) ) { 
                 loadServiceClusterInfo();
             }
-            Message replyMessage = null;
-            EPR epr = null;
+            Message replyMessage;
+            EPR epr;
             // Iterate over all the EPRs in the list until delivered
             while ((epr = loadBalancer.chooseEPR(serviceClusterInfo))!=null) {
                 replyMessage = attemptDelivery(message, epr, synchronous);
@@ -224,16 +245,24 @@
         }
         
         // Throw exception if delivery failed...
-        throw new MessageDeliverException("Failed to deliver message to Service [" + serviceCategory + ":" + serviceName + "].  Check for errors.");
+        throw new MessageDeliverException("Failed to deliver message to Service [" + service + "].  Check for errors.");
     }
 
     /**
+     * Get the details of Service to which this invoker instance is delivering messages.
+     * @return The Service details.
+     */
+    public Service getService() {
+        return service;
+    }
+
+    /**
      * Get the Service category name for the Service for which this instance is delivering messages.
      *
      * @return Service Category.
      */
     public String getServiceCategory() {
-        return serviceCategory;
+        return service.getCategory();
     }
 
     /**
@@ -242,7 +271,7 @@
      * @return Service name.
      */
     public String getServiceName() {
-        return serviceName;
+        return service.getName();
     }
 
     /**
@@ -261,11 +290,11 @@
         try {
             courier = getCourier(epr);
         } catch (CourierException e) {
-            logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", e);
+            logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + service + "].", e);
         } catch (MalformedEPRException e) {
-            logger.warn("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+            logger.warn("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
         } catch (Throwable t) {
-            logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+            logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + service + "].", t);
         }
 
         // Try delivering the message using the courier we just looked up....
@@ -278,7 +307,7 @@
                 	replyToEPR = getReplyToAddress(epr);
                     
                     if(replyToEPR == null) {
-                        logger.debug("Not using epr [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. No reply-to address available for synchronous response.");
+                        logger.debug("Not using epr [" + epr + "] for Service [" + service + "]. No reply-to address available for synchronous response.");
                         return null;
                     }
                     message.getHeader().getCall().setReplyTo(replyToEPR);
@@ -296,12 +325,12 @@
             } catch (FaultMessageException e) {
         	throw e;
             } catch (CourierException e) {
-                logger.debug("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+                logger.debug("Badly formed EPR [" + epr + "] for Service [" + service + "]. " + e.getMessage());
             } catch (MalformedEPRException e) {
                 // Hmmmm???... Can this really happen?  The Courier has already been created.  Haven't we already validated the EPR during the Courier lookup (above)??
-                logger.warn("Unexpected error.  Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. But the EPR has already been validated!!");
+                logger.warn("Unexpected error.  Badly formed EPR [" + epr + "] for Service [" + service + "]. But the EPR has already been validated!!");
             } catch (Throwable t) {
-                logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+                logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + service + "].", t);
             } finally {
                 // TODO: So does this mean that Couriers are stateful?  If so, do we need to synchronize on using them??
                 CourierUtil.cleanCourier(courier);
@@ -316,7 +345,7 @@
      *
      * @param toEpr The to address.
      * @return The replyTo address.
-     * @throws ConfigurationException
+     * @throws ConfigurationException Unable to support synchronous reply on 'to' address.
      */
     protected EPR getReplyToAddress(EPR toEpr) throws ConfigurationException {
         // This method just allows us to override Courier lookup during unit testing.
@@ -346,12 +375,13 @@
      * Loads the EPRs fresh from the Registry. Right now we will do this every minute
      * until we can expect to get updates from the registry. For now this should work
      * just fine.
+     * @throws MessageDeliverException Registry lookup failure.
      */
     public void loadServiceClusterInfo() throws MessageDeliverException
     {
         try {
-            List<EPR> serviceEprs = RegistryUtil.getEprs(serviceCategory, serviceName);
-            serviceClusterInfo = new ServiceClusterInfoImpl(serviceName, serviceEprs);
+            List<EPR> serviceEprs = RegistryUtil.getEprs(service.getCategory(), service.getName());
+            serviceClusterInfo = new ServiceClusterInfoImpl(service.getName(), serviceEprs);
             expirationDate = new Date(java.lang.System.currentTimeMillis() + 60000);
         } catch (RegistryException e) {
             throw new MessageDeliverException(e.getMessage(), e);

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/jms/DefaultJMSPropertiesSetter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/jms/DefaultJMSPropertiesSetter.java	2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/notification/jms/DefaultJMSPropertiesSetter.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -24,6 +24,8 @@
 import java.net.URI;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageFormatException;
+
 import org.apache.log4j.Logger;
 import org.jboss.soa.esb.message.Properties;
 
@@ -92,9 +94,14 @@
 		{
 			log.debug( "Property name : " + key );
 			Object property = properties.getProperty( key );
-			if ( property != null )
-				toJMSMessage.setObjectProperty( key, property );
-		}
+			if ( property != null) {
+                try {
+                    toJMSMessage.setObjectProperty( key, property );
+                } catch(Exception e) {
+                    log.error("Failed to set object property '" + key + "' to '" + property + " (" + property.getClass().getName() + ")': " + e.getMessage());
+                }
+            }
+        }
 	}
 
 }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/services/routing/MessageRouter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-08-03 09:08:50 UTC (rev 13980)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/services/routing/MessageRouter.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -37,6 +37,7 @@
  * Generic Message Router Interface.
  *
  * @author kurt.stam at redhat.com
+ * @deprecated Use {@link org.jboss.soa.esb.listeners.ServiceInvoker}.
  */
 public abstract class MessageRouter {
     private static Logger logger = Logger.getLogger(MessageRouter.class);
@@ -65,6 +66,7 @@
      *                        the service name
      * @param message         -
      *                        the message that needs routing and delivery
+     * @deprecated Use the {@link org.jboss.soa.esb.listeners.ServiceInvoker}.
      */
     @SuppressWarnings("unchecked")
     public synchronized static void deliverMessage(String serviceCategory, String serviceName, Message message) throws MessageRouterException {
@@ -86,10 +88,10 @@
      *                     Collection with the name of the destination services.
      * @param message      -
      *                     the message that needs routing and delivery
+     * @deprecated Use the {@link org.jboss.soa.esb.listeners.MessageMulticaster}.
      */
     @SuppressWarnings("unchecked")
-    public synchronized static void deliverMessages(
-            Collection<String[]> destinations, Message message) throws MessageRouterException {
+    public synchronized static void deliverMessages(Collection<String[]> destinations, Message message) throws MessageRouterException {
         String uuId = UUID.randomUUID().toString();
         int counter = 0;
 

Added: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java	2007-08-03 10:18:29 UTC (rev 13981)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb;
+
+import junit.framework.TestCase;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class ServiceUnitTest extends TestCase {
+
+    public void test_equals() {
+        List<Service> list = new ArrayList<Service>();
+
+        list.add(new Service("a", "b"));
+        list.add(new Service("c", "d"));
+        assertTrue(list.contains(new Service("a", "b")));
+        assertTrue(list.contains(new Service("c", "d")));
+        assertFalse(list.contains(new Service("e", "f")));
+    }
+
+    public void test_hashcode() {
+        Map<Service, String> map = new HashMap<Service, String>();
+
+        map.put(new Service("a", "b"), "1");
+        map.put(new Service("c", "d"), "2");
+        map.put(new Service("x", "y"), null);
+        assertEquals("1", map.get(new Service("a", "b")));
+        assertEquals("2", map.get(new Service("c", "d")));
+        assertEquals(null, map.get(new Service("e", "f")));
+        assertTrue(map.containsKey(new Service("x", "y")));
+    }
+}


Property changes on: labs/jbossesb/trunk/product/rosetta/tests/src/org/jboss/soa/esb/ServiceUnitTest.java
___________________________________________________________________
Name: svn:eol-style
   + native




More information about the jboss-svn-commits mailing list