[jboss-svn-commits] JBL Code SVN: r13306 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/soa/esb/listeners/jca and 2 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Tue Jul 10 13:55:06 EDT 2007


Author: tfennelly
Date: 2007-07-10 13:55:05 -0400 (Tue, 10 Jul 2007)
New Revision: 13306

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java
Removed:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java
Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/InflowGateway.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JcaInflowGateway.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JmsEndpoint.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java
   labs/jbossesb/trunk/product/samples/quickstarts/webservice_war1/war/src/org/jboss/soa/esb/samples/quickstart/webservicewar1/webservice/HelloWorldWS.java
Log:
part of JBESB-575

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -42,7 +42,7 @@
 import org.jboss.soa.esb.addressing.eprs.JMSEpr;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.message.MessageDeliverException;
-import org.jboss.soa.esb.listeners.message.MessageDeliveryAdapter;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.util.Util;

Copied: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java (from rev 13262, labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java)
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -0,0 +1,302 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA  02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.TwoWayCourier;
+import org.jboss.soa.esb.listeners.RegistryUtil;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.listeners.ha.LoadBalancePolicy;
+import org.jboss.soa.esb.listeners.ha.ServiceClusterInfo;
+import org.jboss.soa.esb.listeners.ha.ServiceClusterInfoImpl;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.ClassUtil;
+
+/**
+ * Adapter class for managing {@link Message} delivery to a specified Service.
+ * <p/>
+ * Manages loading of {@link EPR EPRs}, {@link Courier} selection and
+ * message delivery. Provides a unified/simplified interface for message
+ * delivery.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MessageDeliveryAdapter {
+
+    /**
+     * Class logger.
+     */
+    private static Logger logger = Logger.getLogger(MessageDeliveryAdapter.class);
+
+    /**
+     * The <b>category name</b> of the Service to which this instance will
+     * deliver messages.
+     */
+    private String serviceCategory;
+    /**
+     * The <b>name</b> of the Service to which this instance will
+     * deliver messages.
+     */
+    private String serviceName;
+    /**
+     * 
+     */
+    private LoadBalancePolicy loadBalancer;
+    /**
+     * 
+     */
+    private ServiceClusterInfo serviceClusterInfo;
+    /**
+     * Synchronous courier "pickup" deliver timeout.
+     */
+    private ThreadLocal<Long> syncPickupDeliveryTimeout = new ThreadLocal<Long>();
+    /**
+     * 
+     */
+    private Date expirationDate;
+    /**
+     * Public constructor.
+     *
+     * @param serviceCategory The <b>category name</b> of the Service to which this instance will
+     *                        deliver messages.
+     * @param serviceName     The <b>name</b> of the Service to which this instance will
+     *                        deliver messages.
+     * @throws RegistryException Failed to lookup EPRs for the specified Service.
+     */
+    public MessageDeliveryAdapter(String serviceCategory, String serviceName) throws RegistryException, MessageDeliverException {
+        AssertArgument.isNotNullAndNotEmpty(serviceCategory, "serviceCategory");
+        AssertArgument.isNotNullAndNotEmpty(serviceName, "serviceName");
+        this.serviceCategory = serviceCategory;
+        this.serviceName = serviceName;
+        String lbClass = Configuration.getLoadBalancerPolicy();
+        try {
+            Class c = ClassUtil.forName(lbClass, this.getClass());
+            loadBalancer = (LoadBalancePolicy) c.newInstance();
+            loadServiceClusterInfo();
+        } catch (ClassNotFoundException clf) {
+            logger.error("No such LoadBalancePolicy class = " + lbClass);
+            throw new MessageDeliverException( clf.getMessage(), clf);
+        } catch (InstantiationException ie) {
+            logger.error("Could not instatiate LoadBalancePolicy class = " + lbClass);
+            throw new MessageDeliverException( ie.getMessage(), ie.getCause());
+        } catch (IllegalAccessException iae) {
+            logger.error("Illegal access while instantiating LoadBalancePolicy class = " + lbClass);
+            throw new MessageDeliverException( iae.getMessage(), iae);
+        }
+    }
+
+    /**
+     * Synchronously deliver the supplied message to the target service associated with this adapter instance.
+     *
+     * @param message       The message to be delivered.
+     * @param timeoutMillis Number of milliseconds before synchronous reply pickup should timeout.
+     * @return Returns the reply message if the message was delivered
+     *         without error, otherwise an exception is thrown.
+     * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+     */
+    public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException {
+        syncPickupDeliveryTimeout.set(timeoutMillis);
+        return deliver(message, true);
+    }
+
+    /**
+     * Asynchronously deliver the supplied message to the target service associated with this adapter instance.
+     *
+     * @param message The message to be delivered.
+     * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+     */
+    public void deliverAsync(Message message) throws MessageDeliverException {
+        // Not interested in a reply
+        deliver(message, false);
+    }
+
+    /**
+     * Deliver the supplied message to the target service associated with this adapter instance.
+     *
+     * @param message     The message to be delivered.
+     * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
+     * @return Returns the message (or a reply message if synchronous) if the message was delivered
+     *         without error, otherwise an exception is thrown.
+     * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+     */
+    private Message deliver(Message message, boolean synchronous) throws MessageDeliverException
+    {
+        int numberOfAttemps=0;
+        while (numberOfAttemps++ < 2) {
+            if ((serviceClusterInfo.getEPRs().size()==0) || (new Date().after(expirationDate)) ) { 
+                loadServiceClusterInfo();
+            }
+            Message replyMessage = null;
+            EPR epr = null;
+            // Iterate over all the EPRs in the list until delivered
+            while ((epr = loadBalancer.chooseEPR(serviceClusterInfo))!=null) {
+                replyMessage = attemptDelivery(message, epr, synchronous);
+                if (replyMessage != null) {
+                    // We've delivered it, we're done!
+                    return replyMessage;
+                } else {
+                    logger.info("Dead EPR: " + epr);
+                    serviceClusterInfo.removeDeadEPR(epr);
+                }
+            }
+        }
+        
+        // Throw exception if delivery failed...
+        throw new MessageDeliverException("Failed to deliver message to Service [" + serviceCategory + ":" + serviceName + "].  Check for errors.");
+    }
+
+    /**
+     * Get the Service category name for the Service for which this instance is delivering messages.
+     *
+     * @return Service Category.
+     */
+    public String getServiceCategory() {
+        return serviceCategory;
+    }
+
+    /**
+     * Get the Service name for the Service for which this instance is delivering messages.
+     *
+     * @return Service name.
+     */
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * Attempt to deliver the supplied message using the supplied EPR.
+     *
+     * @param message     The message to be delivered.
+     * @param epr         The EPR to be used in the delivery attempt.
+     * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
+     * @return Returns the message (or a reply message if synchronous) if the message was delivered
+     *         without error, otherwise null.
+     */
+    private Message attemptDelivery(Message message, EPR epr, boolean synchronous) {
+        TwoWayCourier courier = null;
+
+        // Get a courier for the EPR...
+        try {
+            courier = getCourier(epr);
+        } catch (CourierException e) {
+            logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", e);
+        } catch (MalformedEPRException e) {
+            logger.warn("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+        } catch (Throwable t) {
+            logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+        }
+
+        // Try delivering the message using the courier we just looked up....
+        if (courier != null) {
+            try {
+                EPR replyToEPR = null;
+
+                if (synchronous) {
+                    replyToEPR = getReplyToAddress(epr);
+                    if(replyToEPR == null) {
+                        logger.debug("Not using epr [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. No reply-to address available for synchronous response.");
+                        return null;
+                    }
+                    message.getHeader().getCall().setReplyTo(replyToEPR);
+                }
+                if (courier.deliver(message)) {
+                    if (replyToEPR != null) {
+                        courier.setReplyToEpr(replyToEPR);
+                        return courier.pickup(syncPickupDeliveryTimeout.get());
+                    } else {
+                        return message;
+                    }
+                }
+            } catch (CourierException e) {
+                logger.debug("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+            } catch (MalformedEPRException e) {
+                // Hmmmm???... Can this really happen?  The Courier has already been created.  Haven't we already validated the EPR during the Courier lookup (above)??
+                logger.warn("Unexpected error.  Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. But the EPR has already been validated!!");
+            } catch (Throwable t) {
+                logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+            } finally {
+                // TODO: So does this mean that Couriers are stateful?  If so, do we need to synchronize on using them??
+                CourierUtil.cleanCourier(courier);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Get the reply to address for synchronous delivery.
+     *
+     * @param toEpr The to address.
+     * @return The replyTo address.
+     * @throws ConfigurationException
+     */
+    protected EPR getReplyToAddress(EPR toEpr) throws ConfigurationException {
+        // This method just allows us to override Courier lookup during unit testing.
+        try {
+            return CourierUtil.getDefaultReplyToEpr(toEpr);
+        } catch (CourierException e) {
+            throw new ConfigurationException("Bad configuration. Unable to support synchronous reply on 'to' address " + toEpr, e);
+        } catch (MalformedEPRException e) {
+            throw new ConfigurationException("Bad configuration. Unable to support synchronous reply on 'to' address " + toEpr, e);
+        }
+    }
+
+    /**
+     * Get a {@link org.jboss.soa.esb.couriers.Courier} for the supplied EPR.
+     *
+     * @param epr The EPR for which a {@link org.jboss.soa.esb.couriers.Courier}
+     *            is being sought.
+     * @return The courier for the EPR.
+     * @throws CourierException      A courier implementation cannot be created.
+     * @throws MalformedEPRException Bad EPR.
+     */
+    protected TwoWayCourier getCourier(EPR epr) throws CourierException, MalformedEPRException {
+        // This method just allows us to override Courier lookup during unit testing.
+        return CourierFactory.getInstance().getMessageCourier(epr);
+    }
+    /**
+     * Loads the EPRs fresh from the Registry. Right now we will do this every minute
+     * until we can expect to get updates from the registry. For now this should work
+     * just fine.
+     */
+    public void loadServiceClusterInfo() throws MessageDeliverException
+    {
+        try {
+            List<EPR> serviceEprs = RegistryUtil.getEprs(serviceCategory, serviceName);
+            serviceClusterInfo = new ServiceClusterInfoImpl(serviceName, serviceEprs);
+            expirationDate = new Date(java.lang.System.currentTimeMillis() + 60000);
+        } catch (RegistryException e) {
+            throw new MessageDeliverException(e.getMessage(), e);
+        }
+    }
+}


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java
___________________________________________________________________
Name: svn:eol-style
   + native

Deleted: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java	2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -1,110 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-package org.jboss.soa.esb.listeners;
-
-import java.util.Collection;
-
-import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.couriers.Courier;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierUtil;
-import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.registry.RegistryException;
-
-/**
- * At initialization, this object looks up target EPR for a given service/category.
- * The postMessage method invokes on a target EPR.
- *
- * This class is really for use with Gateways, but could be used by clients to pre-initialize a service lookup
- *
- * @author <a href="bill at jboss.com">Bill Burke</a>
- * @version $Revision: 1.1 $
- */
-public class ServiceInvoker
-{
-   private String name;
-   private String category;
-
-   private Collection<EPR> targetEprs;
-
-
-   public String getName()
-   {
-      return name;
-   }
-
-   public void setName(String name)
-   {
-      this.name = name;
-   }
-
-   public String getCategory()
-   {
-      return category;
-   }
-
-   public void setCategory(String category)
-   {
-      this.category = category;
-   }
-
-   public void start()
-   {
-      try
-      {
-          targetEprs = RegistryUtil.getEprs(category, name);
-      }
-      catch (final RegistryException re)
-      {
-          throw new RuntimeException("Unexpected registry exception", re) ;
-      }
-
-   }
-
-   public void postMessage(Message message) throws CourierException, MalformedEPRException
-   {
-      boolean bSent = false;
-      for (EPR current : targetEprs)
-      {
-         Courier courier = CourierFactory.getCourier(current);
-         try
-         {
-            if (courier.deliver(message))
-            {
-               bSent = true;
-               break;
-            }
-         }
-         finally
-         {
-            CourierUtil.cleanCourier(courier);
-         }
-      }
-      if (!bSent)
-      {
-         String text = "Target service <" + category + "," + name + "> is not registered";
-         throw new CourierException(text);
-      }
-   }
-}

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/InflowGateway.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/InflowGateway.java	2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/InflowGateway.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -21,7 +21,7 @@
 */
 package org.jboss.soa.esb.listeners.jca;
 
-import org.jboss.soa.esb.listeners.ServiceInvoker;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
 
 /**
  * Required interface for JCA ESB Gateways
@@ -34,5 +34,5 @@
  */
 public interface InflowGateway
 {
-   public void setServiceInvoker(ServiceInvoker invoker);
+   public void setServiceInvoker(MessageDeliveryAdapter invoker);
 }

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JcaInflowGateway.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JcaInflowGateway.java	2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JcaInflowGateway.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -27,12 +27,15 @@
 import java.util.HashMap;
 
 import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.services.registry.RegistryException;
 import org.jboss.soa.esb.helpers.ConfigTree;
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.ListenerUtil;
-import org.jboss.soa.esb.listeners.ServiceInvoker;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
 import org.jboss.soa.esb.listeners.lifecycle.AbstractThreadedManagedLifecycle;
 import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.apache.log4j.Logger;
 
 /**
  * @author <a href="bill at jboss.com">Bill Burke</a>
@@ -40,6 +43,7 @@
  */
 public class JcaInflowGateway extends AbstractThreadedManagedLifecycle
 {
+   private static Logger logger = Logger.getLogger(JcaInflowGateway.class);
    private static final long serialVersionUID = 1L;
    private HashMap<String, String> activationSpec = new HashMap<String, String>();
    private ClassLoader classLoader;
@@ -55,7 +59,6 @@
    public JcaInflowGateway(ConfigTree config) throws ConfigurationException
    {
       super(config);
-      //System.out.println("CONFIG: " + config);
 
       serviceCategory = ListenerUtil.obtainAtt(config,
             ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
@@ -222,13 +225,21 @@
 
    protected void doRun()
    {
-      ServiceInvoker invoker = new ServiceInvoker();
-      invoker.setCategory(this.serviceCategory);
-      invoker.setName(this.serviceName);
-      invoker.start();
-      bean.setServiceInvoker(invoker);
-      bridge.activate();
-
+      MessageDeliveryAdapter invoker;
+      try
+      {
+         invoker = new MessageDeliveryAdapter(serviceCategory, serviceName);
+         bean.setServiceInvoker(invoker);
+         bridge.activate();
+      }
+      catch (RegistryException e)
+      {
+         logger.error("Failed activate JCA Inflow Gateway. Service '" + serviceCategory + ":" + serviceName + "'.", e);
+      }
+      catch (MessageDeliverException e)
+      {
+          logger.error("Failed activate JCA Inflow Gateway. Service '" + serviceCategory + ":" + serviceName + "'.", e);
+      }
    }
 
 

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JmsEndpoint.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JmsEndpoint.java	2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JmsEndpoint.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -24,8 +24,8 @@
 import javax.jms.Message;
 import javax.jms.MessageListener;
 
-import org.jboss.soa.esb.listeners.ServiceInvoker;
 import org.jboss.soa.esb.listeners.gateway.PackageJmsMessageContents;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
 
 /**
  * comment
@@ -35,10 +35,10 @@
  */
 public class JmsEndpoint implements InflowGateway, MessageListener
 {
-   private ServiceInvoker service;
+   private MessageDeliveryAdapter service;
    private PackageJmsMessageContents transformer = new PackageJmsMessageContents();
 
-   public void setServiceInvoker(ServiceInvoker invoker)
+   public void setServiceInvoker(MessageDeliveryAdapter invoker)
    {
       this.service = invoker;
    }
@@ -48,7 +48,7 @@
       try
       {
          org.jboss.soa.esb.message.Message esbMessage = transformer.process(message);
-         service.postMessage(esbMessage);
+         service.deliverAsync(esbMessage);
       }
       catch (Exception e)
       {

Deleted: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java	2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -1,301 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and others contributors as indicated
- * by the @authors tag. All rights reserved.
- * See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- * This copyrighted material is made available to anyone wishing to use,
- * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU Lesser General Public License, v. 2.1.
- * This program is distributed in the hope that it will be useful, but WITHOUT A
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
- * You should have received a copy of the GNU Lesser General Public License,
- * v.2.1 along with this distribution; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
- * MA  02110-1301, USA.
- *
- * (C) 2005-2006, JBoss Inc.
- */
-package org.jboss.soa.esb.listeners.message;
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.jboss.internal.soa.esb.assertion.AssertArgument;
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.common.Configuration;
-import org.jboss.soa.esb.couriers.Courier;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierUtil;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
-import org.jboss.soa.esb.listeners.RegistryUtil;
-import org.jboss.soa.esb.listeners.ha.LoadBalancePolicy;
-import org.jboss.soa.esb.listeners.ha.ServiceClusterInfo;
-import org.jboss.soa.esb.listeners.ha.ServiceClusterInfoImpl;
-import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.util.ClassUtil;
-
-/**
- * Adapter class for managing {@link Message} delivery to a specified Service.
- * <p/>
- * Manages loading of {@link EPR EPRs}, {@link Courier} selection and
- * message delivery. Provides a unified/simplified interface for message
- * delivery.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- */
-public class MessageDeliveryAdapter {
-
-    /**
-     * Class logger.
-     */
-    private static Logger logger = Logger.getLogger(MessageDeliveryAdapter.class);
-
-    /**
-     * The <b>category name</b> of the Service to which this instance will
-     * deliver messages.
-     */
-    private String serviceCategory;
-    /**
-     * The <b>name</b> of the Service to which this instance will
-     * deliver messages.
-     */
-    private String serviceName;
-    /**
-     * 
-     */
-    private LoadBalancePolicy loadBalancer;
-    /**
-     * 
-     */
-    private ServiceClusterInfo serviceClusterInfo;
-    /**
-     * Synchronous courier "pickup" deliver timeout.
-     */
-    private ThreadLocal<Long> syncPickupDeliveryTimeout = new ThreadLocal<Long>();
-    /**
-     * 
-     */
-    private Date expirationDate;
-    /**
-     * Public constructor.
-     *
-     * @param serviceCategory The <b>category name</b> of the Service to which this instance will
-     *                        deliver messages.
-     * @param serviceName     The <b>name</b> of the Service to which this instance will
-     *                        deliver messages.
-     * @throws RegistryException Failed to lookup EPRs for the specified Service.
-     */
-    public MessageDeliveryAdapter(String serviceCategory, String serviceName) throws RegistryException, MessageDeliverException {
-        AssertArgument.isNotNullAndNotEmpty(serviceCategory, "serviceCategory");
-        AssertArgument.isNotNullAndNotEmpty(serviceName, "serviceName");
-        this.serviceCategory = serviceCategory;
-        this.serviceName = serviceName;
-        String lbClass = Configuration.getLoadBalancerPolicy();
-        try {
-            Class c = ClassUtil.forName(lbClass, this.getClass());
-            loadBalancer = (LoadBalancePolicy) c.newInstance();
-            loadServiceClusterInfo();
-        } catch (ClassNotFoundException clf) {
-            logger.error("No such LoadBalancePolicy class = " + lbClass);
-            throw new MessageDeliverException( clf.getMessage(), clf);
-        } catch (InstantiationException ie) {
-            logger.error("Could not instatiate LoadBalancePolicy class = " + lbClass);
-            throw new MessageDeliverException( ie.getMessage(), ie.getCause());
-        } catch (IllegalAccessException iae) {
-            logger.error("Illegal access while instantiating LoadBalancePolicy class = " + lbClass);
-            throw new MessageDeliverException( iae.getMessage(), iae);
-        }
-    }
-
-    /**
-     * Synchronously deliver the supplied message to the target service associated with this adapter instance.
-     *
-     * @param message       The message to be delivered.
-     * @param timeoutMillis Number of milliseconds before synchronous reply pickup should timeout.
-     * @return Returns the reply message if the message was delivered
-     *         without error, otherwise an exception is thrown.
-     * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
-     */
-    public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException {
-        syncPickupDeliveryTimeout.set(timeoutMillis);
-        return deliver(message, true);
-    }
-
-    /**
-     * Asynchronously deliver the supplied message to the target service associated with this adapter instance.
-     *
-     * @param message The message to be delivered.
-     * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
-     */
-    public void deliverAsync(Message message) throws MessageDeliverException {
-        // Not interested in a reply
-        deliver(message, false);
-    }
-
-    /**
-     * Deliver the supplied message to the target service associated with this adapter instance.
-     *
-     * @param message     The message to be delivered.
-     * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
-     * @return Returns the message (or a reply message if synchronous) if the message was delivered
-     *         without error, otherwise an exception is thrown.
-     * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
-     */
-    private Message deliver(Message message, boolean synchronous) throws MessageDeliverException
-    {
-        int numberOfAttemps=0;
-        while (numberOfAttemps++ < 2) {
-            if ((serviceClusterInfo.getEPRs().size()==0) || (new Date().after(expirationDate)) ) { 
-                loadServiceClusterInfo();
-            }
-            Message replyMessage = null;
-            EPR epr = null;
-            // Iterate over all the EPRs in the list until delivered
-            while ((epr = loadBalancer.chooseEPR(serviceClusterInfo))!=null) {
-                replyMessage = attemptDelivery(message, epr, synchronous);
-                if (replyMessage != null) {
-                    // We've delivered it, we're done!
-                    return replyMessage;
-                } else {
-                    logger.info("Dead EPR: " + epr);
-                    serviceClusterInfo.removeDeadEPR(epr);
-                }
-            }
-        }
-        
-        // Throw exception if delivery failed...
-        throw new MessageDeliverException("Failed to deliver message to Service [" + serviceCategory + ":" + serviceName + "].  Check for errors.");
-    }
-
-    /**
-     * Get the Service category name for the Service for which this instance is delivering messages.
-     *
-     * @return Service Category.
-     */
-    public String getServiceCategory() {
-        return serviceCategory;
-    }
-
-    /**
-     * Get the Service name for the Service for which this instance is delivering messages.
-     *
-     * @return Service name.
-     */
-    public String getServiceName() {
-        return serviceName;
-    }
-
-    /**
-     * Attempt to deliver the supplied message using the supplied EPR.
-     *
-     * @param message     The message to be delivered.
-     * @param epr         The EPR to be used in the delivery attempt.
-     * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
-     * @return Returns the message (or a reply message if synchronous) if the message was delivered
-     *         without error, otherwise null.
-     */
-    private Message attemptDelivery(Message message, EPR epr, boolean synchronous) {
-        TwoWayCourier courier = null;
-
-        // Get a courier for the EPR...
-        try {
-            courier = getCourier(epr);
-        } catch (CourierException e) {
-            logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", e);
-        } catch (MalformedEPRException e) {
-            logger.warn("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
-        } catch (Throwable t) {
-            logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
-        }
-
-        // Try delivering the message using the courier we just looked up....
-        if (courier != null) {
-            try {
-                EPR replyToEPR = null;
-
-                if (synchronous) {
-                    replyToEPR = getReplyToAddress(epr);
-                    if(replyToEPR == null) {
-                        logger.debug("Not using epr [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. No reply-to address available for synchronous response.");
-                        return null;
-                    }
-                    message.getHeader().getCall().setReplyTo(replyToEPR);
-                }
-                if (courier.deliver(message)) {
-                    if (replyToEPR != null) {
-                        courier.setReplyToEpr(replyToEPR);
-                        return courier.pickup(syncPickupDeliveryTimeout.get());
-                    } else {
-                        return message;
-                    }
-                }
-            } catch (CourierException e) {
-                logger.debug("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
-            } catch (MalformedEPRException e) {
-                // Hmmmm???... Can this really happen?  The Courier has already been created.  Haven't we already validated the EPR during the Courier lookup (above)??
-                logger.warn("Unexpected error.  Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. But the EPR has already been validated!!");
-            } catch (Throwable t) {
-                logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
-            } finally {
-                // TODO: So does this mean that Couriers are stateful?  If so, do we need to synchronize on using them??
-                CourierUtil.cleanCourier(courier);
-            }
-        }
-
-        return null;
-    }
-
-    /**
-     * Get the reply to address for synchronous delivery.
-     *
-     * @param toEpr The to address.
-     * @return The replyTo address.
-     * @throws ConfigurationException
-     */
-    protected EPR getReplyToAddress(EPR toEpr) throws ConfigurationException {
-        // This method just allows us to override Courier lookup during unit testing.
-        try {
-            return CourierUtil.getDefaultReplyToEpr(toEpr);
-        } catch (CourierException e) {
-            throw new ConfigurationException("Bad configuration. Unable to support synchronous reply on 'to' address " + toEpr, e);
-        } catch (MalformedEPRException e) {
-            throw new ConfigurationException("Bad configuration. Unable to support synchronous reply on 'to' address " + toEpr, e);
-        }
-    }
-
-    /**
-     * Get a {@link org.jboss.soa.esb.couriers.Courier} for the supplied EPR.
-     *
-     * @param epr The EPR for which a {@link org.jboss.soa.esb.couriers.Courier}
-     *            is being sought.
-     * @return The courier for the EPR.
-     * @throws CourierException      A courier implementation cannot be created.
-     * @throws MalformedEPRException Bad EPR.
-     */
-    protected TwoWayCourier getCourier(EPR epr) throws CourierException, MalformedEPRException {
-        // This method just allows us to override Courier lookup during unit testing.
-        return CourierFactory.getInstance().getMessageCourier(epr);
-    }
-    /**
-     * Loads the EPRs fresh from the Registry. Right now we will do this every minute
-     * until we can expect to get updates from the registry. For now this should work
-     * just fine.
-     */
-    public void loadServiceClusterInfo() throws MessageDeliverException
-    {
-        try {
-            List<EPR> serviceEprs = RegistryUtil.getEprs(serviceCategory, serviceName);
-            serviceClusterInfo = new ServiceClusterInfoImpl(serviceName, serviceEprs);
-            expirationDate = new Date(java.lang.System.currentTimeMillis() + 60000);
-        } catch (RegistryException e) {
-            throw new MessageDeliverException(e.getMessage(), e);
-        }
-    }
-}

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java	2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -22,7 +22,8 @@
 import org.jboss.internal.soa.esb.assertion.AssertArgument;
 import org.jboss.soa.esb.ConfigurationException;
 import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.*;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
 import org.jboss.soa.esb.message.Message;
 import org.jboss.soa.esb.services.registry.RegistryException;
 
@@ -33,7 +34,7 @@
  * instance.
  *
  * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @see org.jboss.soa.esb.listeners.message.MessageDeliveryAdapter
+ * @see org.jboss.soa.esb.listeners.MessageDeliveryAdapter
  */
 public class UncomposedMessageDeliveryAdapter {
 
@@ -62,7 +63,7 @@
         AssertArgument.isNotNull(serviceCategory, "serviceCategory");
         AssertArgument.isNotNull(serviceName, "serviceName");
         AssertArgument.isNotNull(composer, "composer");
-        messageDeliveryAdapter = new MessageDeliveryAdapter(serviceCategory, serviceName);
+        messageDeliveryAdapter = new org.jboss.soa.esb.listeners.MessageDeliveryAdapter(serviceCategory, serviceName);
         this.composer = composer;
     }
 

Modified: labs/jbossesb/trunk/product/samples/quickstarts/webservice_war1/war/src/org/jboss/soa/esb/samples/quickstart/webservicewar1/webservice/HelloWorldWS.java
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/webservice_war1/war/src/org/jboss/soa/esb/samples/quickstart/webservicewar1/webservice/HelloWorldWS.java	2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/samples/quickstarts/webservice_war1/war/src/org/jboss/soa/esb/samples/quickstart/webservicewar1/webservice/HelloWorldWS.java	2007-07-10 17:55:05 UTC (rev 13306)
@@ -27,17 +27,10 @@
 import javax.jws.soap.SOAPBinding;
 
 // For ESB Interaction
-import org.jboss.soa.esb.services.registry.Registry; // jbossesb-services.jar
-import org.jboss.soa.esb.services.registry.RegistryFactory; // jbossesb-services.jar
 import org.jboss.soa.esb.message.Message; // jbossesb-rosetta.jar
 import org.jboss.soa.esb.message.format.MessageFactory; // jbossesb-rosetta.jar
 import org.jboss.soa.esb.message.format.MessageType; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.addressing.EPR; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.couriers.TwoWayCourier; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.couriers.CourierFactory; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.couriers.CourierUtil; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.addressing.Call;
-import org.jboss.soa.esb.listeners.message.MessageDeliveryAdapter; // jbossesb-rosetta.jar
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter; // jbossesb-rosetta.jar
 
 @WebService(name = "HelloWorld", targetNamespace = "http://webservice_war1/helloworld")
 @SOAPBinding(style = SOAPBinding.Style.RPC)
@@ -52,7 +45,7 @@
             Message replyMessage = null;
 
             // Create the delivery adapter for the target service (you'd normally cache this!!)...
-            deliveryAdapter = new MessageDeliveryAdapter("MyServiceCategory", "MyService");
+            deliveryAdapter = new org.jboss.soa.esb.listeners.MessageDeliveryAdapter("MyServiceCategory", "MyService");
             // Create and populate the request message...
             requestMessage = MessageFactory.getInstance().getMessage(MessageType.JBOSS_XML);
             requestMessage.getBody().setByteArray(toWhom.getBytes()); // inject the value from the WS client




More information about the jboss-svn-commits mailing list