[jboss-svn-commits] JBL Code SVN: r16071 - in labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb: client and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Oct 25 14:12:37 EDT 2007
Author: kevin.conner at jboss.com
Date: 2007-10-25 14:12:37 -0400 (Thu, 25 Oct 2007)
New Revision: 16071
Added:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java
Modified:
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultFaultTo.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultReplyTo.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
Log:
Corrected ReplyTo/FaultTo processing: JBESB-1242
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultFaultTo.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultFaultTo.java 2007-10-25 18:07:47 UTC (rev 16070)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultFaultTo.java 2007-10-25 18:12:37 UTC (rev 16071)
@@ -1,5 +1,6 @@
package org.jboss.soa.esb.addressing.util;
+import org.jboss.soa.esb.addressing.Call;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.message.Message;
@@ -30,35 +31,38 @@
public class DefaultFaultTo
{
- /**
- * Get the FaultTo EPR on this address. If it is not set, then try to return
- * the From EPR. If that is not set, then return null.
- *
- * @param message
- * the message to work on.
- * @return the FaultTo EPR, or ReplyTo, or the From EPR, or null, in that order.
- */
- public static final EPR getFaultToAddress (Message message)
+ /**
+ * Initialise the message as a reply to the specified call details.
+ * @param message The reply message.
+ * @param callDetails The call details used to generate the reply.
+ * @return true if the message was initialised, false otherwise.
+ */
+ public static boolean initialiseReply(final Message message, final Call callDetails)
{
- if (message == null)
- return null;
-
- try
+ return DefaultReplyTo.initialiseReply(message, callDetails, getFaultTo(callDetails)) ;
+ }
+
+ /**
+ * Get the appropriate faultTo address.
+ * @param callDetails The call details used to generate the reply.
+ * @return The faultTo address or null if none set.
+ */
+ static EPR getFaultTo(final Call callDetails)
+ {
+ if (callDetails == null)
{
- if (message.getHeader().getCall().getFaultTo() != null)
- return message.getHeader().getCall().getFaultTo();
- else if (message.getHeader().getCall().getReplyTo() != null)
- return message.getHeader().getCall().getReplyTo();
- else
- return message.getHeader().getCall().getFrom();
+ return null ;
}
- catch (final NullPointerException e)
+
+ final EPR faultTo = callDetails.getFaultTo() ;
+ if (faultTo != null)
{
- // OK, it's not set... return null...
+ return faultTo ;
}
-
- return null;
+ else
+ {
+ return DefaultReplyTo.getReplyTo(callDetails) ;
+ }
}
-
}
\ No newline at end of file
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultReplyTo.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultReplyTo.java 2007-10-25 18:07:47 UTC (rev 16070)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/addressing/util/DefaultReplyTo.java 2007-10-25 18:12:37 UTC (rev 16071)
@@ -9,6 +9,7 @@
import org.jboss.internal.soa.esb.addressing.eprs.DefaultJdbcReplyToEpr;
import org.jboss.internal.soa.esb.addressing.eprs.DefaultJmsReplyToEpr;
import org.jboss.internal.soa.esb.addressing.eprs.DefaultSftpReplyToEpr;
+import org.jboss.soa.esb.addressing.Call;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.FTPEpr;
@@ -95,31 +96,62 @@
}
/**
- * Get the ReplyTo EPR on this address. If it is not set, then try to return
- * the From EPR. If that is not set, then return null.
- *
- * @param message
- * the message to work on.
- * @return the ReplyTo EPR, or the From EPR, or null, in that order.
- */
-
- public static final EPR getReplyToAddress (Message message)
+ * Initialise the message as a reply to the specified call details.
+ * @param message The reply message.
+ * @param callDetails The call details used to generate the reply.
+ * @return true if the message was initialised, false otherwise.
+ */
+ public static boolean initialiseReply(final Message message, final Call callDetails)
{
- if (message == null)
- return null;
-
- try
+ return initialiseReply(message, callDetails, getReplyTo(callDetails)) ;
+ }
+
+ /**
+ * Get the appropriate replyTo address.
+ * @param callDetails The call details used to generate the reply.
+ * @return The replyTo address or null if none set.
+ */
+ static EPR getReplyTo(final Call callDetails)
+ {
+ if (callDetails == null)
{
- if (message.getHeader().getCall().getReplyTo() != null)
- return message.getHeader().getCall().getReplyTo();
- else
- return message.getHeader().getCall().getFrom();
+ return null ;
}
- catch (final NullPointerException e)
+
+ final EPR replyTo = callDetails.getReplyTo() ;
+ if (replyTo != null)
{
- // OK, it's not set... return null...
+ return replyTo ;
}
-
- return null;
+ else
+ {
+ return callDetails.getFrom() ;
+ }
}
+
+ /**
+ * Initialise the message as a reply to the specified call details.
+ * @param message The reply message.
+ * @param callDetails The call details used to generate the reply.
+ * @param toEPR The target EPR for the message.
+ * @return true if the message was initialised, false otherwise.
+ */
+ static boolean initialiseReply(final Message message, final Call callDetails,
+ final EPR toEPR)
+ {
+ if ((message == null) || (callDetails == null) || (toEPR == null))
+ {
+ return false ;
+ }
+
+ final Call replyCallDetails = message.getHeader().getCall() ;
+ replyCallDetails.setTo(toEPR) ;
+ replyCallDetails.setFrom(callDetails.getTo()) ;
+ replyCallDetails.setReplyTo(null) ;
+ replyCallDetails.setFaultTo(null) ;
+ replyCallDetails.setRelatesTo(callDetails.getMessageID()) ;
+ replyCallDetails.setMessageID(null) ;
+
+ return true ;
+ }
}
\ No newline at end of file
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2007-10-25 18:07:47 UTC (rev 16070)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2007-10-25 18:12:37 UTC (rev 16071)
@@ -96,7 +96,7 @@
/**
* Dead letter Service
*/
- private static final Service dlqService = new Service(INTERNAL_SERVICE_CATEGORY, DEAD_LETTER_SERVICE_NAME);
+ public static final Service dlqService = new Service(INTERNAL_SERVICE_CATEGORY, DEAD_LETTER_SERVICE_NAME);
/**
* Public constructor.
*
Added: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java (rev 0)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java 2007-10-25 18:12:37 UTC (rev 16071)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.message;
+
+/**
+ * Action Processing Constants.
+ */
+public class ActionProcessingConstants
+{
+ /**
+ * Used to store the call details of the received message when a response cannot be sent.
+ */
+ public static final String PROPERTY_FAILURE_CALL_DETAILS = "org.jboss.soa.esb.failure.call" ;
+ /**
+ * Used to store the response type when the response cannot be sent.
+ */
+ public static final String PROPERTY_FAILURE_RESPONSE_TYPE = "org.jboss.soa.esb.failure.responseType" ;
+}
Property changes on: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingConstants.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Modified: labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2007-10-25 18:07:47 UTC (rev 16070)
+++ labs/jbossesb/trunk/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2007-10-25 18:12:37 UTC (rev 16071)
@@ -43,7 +43,8 @@
import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.message.errors.Factory;
import org.jboss.soa.esb.message.Message;
-import org.jboss.soa.esb.message.format.MessageType;
+import org.jboss.soa.esb.message.Properties;
+import org.jboss.soa.esb.services.persistence.MessageStore;
import org.jboss.soa.esb.util.ClassUtil;
import java.util.ArrayList;
@@ -121,7 +122,7 @@
{
LOGGER.debug("Registering action class " + actionClassTag);
}
- final Class actionClass;
+ final Class<?> actionClass;
try
{
actionClass = ClassUtil.forName(actionClassTag, getClass());
@@ -246,10 +247,10 @@
*/
public boolean process(final Message message)
{
- final EPR faultToAddress = DefaultFaultTo.getFaultToAddress(message);
long start = System.nanoTime();
serviceMessageCounter.incrementTotalCount();
- final EPR fromAddress = message.getHeader().getCall().getTo() ;
+ final Call callDetails = new Call() ;
+ callDetails.copy(message.getHeader().getCall()) ;
if (active.get())
{
@@ -260,7 +261,6 @@
final int numProcessors = processors.length;
final Message[] messages = new Message[numProcessors];
- final EPR replyToAddress = DefaultReplyTo.getReplyToAddress(message);
Message currentMessage = message;
@@ -302,14 +302,14 @@
if (fault.getFaultMessage() == null)
{
- faultTo(fromAddress, faultToAddress, Factory.createErrorMessage(Factory.PROCESSING_ERROR, message, ex));
+ faultTo(callDetails, Factory.createErrorMessage(Factory.PROCESSING_ERROR, message, ex));
}
else
- faultTo(fromAddress, faultToAddress, fault.getFaultMessage());
+ faultTo(callDetails, fault.getFaultMessage());
}
else if (!throwRuntime)
{
- faultTo(fromAddress, faultToAddress, Factory.createErrorMessage(Factory.UNEXPECTED_ERROR, message, ex));
+ faultTo(callDetails, Factory.createErrorMessage(Factory.UNEXPECTED_ERROR, message, ex));
}
long procTime = System.nanoTime() - start;
@@ -329,9 +329,9 @@
}
// Reply...
- if ((currentMessage != null) && (replyToAddress != null))
+ if (currentMessage != null)
{
- replyTo(fromAddress, replyToAddress, currentMessage);
+ replyTo(callDetails, currentMessage);
}
notifySuccess(messages);
@@ -344,7 +344,7 @@
{
LOGGER.debug("pipeline process disabled for message: "+message.getHeader());
- faultTo(fromAddress, faultToAddress, Factory.createErrorMessage(Factory.NOT_ENABLED, message, null));
+ faultTo(callDetails, Factory.createErrorMessage(Factory.NOT_ENABLED, message, null));
long procTime = System.nanoTime() - start;
DeliveryObservableLogger.getInstance().logMessage(new MessageStatusBean(procTime, message,
MessageStatusBean.MESSAGE_FAILED));
@@ -371,72 +371,79 @@
return transactional ;
}
- /**
+ /**
* Send the reply.
*
- * @param fromAddress
- * the EPR to which the original message was sent.
- * @param replyToAddress
- * the EPR to target if one is not defined in the message.
+ * @param callDetails
+ * the call details for the original request.
* @param message
* the message.
*/
- private void replyTo(EPR fromAddress, EPR replyToAddress, Message message)
+ private void replyTo(final Call callDetails, final Message message)
{
- EPR replyToEPR = DefaultReplyTo.getReplyToAddress(message);
-
- if (replyToEPR != null) {
- replyToAddress = replyToEPR;
- }
-
- if (replyToAddress == null)
+ if (!DefaultReplyTo.initialiseReply(message, callDetails))
{
- LOGGER.warn("No reply to address defined for reply message! "+message.getHeader());
- }
+ LOGGER.warn("No reply to address defined for reply message! " + callDetails);
+ sendToDLQ(callDetails, message, MessageType.reply) ;
+ }
else
{
- final Call call = message.getHeader().getCall() ;
- call.setFrom(fromAddress) ;
- call.setReplyTo(null) ;
- call.setFaultTo(null) ;
-
- messageTo(replyToAddress, message, MessageType.reply);
+ final EPR replyToEPR = message.getHeader().getCall().getTo() ;
+ messageTo(replyToEPR, message, MessageType.reply);
}
}
- /**
+ /**
* Send the fault message to the EPR.
*
- * @param fromAddress
- * the EPR to which the original message was sent.
+ * @param callDetails
+ * the call details for the original request.
* @param faultToAddress
* the EPR to target if one is not set in the message.
* @param message
* the message.
*/
- private void faultTo(EPR fromAddress, EPR faultToAddress, Message message)
+ private void faultTo(final Call callDetails, final Message message)
{
- EPR faultToEPR = DefaultFaultTo.getFaultToAddress(message);
-
- if (faultToEPR != null) {
- faultToAddress = faultToEPR;
- }
-
- if (faultToAddress == null)
+ if (!DefaultFaultTo.initialiseReply(message, callDetails))
{
- LOGGER.warn("No fault address defined for fault message! "+message.getHeader());
- }
+ LOGGER.warn("No fault address defined for fault message! " + callDetails);
+ sendToDLQ(callDetails, message, MessageType.fault) ;
+ }
else
{
- final Call call = message.getHeader().getCall() ;
- call.setFrom(fromAddress) ;
- call.setReplyTo(null) ;
- call.setFaultTo(null) ;
+ final EPR faultToEPR = message.getHeader().getCall().getTo() ;
+ messageTo(faultToEPR, message, MessageType.fault);
+ }
+ }
+
+ /**
+ * Send the message to the DLQ service.
+ * @param callDetails The original call details.
+ * @param message The response message.
+ * @param messageType The response type.
+ */
+ private void sendToDLQ(final Call callDetails, final Message message,
+ final MessageType messageType)
+ {
+ final Properties properties = message.getProperties() ;
+ properties.setProperty(MessageStore.CLASSIFICATION, MessageStore.CLASSIFICATION_DLQ);
+ properties.setProperty(ActionProcessingConstants.PROPERTY_FAILURE_CALL_DETAILS, callDetails.toString()) ;
+ properties.setProperty(ActionProcessingConstants.PROPERTY_FAILURE_RESPONSE_TYPE, messageType.name()) ;
- messageTo(faultToAddress, message, MessageType.fault);
- }
+ try
+ {
+ final ServiceInvoker serviceInvoker = new ServiceInvoker(ServiceInvoker.dlqService) ;
+
+ serviceInvoker.deliverAsync(message) ;
+ }
+ catch (final MessageDeliverException mde)
+ {
+ LOGGER.warn("Failed to send response failure to DLQ service") ;
+ LOGGER.debug("Failed to send response failure to DLQ service", mde) ;
+ }
}
private static enum MessageType {
@@ -476,7 +483,7 @@
}
}
- /**
+ /**
* Handle the destruction of the pipeline from the specified position.
*
* @param initialPosition
More information about the jboss-svn-commits
mailing list