[jboss-svn-commits] JBL Code SVN: r16071 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb: client and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Thu Oct 25 14:12:37 EDT 2007


Author: kevin.conner at jboss.com
Date: 2007-10-25 14:12:37 -0400 (Thu, 25 Oct 2007)
New Revision: 16071

Added:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java
Modified:
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultFaultTo.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultReplyTo.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
   labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
Log:
Corrected ReplyTo/FaultTo processing: JBESB-1242

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultFaultTo.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultFaultTo.java	2007-10-25 18:07:47 UTC (rev 16070)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultFaultTo.java	2007-10-25 18:12:37 UTC (rev 16071)
@@ -1,5 +1,6 @@
 package org.jboss.soa.esb.addressing.util;
 
+import org.jboss.soa.esb.addressing.Call;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.message.Message;
 
@@ -30,35 +31,38 @@
 
 public class DefaultFaultTo
 {
-	/**
-	 * Get the FaultTo EPR on this address. If it is not set, then try to return
-	 * the From EPR. If that is not set, then return null.
-	 * 
-	 * @param message
-	 *            the message to work on.
-	 * @return the FaultTo EPR, or ReplyTo, or the From EPR, or null, in that order.
-	 */
 
-	public static final EPR getFaultToAddress (Message message)
+    /**
+     * Initialise the message as a reply to the specified call details.
+     * @param message The reply message.
+     * @param callDetails The call details used to generate the reply.
+     * @return true if the message was initialised, false otherwise.
+     */
+	public static boolean initialiseReply(final Message message, final Call callDetails)
 	{
-		if (message == null)
-			return null;
-
-		try
+		return DefaultReplyTo.initialiseReply(message, callDetails, getFaultTo(callDetails)) ;
+	}
+	
+    /**
+     * Get the appropriate faultTo address.
+     * @param callDetails The call details used to generate the reply.
+     * @return The faultTo address or null if none set.
+     */
+	static EPR getFaultTo(final Call callDetails)
+	{
+		if (callDetails == null)
 		{
-			if (message.getHeader().getCall().getFaultTo() != null)
-				return message.getHeader().getCall().getFaultTo();
-			else if (message.getHeader().getCall().getReplyTo() != null)
-				return message.getHeader().getCall().getReplyTo();
-			else
-				return message.getHeader().getCall().getFrom();
+			return null ;
 		}
-		catch (final NullPointerException e)
+		
+		final EPR faultTo = callDetails.getFaultTo() ;
+		if (faultTo != null)
 		{
-			// OK, it's not set... return null...
+			return faultTo ;
 		}
-
-		return null;
+		else
+		{
+			return DefaultReplyTo.getReplyTo(callDetails) ;
+		}
 	}
-
 }
\ No newline at end of file

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultReplyTo.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultReplyTo.java	2007-10-25 18:07:47 UTC (rev 16070)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultReplyTo.java	2007-10-25 18:12:37 UTC (rev 16071)
@@ -9,6 +9,7 @@
 import org.jboss.internal.soa.esb.addressing.eprs.DefaultJdbcReplyToEpr;
 import org.jboss.internal.soa.esb.addressing.eprs.DefaultJmsReplyToEpr;
 import org.jboss.internal.soa.esb.addressing.eprs.DefaultSftpReplyToEpr;
+import org.jboss.soa.esb.addressing.Call;
 import org.jboss.soa.esb.addressing.EPR;
 import org.jboss.soa.esb.addressing.MalformedEPRException;
 import org.jboss.soa.esb.addressing.eprs.FTPEpr;
@@ -95,31 +96,62 @@
     }
 
     /**
-	 * Get the ReplyTo EPR on this address. If it is not set, then try to return
-	 * the From EPR. If that is not set, then return null.
-	 * 
-	 * @param message
-	 *            the message to work on.
-	 * @return the ReplyTo EPR, or the From EPR, or null, in that order.
-	 */
-    
-    public static final EPR getReplyToAddress (Message message)
+     * Initialise the message as a reply to the specified call details.
+     * @param message The reply message.
+     * @param callDetails The call details used to generate the reply.
+     * @return true if the message was initialised, false otherwise.
+     */
+	public static boolean initialiseReply(final Message message, final Call callDetails)
 	{
-		if (message == null)
-			return null;
-
-		try
+		return initialiseReply(message, callDetails, getReplyTo(callDetails)) ;
+	}
+	
+    /**
+     * Get the appropriate replyTo address.
+     * @param callDetails The call details used to generate the reply.
+     * @return The replyTo address or null if none set.
+     */
+	static EPR getReplyTo(final Call callDetails)
+	{
+		if (callDetails == null)
 		{
-			if (message.getHeader().getCall().getReplyTo() != null)
-				return message.getHeader().getCall().getReplyTo();
-			else
-				return message.getHeader().getCall().getFrom();
+			return null ;
 		}
-		catch (final NullPointerException e)
+		
+		final EPR replyTo = callDetails.getReplyTo() ;
+		if (replyTo != null)
 		{
-			// OK, it's not set... return null...
+			return replyTo ;
 		}
-
-		return null;
+		else
+		{
+			return callDetails.getFrom() ;
+		}
 	}
