[jboss-svn-commits] JBL Code SVN: r13306 - in labs/jbossesb/trunk/product: rosetta/src/org/jboss/soa/esb/listeners/jca and 2 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Tue Jul 10 13:55:06 EDT 2007
Author: tfennelly
Date: 2007-07-10 13:55:05 -0400 (Tue, 10 Jul 2007)
New Revision: 13306
Added:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java
Removed:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/InflowGateway.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JcaInflowGateway.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JmsEndpoint.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java
labs/jbossesb/trunk/product/samples/quickstarts/webservice_war1/war/src/org/jboss/soa/esb/samples/quickstart/webservicewar1/webservice/HelloWorldWS.java
Log:
part of JBESB-575
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerUtil.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -42,7 +42,7 @@
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
-import org.jboss.soa.esb.listeners.message.MessageDeliveryAdapter;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.util.Util;
Copied: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java (from rev 13262, labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java)
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -0,0 +1,302 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and others contributors as indicated
+ * by the @authors tag. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ *
+ * (C) 2005-2006, JBoss Inc.
+ */
+package org.jboss.soa.esb.listeners;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.addressing.EPR;
+import org.jboss.soa.esb.addressing.MalformedEPRException;
+import org.jboss.soa.esb.common.Configuration;
+import org.jboss.soa.esb.couriers.Courier;
+import org.jboss.soa.esb.couriers.CourierException;
+import org.jboss.soa.esb.couriers.CourierFactory;
+import org.jboss.soa.esb.couriers.CourierUtil;
+import org.jboss.soa.esb.couriers.TwoWayCourier;
+import org.jboss.soa.esb.listeners.RegistryUtil;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
+import org.jboss.soa.esb.listeners.ha.LoadBalancePolicy;
+import org.jboss.soa.esb.listeners.ha.ServiceClusterInfo;
+import org.jboss.soa.esb.listeners.ha.ServiceClusterInfoImpl;
+import org.jboss.soa.esb.message.Message;
+import org.jboss.soa.esb.services.registry.RegistryException;
+import org.jboss.soa.esb.util.ClassUtil;
+
+/**
+ * Adapter class for managing {@link Message} delivery to a specified Service.
+ * <p/>
+ * Manages loading of {@link EPR EPRs}, {@link Courier} selection and
+ * message delivery. Provides a unified/simplified interface for message
+ * delivery.
+ *
+ * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
+ */
+public class MessageDeliveryAdapter {
+
+ /**
+ * Class logger.
+ */
+ private static Logger logger = Logger.getLogger(MessageDeliveryAdapter.class);
+
+ /**
+ * The <b>category name</b> of the Service to which this instance will
+ * deliver messages.
+ */
+ private String serviceCategory;
+ /**
+ * The <b>name</b> of the Service to which this instance will
+ * deliver messages.
+ */
+ private String serviceName;
+ /**
+ *
+ */
+ private LoadBalancePolicy loadBalancer;
+ /**
+ *
+ */
+ private ServiceClusterInfo serviceClusterInfo;
+ /**
+ * Synchronous courier "pickup" deliver timeout.
+ */
+ private ThreadLocal<Long> syncPickupDeliveryTimeout = new ThreadLocal<Long>();
+ /**
+ *
+ */
+ private Date expirationDate;
+ /**
+ * Public constructor.
+ *
+ * @param serviceCategory The <b>category name</b> of the Service to which this instance will
+ * deliver messages.
+ * @param serviceName The <b>name</b> of the Service to which this instance will
+ * deliver messages.
+ * @throws RegistryException Failed to lookup EPRs for the specified Service.
+ */
+ public MessageDeliveryAdapter(String serviceCategory, String serviceName) throws RegistryException, MessageDeliverException {
+ AssertArgument.isNotNullAndNotEmpty(serviceCategory, "serviceCategory");
+ AssertArgument.isNotNullAndNotEmpty(serviceName, "serviceName");
+ this.serviceCategory = serviceCategory;
+ this.serviceName = serviceName;
+ String lbClass = Configuration.getLoadBalancerPolicy();
+ try {
+ Class c = ClassUtil.forName(lbClass, this.getClass());
+ loadBalancer = (LoadBalancePolicy) c.newInstance();
+ loadServiceClusterInfo();
+ } catch (ClassNotFoundException clf) {
+ logger.error("No such LoadBalancePolicy class = " + lbClass);
+ throw new MessageDeliverException( clf.getMessage(), clf);
+ } catch (InstantiationException ie) {
+ logger.error("Could not instatiate LoadBalancePolicy class = " + lbClass);
+ throw new MessageDeliverException( ie.getMessage(), ie.getCause());
+ } catch (IllegalAccessException iae) {
+ logger.error("Illegal access while instantiating LoadBalancePolicy class = " + lbClass);
+ throw new MessageDeliverException( iae.getMessage(), iae);
+ }
+ }
+
+ /**
+ * Synchronously deliver the supplied message to the target service associated with this adapter instance.
+ *
+ * @param message The message to be delivered.
+ * @param timeoutMillis Number of milliseconds before synchronous reply pickup should timeout.
+ * @return Returns the reply message if the message was delivered
+ * without error, otherwise an exception is thrown.
+ * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+ */
+ public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException {
+ syncPickupDeliveryTimeout.set(timeoutMillis);
+ return deliver(message, true);
+ }
+
+ /**
+ * Asynchronously deliver the supplied message to the target service associated with this adapter instance.
+ *
+ * @param message The message to be delivered.
+ * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+ */
+ public void deliverAsync(Message message) throws MessageDeliverException {
+ // Not interested in a reply
+ deliver(message, false);
+ }
+
+ /**
+ * Deliver the supplied message to the target service associated with this adapter instance.
+ *
+ * @param message The message to be delivered.
+ * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
+ * @return Returns the message (or a reply message if synchronous) if the message was delivered
+ * without error, otherwise an exception is thrown.
+ * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
+ */
+ private Message deliver(Message message, boolean synchronous) throws MessageDeliverException
+ {
+ int numberOfAttemps=0;
+ while (numberOfAttemps++ < 2) {
+ if ((serviceClusterInfo.getEPRs().size()==0) || (new Date().after(expirationDate)) ) {
+ loadServiceClusterInfo();
+ }
+ Message replyMessage = null;
+ EPR epr = null;
+ // Iterate over all the EPRs in the list until delivered
+ while ((epr = loadBalancer.chooseEPR(serviceClusterInfo))!=null) {
+ replyMessage = attemptDelivery(message, epr, synchronous);
+ if (replyMessage != null) {
+ // We've delivered it, we're done!
+ return replyMessage;
+ } else {
+ logger.info("Dead EPR: " + epr);
+ serviceClusterInfo.removeDeadEPR(epr);
+ }
+ }
+ }
+
+ // Throw exception if delivery failed...
+ throw new MessageDeliverException("Failed to deliver message to Service [" + serviceCategory + ":" + serviceName + "]. Check for errors.");
+ }
+
+ /**
+ * Get the Service category name for the Service for which this instance is delivering messages.
+ *
+ * @return Service Category.
+ */
+ public String getServiceCategory() {
+ return serviceCategory;
+ }
+
+ /**
+ * Get the Service name for the Service for which this instance is delivering messages.
+ *
+ * @return Service name.
+ */
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ /**
+ * Attempt to deliver the supplied message using the supplied EPR.
+ *
+ * @param message The message to be delivered.
+ * @param epr The EPR to be used in the delivery attempt.
+ * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
+ * @return Returns the message (or a reply message if synchronous) if the message was delivered
+ * without error, otherwise null.
+ */
+ private Message attemptDelivery(Message message, EPR epr, boolean synchronous) {
+ TwoWayCourier courier = null;
+
+ // Get a courier for the EPR...
+ try {
+ courier = getCourier(epr);
+ } catch (CourierException e) {
+ logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", e);
+ } catch (MalformedEPRException e) {
+ logger.warn("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+ } catch (Throwable t) {
+ logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+ }
+
+ // Try delivering the message using the courier we just looked up....
+ if (courier != null) {
+ try {
+ EPR replyToEPR = null;
+
+ if (synchronous) {
+ replyToEPR = getReplyToAddress(epr);
+ if(replyToEPR == null) {
+ logger.debug("Not using epr [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. No reply-to address available for synchronous response.");
+ return null;
+ }
+ message.getHeader().getCall().setReplyTo(replyToEPR);
+ }
+ if (courier.deliver(message)) {
+ if (replyToEPR != null) {
+ courier.setReplyToEpr(replyToEPR);
+ return courier.pickup(syncPickupDeliveryTimeout.get());
+ } else {
+ return message;
+ }
+ }
+ } catch (CourierException e) {
+ logger.debug("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
+ } catch (MalformedEPRException e) {
+ // Hmmmm???... Can this really happen? The Courier has already been created. Haven't we already validated the EPR during the Courier lookup (above)??
+ logger.warn("Unexpected error. Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. But the EPR has already been validated!!");
+ } catch (Throwable t) {
+ logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
+ } finally {
+ // TODO: So does this mean that Couriers are stateful? If so, do we need to synchronize on using them??
+ CourierUtil.cleanCourier(courier);
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * Get the reply to address for synchronous delivery.
+ *
+ * @param toEpr The to address.
+ * @return The replyTo address.
+ * @throws ConfigurationException
+ */
+ protected EPR getReplyToAddress(EPR toEpr) throws ConfigurationException {
+ // This method just allows us to override Courier lookup during unit testing.
+ try {
+ return CourierUtil.getDefaultReplyToEpr(toEpr);
+ } catch (CourierException e) {
+ throw new ConfigurationException("Bad configuration. Unable to support synchronous reply on 'to' address " + toEpr, e);
+ } catch (MalformedEPRException e) {
+ throw new ConfigurationException("Bad configuration. Unable to support synchronous reply on 'to' address " + toEpr, e);
+ }
+ }
+
+ /**
+ * Get a {@link org.jboss.soa.esb.couriers.Courier} for the supplied EPR.
+ *
+ * @param epr The EPR for which a {@link org.jboss.soa.esb.couriers.Courier}
+ * is being sought.
+ * @return The courier for the EPR.
+ * @throws CourierException A courier implementation cannot be created.
+ * @throws MalformedEPRException Bad EPR.
+ */
+ protected TwoWayCourier getCourier(EPR epr) throws CourierException, MalformedEPRException {
+ // This method just allows us to override Courier lookup during unit testing.
+ return CourierFactory.getInstance().getMessageCourier(epr);
+ }
+ /**
+ * Loads the EPRs fresh from the Registry. Right now we will do this every minute
+ * until we can expect to get updates from the registry. For now this should work
+ * just fine.
+ */
+ public void loadServiceClusterInfo() throws MessageDeliverException
+ {
+ try {
+ List<EPR> serviceEprs = RegistryUtil.getEprs(serviceCategory, serviceName);
+ serviceClusterInfo = new ServiceClusterInfoImpl(serviceName, serviceEprs);
+ expirationDate = new Date(java.lang.System.currentTimeMillis() + 60000);
+ } catch (RegistryException e) {
+ throw new MessageDeliverException(e.getMessage(), e);
+ }
+ }
+}
Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/MessageDeliveryAdapter.java
___________________________________________________________________
Name: svn:eol-style
+ native
Deleted: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java 2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/ServiceInvoker.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -1,110 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2006, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-*/
-package org.jboss.soa.esb.listeners;
-
-import java.util.Collection;
-
-import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.couriers.Courier;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierUtil;
-import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.registry.RegistryException;
-
-/**
- * At initialization, this object looks up target EPR for a given service/category.
- * The postMessage method invokes on a target EPR.
- *
- * This class is really for use with Gateways, but could be used by clients to pre-initialize a service lookup
- *
- * @author <a href="bill at jboss.com">Bill Burke</a>
- * @version $Revision: 1.1 $
- */
-public class ServiceInvoker
-{
- private String name;
- private String category;
-
- private Collection<EPR> targetEprs;
-
-
- public String getName()
- {
- return name;
- }
-
- public void setName(String name)
- {
- this.name = name;
- }
-
- public String getCategory()
- {
- return category;
- }
-
- public void setCategory(String category)
- {
- this.category = category;
- }
-
- public void start()
- {
- try
- {
- targetEprs = RegistryUtil.getEprs(category, name);
- }
- catch (final RegistryException re)
- {
- throw new RuntimeException("Unexpected registry exception", re) ;
- }
-
- }
-
- public void postMessage(Message message) throws CourierException, MalformedEPRException
- {
- boolean bSent = false;
- for (EPR current : targetEprs)
- {
- Courier courier = CourierFactory.getCourier(current);
- try
- {
- if (courier.deliver(message))
- {
- bSent = true;
- break;
- }
- }
- finally
- {
- CourierUtil.cleanCourier(courier);
- }
- }
- if (!bSent)
- {
- String text = "Target service <" + category + "," + name + "> is not registered";
- throw new CourierException(text);
- }
- }
-}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/InflowGateway.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/InflowGateway.java 2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/InflowGateway.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -21,7 +21,7 @@
*/
package org.jboss.soa.esb.listeners.jca;
-import org.jboss.soa.esb.listeners.ServiceInvoker;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
/**
* Required interface for JCA ESB Gateways
@@ -34,5 +34,5 @@
*/
public interface InflowGateway
{
- public void setServiceInvoker(ServiceInvoker invoker);
+ public void setServiceInvoker(MessageDeliveryAdapter invoker);
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JcaInflowGateway.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JcaInflowGateway.java 2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JcaInflowGateway.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -27,12 +27,15 @@
import java.util.HashMap;
import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.services.registry.RegistryException;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.ListenerUtil;
-import org.jboss.soa.esb.listeners.ServiceInvoker;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.listeners.lifecycle.AbstractThreadedManagedLifecycle;
import org.jboss.soa.esb.listeners.lifecycle.ManagedLifecycleException;
+import org.apache.log4j.Logger;
/**
* @author <a href="bill at jboss.com">Bill Burke</a>
@@ -40,6 +43,7 @@
*/
public class JcaInflowGateway extends AbstractThreadedManagedLifecycle
{
+ private static Logger logger = Logger.getLogger(JcaInflowGateway.class);
private static final long serialVersionUID = 1L;
private HashMap<String, String> activationSpec = new HashMap<String, String>();
private ClassLoader classLoader;
@@ -55,7 +59,6 @@
public JcaInflowGateway(ConfigTree config) throws ConfigurationException
{
super(config);
- //System.out.println("CONFIG: " + config);
serviceCategory = ListenerUtil.obtainAtt(config,
ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
@@ -222,13 +225,21 @@
protected void doRun()
{
- ServiceInvoker invoker = new ServiceInvoker();
- invoker.setCategory(this.serviceCategory);
- invoker.setName(this.serviceName);
- invoker.start();
- bean.setServiceInvoker(invoker);
- bridge.activate();
-
+ MessageDeliveryAdapter invoker;
+ try
+ {
+ invoker = new MessageDeliveryAdapter(serviceCategory, serviceName);
+ bean.setServiceInvoker(invoker);
+ bridge.activate();
+ }
+ catch (RegistryException e)
+ {
+ logger.error("Failed activate JCA Inflow Gateway. Service '" + serviceCategory + ":" + serviceName + "'.", e);
+ }
+ catch (MessageDeliverException e)
+ {
+ logger.error("Failed activate JCA Inflow Gateway. Service '" + serviceCategory + ":" + serviceName + "'.", e);
+ }
}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JmsEndpoint.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JmsEndpoint.java 2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/jca/JmsEndpoint.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -24,8 +24,8 @@
import javax.jms.Message;
import javax.jms.MessageListener;
-import org.jboss.soa.esb.listeners.ServiceInvoker;
import org.jboss.soa.esb.listeners.gateway.PackageJmsMessageContents;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
/**
* comment
@@ -35,10 +35,10 @@
*/
public class JmsEndpoint implements InflowGateway, MessageListener
{
- private ServiceInvoker service;
+ private MessageDeliveryAdapter service;
private PackageJmsMessageContents transformer = new PackageJmsMessageContents();
- public void setServiceInvoker(ServiceInvoker invoker)
+ public void setServiceInvoker(MessageDeliveryAdapter invoker)
{
this.service = invoker;
}
@@ -48,7 +48,7 @@
try
{
org.jboss.soa.esb.message.Message esbMessage = transformer.process(message);
- service.postMessage(esbMessage);
+ service.deliverAsync(esbMessage);
}
catch (Exception e)
{
Deleted: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java 2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapter.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -1,301 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2006, JBoss Inc., and others contributors as indicated
- * by the @authors tag. All rights reserved.
- * See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- * This copyrighted material is made available to anyone wishing to use,
- * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU Lesser General Public License, v. 2.1.
- * This program is distributed in the hope that it will be useful, but WITHOUT A
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
- * You should have received a copy of the GNU Lesser General Public License,
- * v.2.1 along with this distribution; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
- * MA 02110-1301, USA.
- *
- * (C) 2005-2006, JBoss Inc.
- */
-package org.jboss.soa.esb.listeners.message;
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.log4j.Logger;
-import org.jboss.internal.soa.esb.assertion.AssertArgument;
-import org.jboss.soa.esb.ConfigurationException;
-import org.jboss.soa.esb.addressing.EPR;
-import org.jboss.soa.esb.addressing.MalformedEPRException;
-import org.jboss.soa.esb.common.Configuration;
-import org.jboss.soa.esb.couriers.Courier;
-import org.jboss.soa.esb.couriers.CourierException;
-import org.jboss.soa.esb.couriers.CourierFactory;
-import org.jboss.soa.esb.couriers.CourierUtil;
-import org.jboss.soa.esb.couriers.TwoWayCourier;
-import org.jboss.soa.esb.listeners.RegistryUtil;
-import org.jboss.soa.esb.listeners.ha.LoadBalancePolicy;
-import org.jboss.soa.esb.listeners.ha.ServiceClusterInfo;
-import org.jboss.soa.esb.listeners.ha.ServiceClusterInfoImpl;
-import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.services.registry.RegistryException;
-import org.jboss.soa.esb.util.ClassUtil;
-
-/**
- * Adapter class for managing {@link Message} delivery to a specified Service.
- * <p/>
- * Manages loading of {@link EPR EPRs}, {@link Courier} selection and
- * message delivery. Provides a unified/simplified interface for message
- * delivery.
- *
- * @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- */
-public class MessageDeliveryAdapter {
-
- /**
- * Class logger.
- */
- private static Logger logger = Logger.getLogger(MessageDeliveryAdapter.class);
-
- /**
- * The <b>category name</b> of the Service to which this instance will
- * deliver messages.
- */
- private String serviceCategory;
- /**
- * The <b>name</b> of the Service to which this instance will
- * deliver messages.
- */
- private String serviceName;
- /**
- *
- */
- private LoadBalancePolicy loadBalancer;
- /**
- *
- */
- private ServiceClusterInfo serviceClusterInfo;
- /**
- * Synchronous courier "pickup" deliver timeout.
- */
- private ThreadLocal<Long> syncPickupDeliveryTimeout = new ThreadLocal<Long>();
- /**
- *
- */
- private Date expirationDate;
- /**
- * Public constructor.
- *
- * @param serviceCategory The <b>category name</b> of the Service to which this instance will
- * deliver messages.
- * @param serviceName The <b>name</b> of the Service to which this instance will
- * deliver messages.
- * @throws RegistryException Failed to lookup EPRs for the specified Service.
- */
- public MessageDeliveryAdapter(String serviceCategory, String serviceName) throws RegistryException, MessageDeliverException {
- AssertArgument.isNotNullAndNotEmpty(serviceCategory, "serviceCategory");
- AssertArgument.isNotNullAndNotEmpty(serviceName, "serviceName");
- this.serviceCategory = serviceCategory;
- this.serviceName = serviceName;
- String lbClass = Configuration.getLoadBalancerPolicy();
- try {
- Class c = ClassUtil.forName(lbClass, this.getClass());
- loadBalancer = (LoadBalancePolicy) c.newInstance();
- loadServiceClusterInfo();
- } catch (ClassNotFoundException clf) {
- logger.error("No such LoadBalancePolicy class = " + lbClass);
- throw new MessageDeliverException( clf.getMessage(), clf);
- } catch (InstantiationException ie) {
- logger.error("Could not instatiate LoadBalancePolicy class = " + lbClass);
- throw new MessageDeliverException( ie.getMessage(), ie.getCause());
- } catch (IllegalAccessException iae) {
- logger.error("Illegal access while instantiating LoadBalancePolicy class = " + lbClass);
- throw new MessageDeliverException( iae.getMessage(), iae);
- }
- }
-
- /**
- * Synchronously deliver the supplied message to the target service associated with this adapter instance.
- *
- * @param message The message to be delivered.
- * @param timeoutMillis Number of milliseconds before synchronous reply pickup should timeout.
- * @return Returns the reply message if the message was delivered
- * without error, otherwise an exception is thrown.
- * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
- */
- public Message deliverSync(Message message, long timeoutMillis) throws MessageDeliverException, RegistryException {
- syncPickupDeliveryTimeout.set(timeoutMillis);
- return deliver(message, true);
- }
-
- /**
- * Asynchronously deliver the supplied message to the target service associated with this adapter instance.
- *
- * @param message The message to be delivered.
- * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
- */
- public void deliverAsync(Message message) throws MessageDeliverException {
- // Not interested in a reply
- deliver(message, false);
- }
-
- /**
- * Deliver the supplied message to the target service associated with this adapter instance.
- *
- * @param message The message to be delivered.
- * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
- * @return Returns the message (or a reply message if synchronous) if the message was delivered
- * without error, otherwise an exception is thrown.
- * @throws MessageDeliverException Failed to deliver message, after trying all available EPRs.
- */
- private Message deliver(Message message, boolean synchronous) throws MessageDeliverException
- {
- int numberOfAttemps=0;
- while (numberOfAttemps++ < 2) {
- if ((serviceClusterInfo.getEPRs().size()==0) || (new Date().after(expirationDate)) ) {
- loadServiceClusterInfo();
- }
- Message replyMessage = null;
- EPR epr = null;
- // Iterate over all the EPRs in the list until delivered
- while ((epr = loadBalancer.chooseEPR(serviceClusterInfo))!=null) {
- replyMessage = attemptDelivery(message, epr, synchronous);
- if (replyMessage != null) {
- // We've delivered it, we're done!
- return replyMessage;
- } else {
- logger.info("Dead EPR: " + epr);
- serviceClusterInfo.removeDeadEPR(epr);
- }
- }
- }
-
- // Throw exception if delivery failed...
- throw new MessageDeliverException("Failed to deliver message to Service [" + serviceCategory + ":" + serviceName + "]. Check for errors.");
- }
-
- /**
- * Get the Service category name for the Service for which this instance is delivering messages.
- *
- * @return Service Category.
- */
- public String getServiceCategory() {
- return serviceCategory;
- }
-
- /**
- * Get the Service name for the Service for which this instance is delivering messages.
- *
- * @return Service name.
- */
- public String getServiceName() {
- return serviceName;
- }
-
- /**
- * Attempt to deliver the supplied message using the supplied EPR.
- *
- * @param message The message to be delivered.
- * @param epr The EPR to be used in the delivery attempt.
- * @param synchronous Is the message to be delivered synchronously or not (asynchronously).
- * @return Returns the message (or a reply message if synchronous) if the message was delivered
- * without error, otherwise null.
- */
- private Message attemptDelivery(Message message, EPR epr, boolean synchronous) {
- TwoWayCourier courier = null;
-
- // Get a courier for the EPR...
- try {
- courier = getCourier(epr);
- } catch (CourierException e) {
- logger.debug("Courier lookup failed for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", e);
- } catch (MalformedEPRException e) {
- logger.warn("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
- } catch (Throwable t) {
- logger.warn("Unexpected exception during Courier lookup for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
- }
-
- // Try delivering the message using the courier we just looked up....
- if (courier != null) {
- try {
- EPR replyToEPR = null;
-
- if (synchronous) {
- replyToEPR = getReplyToAddress(epr);
- if(replyToEPR == null) {
- logger.debug("Not using epr [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. No reply-to address available for synchronous response.");
- return null;
- }
- message.getHeader().getCall().setReplyTo(replyToEPR);
- }
- if (courier.deliver(message)) {
- if (replyToEPR != null) {
- courier.setReplyToEpr(replyToEPR);
- return courier.pickup(syncPickupDeliveryTimeout.get());
- } else {
- return message;
- }
- }
- } catch (CourierException e) {
- logger.debug("Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. " + e.getMessage());
- } catch (MalformedEPRException e) {
- // Hmmmm???... Can this really happen? The Courier has already been created. Haven't we already validated the EPR during the Courier lookup (above)??
- logger.warn("Unexpected error. Badly formed EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "]. But the EPR has already been validated!!");
- } catch (Throwable t) {
- logger.warn("Unexpected exception during attempted message delivery over Courier for EPR [" + epr + "] for Service [" + serviceCategory + ":" + serviceName + "].", t);
- } finally {
- // TODO: So does this mean that Couriers are stateful? If so, do we need to synchronize on using them??
- CourierUtil.cleanCourier(courier);
- }
- }
-
- return null;
- }
-
- /**
- * Get the reply to address for synchronous delivery.
- *
- * @param toEpr The to address.
- * @return The replyTo address.
- * @throws ConfigurationException
- */
- protected EPR getReplyToAddress(EPR toEpr) throws ConfigurationException {
- // This method just allows us to override Courier lookup during unit testing.
- try {
- return CourierUtil.getDefaultReplyToEpr(toEpr);
- } catch (CourierException e) {
- throw new ConfigurationException("Bad configuration. Unable to support synchronous reply on 'to' address " + toEpr, e);
- } catch (MalformedEPRException e) {
- throw new ConfigurationException("Bad configuration. Unable to support synchronous reply on 'to' address " + toEpr, e);
- }
- }
-
- /**
- * Get a {@link org.jboss.soa.esb.couriers.Courier} for the supplied EPR.
- *
- * @param epr The EPR for which a {@link org.jboss.soa.esb.couriers.Courier}
- * is being sought.
- * @return The courier for the EPR.
- * @throws CourierException A courier implementation cannot be created.
- * @throws MalformedEPRException Bad EPR.
- */
- protected TwoWayCourier getCourier(EPR epr) throws CourierException, MalformedEPRException {
- // This method just allows us to override Courier lookup during unit testing.
- return CourierFactory.getInstance().getMessageCourier(epr);
- }
- /**
- * Loads the EPRs fresh from the Registry. Right now we will do this every minute
- * until we can expect to get updates from the registry. For now this should work
- * just fine.
- */
- public void loadServiceClusterInfo() throws MessageDeliverException
- {
- try {
- List<EPR> serviceEprs = RegistryUtil.getEprs(serviceCategory, serviceName);
- serviceClusterInfo = new ServiceClusterInfoImpl(serviceName, serviceEprs);
- expirationDate = new Date(java.lang.System.currentTimeMillis() + 60000);
- } catch (RegistryException e) {
- throw new MessageDeliverException(e.getMessage(), e);
- }
- }
-}
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java 2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -22,7 +22,8 @@
import org.jboss.internal.soa.esb.assertion.AssertArgument;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.helpers.ConfigTree;
-import org.jboss.soa.esb.listeners.ListenerTagNames;
+import org.jboss.soa.esb.listeners.*;
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.services.registry.RegistryException;
@@ -33,7 +34,7 @@
* instance.
*
* @author <a href="mailto:tom.fennelly at jboss.com">tom.fennelly at jboss.com</a>
- * @see org.jboss.soa.esb.listeners.message.MessageDeliveryAdapter
+ * @see org.jboss.soa.esb.listeners.MessageDeliveryAdapter
*/
public class UncomposedMessageDeliveryAdapter {
@@ -62,7 +63,7 @@
AssertArgument.isNotNull(serviceCategory, "serviceCategory");
AssertArgument.isNotNull(serviceName, "serviceName");
AssertArgument.isNotNull(composer, "composer");
- messageDeliveryAdapter = new MessageDeliveryAdapter(serviceCategory, serviceName);
+ messageDeliveryAdapter = new org.jboss.soa.esb.listeners.MessageDeliveryAdapter(serviceCategory, serviceName);
this.composer = composer;
}
Modified: labs/jbossesb/trunk/product/samples/quickstarts/webservice_war1/war/src/org/jboss/soa/esb/samples/quickstart/webservicewar1/webservice/HelloWorldWS.java
===================================================================
--- labs/jbossesb/trunk/product/samples/quickstarts/webservice_war1/war/src/org/jboss/soa/esb/samples/quickstart/webservicewar1/webservice/HelloWorldWS.java 2007-07-10 17:19:15 UTC (rev 13305)
+++ labs/jbossesb/trunk/product/samples/quickstarts/webservice_war1/war/src/org/jboss/soa/esb/samples/quickstart/webservicewar1/webservice/HelloWorldWS.java 2007-07-10 17:55:05 UTC (rev 13306)
@@ -27,17 +27,10 @@
import javax.jws.soap.SOAPBinding;
// For ESB Interaction
-import org.jboss.soa.esb.services.registry.Registry; // jbossesb-services.jar
-import org.jboss.soa.esb.services.registry.RegistryFactory; // jbossesb-services.jar
import org.jboss.soa.esb.message.Message; // jbossesb-rosetta.jar
import org.jboss.soa.esb.message.format.MessageFactory; // jbossesb-rosetta.jar
import org.jboss.soa.esb.message.format.MessageType; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.addressing.EPR; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.couriers.TwoWayCourier; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.couriers.CourierFactory; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.couriers.CourierUtil; // jbossesb-rosetta.jar
-import org.jboss.soa.esb.addressing.Call;
-import org.jboss.soa.esb.listeners.message.MessageDeliveryAdapter; // jbossesb-rosetta.jar
+import org.jboss.soa.esb.listeners.MessageDeliveryAdapter; // jbossesb-rosetta.jar
@WebService(name = "HelloWorld", targetNamespace = "http://webservice_war1/helloworld")
@SOAPBinding(style = SOAPBinding.Style.RPC)
@@ -52,7 +45,7 @@
Message replyMessage = null;
// Create the delivery adapter for the target service (you'd normally cache this!!)...
- deliveryAdapter = new MessageDeliveryAdapter("MyServiceCategory", "MyService");
+ deliveryAdapter = new org.jboss.soa.esb.listeners.MessageDeliveryAdapter("MyServiceCategory", "MyService");
// Create and populate the request message...
requestMessage = MessageFactory.getInstance().getMessage(MessageType.JBOSS_XML);
requestMessage.getBody().setByteArray(toWhom.getBytes()); // inject the value from the WS client
More information about the jboss-svn-commits
mailing list