+	
+    /**
+     * Initialise the message as a reply to the specified call details.
+     * @param message The reply message.
+     * @param callDetails The call details used to generate the reply.
+     * @param toEPR The target EPR for the message.
+     * @return true if the message was initialised, false otherwise.
+     */
+	static boolean initialiseReply(final Message message, final Call callDetails,
+		final EPR toEPR)
+	{
+		if ((message == null) || (callDetails == null) || (toEPR == null))
+		{
+			return false ;
+		}
+		
+		final Call replyCallDetails = message.getHeader().getCall() ;
+		replyCallDetails.setTo(toEPR) ;
+		replyCallDetails.setFrom(callDetails.getTo()) ;
+		replyCallDetails.setReplyTo(null) ;
+		replyCallDetails.setFaultTo(null) ;
+		replyCallDetails.setRelatesTo(callDetails.getMessageID()) ;
+		replyCallDetails.setMessageID(null) ;
+		
+		return true ;
+	}
 }
\ No newline at end of file

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2007-10-25 18:07:47 UTC (rev 16070)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java	2007-10-25 18:12:37 UTC (rev 16071)
@@ -96,7 +96,7 @@
     /**
      * Dead letter Service
      */
-    private static final Service dlqService = new Service(INTERNAL_SERVICE_CATEGORY, DEAD_LETTER_SERVICE_NAME);
+    public static final Service dlqService = new Service(INTERNAL_SERVICE_CATEGORY, DEAD_LETTER_SERVICE_NAME);
     /**
      * Public constructor.
      *

Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java	                        (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java	2007-10-25 18:12:37 UTC (rev 16071)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.message;
+
+/**
+ * Action Processing Constants.
+ */
+public class ActionProcessingConstants
+{
+	/**
+	 * Used to store the call details of the received message when a response cannot be sent.
+	 */
+	public static final String PROPERTY_FAILURE_CALL_DETAILS = "org.jboss.soa.esb.failure.call" ;
+	/**
+	 * Used to store the response type when the response cannot be sent. 
+	 */
+	public static final String PROPERTY_FAILURE_RESPONSE_TYPE = "org.jboss.soa.esb.failure.responseType" ;
+}


Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2007-10-25 18:07:47 UTC (rev 16070)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java	2007-10-25 18:12:37 UTC (rev 16071)
@@ -43,7 +43,8 @@
 import org.jboss.soa.esb.listeners.ListenerTagNames;
 import org.jboss.soa.esb.listeners.message.errors.Factory;
 import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.format.MessageType;
+import org.jboss.soa.esb.message.Properties;
+import org.jboss.soa.esb.services.persistence.MessageStore;
 import org.jboss.soa.esb.util.ClassUtil;
 
 import java.util.ArrayList;
@@ -121,7 +122,7 @@
 			{
 				LOGGER.debug("Registering action class " + actionClassTag);
 			}
-			final Class actionClass;
+			final Class<?> actionClass;
 			try
 			{
 				actionClass = ClassUtil.forName(actionClassTag, getClass());
@@ -246,10 +247,10 @@
 	 */
 	public boolean process(final Message message)
 	{
-		final EPR faultToAddress = DefaultFaultTo.getFaultToAddress(message);
 		long start = System.nanoTime();
 		serviceMessageCounter.incrementTotalCount();
-		final EPR fromAddress = message.getHeader().getCall().getTo() ;
+		final Call callDetails = new Call() ;
+		callDetails.copy(message.getHeader().getCall()) ;
 
 		if (active.get())
 		{
@@ -260,7 +261,6 @@
 
 			final int numProcessors = processors.length;
 			final Message[] messages = new Message[numProcessors];
-			final EPR replyToAddress = DefaultReplyTo.getReplyToAddress(message);
 
 			Message currentMessage = message;
 
@@ -302,14 +302,14 @@
 
 						if (fault.getFaultMessage() == null)
 						{
-							faultTo(fromAddress, faultToAddress, Factory.createErrorMessage(Factory.PROCESSING_ERROR, message, ex));
+							faultTo(callDetails, Factory.createErrorMessage(Factory.PROCESSING_ERROR, message, ex));
 						}
 						else
-							faultTo(fromAddress, faultToAddress, fault.getFaultMessage());
+							faultTo(callDetails, fault.getFaultMessage());
 					}
 					else if (!throwRuntime)
 					{
-						faultTo(fromAddress, faultToAddress, Factory.createErrorMessage(Factory.UNEXPECTED_ERROR, message, ex));
+						faultTo(callDetails, Factory.createErrorMessage(Factory.UNEXPECTED_ERROR, message, ex));
 					}
 
 					long procTime = System.nanoTime() - start;
@@ -329,9 +329,9 @@
 			}
 
 			// Reply...
-			if ((currentMessage != null) && (replyToAddress != null))
+			if (currentMessage != null)
 			{
-				replyTo(fromAddress, replyToAddress, currentMessage);
+				replyTo(callDetails, currentMessage);
 			}
 
 			notifySuccess(messages);
@@ -344,7 +344,7 @@
 		{
 			LOGGER.debug("pipeline process disabled for message: "+message.getHeader());
 
-			faultTo(fromAddress, faultToAddress, Factory.createErrorMessage(Factory.NOT_ENABLED, message, null));
+			faultTo(callDetails, Factory.createErrorMessage(Factory.NOT_ENABLED, message, null));
 			long procTime = System.nanoTime() - start;
         	DeliveryObservableLogger.getInstance().logMessage(new MessageStatusBean(procTime, message, 
         			MessageStatusBean.MESSAGE_FAILED));
@@ -371,72 +371,79 @@
 	    return transactional ;
 	}
 
-    /**
+	/**
 	 * Send the reply.
 	 * 
-         * @param fromAddress
-         *            the EPR to which the original message was sent.
-	 * @param replyToAddress
-	 *            the EPR to target if one is not defined in the message.
+	 * @param callDetails
+	 *            the call details for the original request.
 	 * @param message
 	 *            the message.
 	 */
 
-	private void replyTo(EPR fromAddress, EPR replyToAddress, Message message)
+	private void replyTo(final Call callDetails, final Message message)
 	{
-		EPR replyToEPR = DefaultReplyTo.getReplyToAddress(message);
-
-        if (replyToEPR != null) {
-            replyToAddress = replyToEPR;
-        }
-
-        if (replyToAddress == null)
+		if (!DefaultReplyTo.initialiseReply(message, callDetails))
 		{
-            LOGGER.warn("No reply to address defined for reply message! "+message.getHeader());
-        }
+			LOGGER.warn("No reply to address defined for reply message! " + callDetails);
+			sendToDLQ(callDetails, message, MessageType.reply) ;
+		}
 		else
 		{
-            final Call call = message.getHeader().getCall() ;
-            call.setFrom(fromAddress) ;
-            call.setReplyTo(null) ;
-            call.setFaultTo(null) ;
-
-            messageTo(replyToAddress, message, MessageType.reply);
+			final EPR replyToEPR = message.getHeader().getCall().getTo() ;
+			messageTo(replyToEPR, message, MessageType.reply);
 		}
 	}
 
-    /**
+	/**
 	 * Send the fault message to the EPR.
 	 * 
-         * @param fromAddress
-         *            the EPR to which the original message was sent.
+	 * @param callDetails
+	 *            the call details for the original request.
 	 * @param faultToAddress
 	 *            the EPR to target if one is not set in the message.
 	 * @param message
 	 *            the message.
 	 */
 
-	private void faultTo(EPR fromAddress, EPR faultToAddress, Message message)
+	private void faultTo(final Call callDetails, final Message message)
 	{
-		EPR faultToEPR = DefaultFaultTo.getFaultToAddress(message);
-
-        if (faultToEPR != null) {
-            faultToAddress = faultToEPR;
-        }
-
-        if (faultToAddress == null)
+		if (!DefaultFaultTo.initialiseReply(message, callDetails))
 		{
-            LOGGER.warn("No fault address defined for fault message! "+message.getHeader());
-        }
+			LOGGER.warn("No fault address defined for fault message! " + callDetails);
+			sendToDLQ(callDetails, message, MessageType.fault) ;
+		}
 		else
 		{
-            final Call call = message.getHeader().getCall() ;
-            call.setFrom(fromAddress) ;
-            call.setReplyTo(null) ;
-            call.setFaultTo(null) ;
+			final EPR faultToEPR = message.getHeader().getCall().getTo() ;
+			messageTo(faultToEPR, message, MessageType.fault);
+		}
+	}
+	
+	/**
+	 * Send the message to the DLQ service.
+	 * @param callDetails The original call details.
+	 * @param message The response message.
+	 * @param messageType The response type.
+	 */
+	private void sendToDLQ(final Call callDetails, final Message message,
+		final MessageType messageType)
+	{
+		final Properties properties = message.getProperties() ;
+		properties.setProperty(MessageStore.CLASSIFICATION, MessageStore.CLASSIFICATION_DLQ);
+		properties.setProperty(ActionProcessingConstants.PROPERTY_FAILURE_CALL_DETAILS, callDetails.toString()) ;
+		properties.setProperty(ActionProcessingConstants.PROPERTY_FAILURE_RESPONSE_TYPE, messageType.name()) ;
 
-            messageTo(faultToAddress, message, MessageType.fault);
-        }
+		try
+		{
+			final ServiceInvoker serviceInvoker = new ServiceInvoker(ServiceInvoker.dlqService) ;
+			
+			serviceInvoker.deliverAsync(message) ;
+		}
+		catch (final MessageDeliverException mde)
+		{
+			LOGGER.warn("Failed to send response failure to DLQ service") ;
+			LOGGER.debug("Failed to send response failure to DLQ service", mde) ;
+		}
 	}
 
     private static enum MessageType {
@@ -476,7 +483,7 @@
         }
     }
 
-    /**
+	/**
 	 * Handle the destruction of the pipeline from the specified position.
 	 * 
 	 * @param initialPosition




More information about the jboss-svn-commits mailing list