[jboss-svn-commits] JBL Code SVN: r38100 - in labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta: src/org/jboss/internal/soa/esb/couriers and 24 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed May 23 20:48:08 EDT 2012
Author: kevin.conner at jboss.com
Date: 2012-05-23 20:48:06 -0400 (Wed, 23 May 2012)
New Revision: 38100
Added:
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/util/MessageFlowContext.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MockPriorityProcessor.java
Modified:
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/deployer-as6/src/org/jboss/soa/esb/listeners/deployers/mc/as6/WebGatewayBuilder.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/listeners/war/HttpGatewayDeploymentFactory.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/BaseWebService.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/ESBServiceEndpointInfo.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/JAXWSProviderClassGenerator.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/OneWayBaseWebService.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/RequestResponseBaseWebService.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/WebGatewayBuilder.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/WebserviceInfo.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/XMLBeansModel.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers110/XMLBeansModel.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/XMLBeansModel.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/XMLBeansModel.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers131/XMLBeansModel.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model101SchemaParser.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model110SchemaParser.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model120SchemaParser.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model130SchemaParser.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/ModelAdapter.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/HibernateInterceptor.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/CamelGateway.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/JBossESBComponent.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/ServiceProcessor.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/http/HttpGatewayServlet.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/mina/UdpGatewayListener.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/jca/BaseJcaInflow.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractScheduledManagedLifecycle.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierUnitTest.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/webservice/BaseWebServiceUnitTest.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/GroovyGatewayUnitTest.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipelineUnitTest.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java
labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/ServiceInvokerCallUnitTest.java
Log:
Add support for message flow priority: JBESB-3786
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/deployer-as6/src/org/jboss/soa/esb/listeners/deployers/mc/as6/WebGatewayBuilder.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/deployer-as6/src/org/jboss/soa/esb/listeners/deployers/mc/as6/WebGatewayBuilder.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/deployer-as6/src/org/jboss/soa/esb/listeners/deployers/mc/as6/WebGatewayBuilder.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -134,12 +134,12 @@
private void createWebserviceWars(WebDeploymentArchive webDeployment) throws DeploymentException
{
this.publishers = new ArrayList<ContractReferencePublisher>();
- final List<WebserviceInfo> endpointServices = model.getWebserviceServices();
- if (endpointServices != null)
+ try
{
- if (endpointServices.size() > 0)
+ final List<WebserviceInfo> endpointServices = model.getWebserviceServices();
+ if (endpointServices != null)
{
- try
+ if (endpointServices.size() > 0)
{
final JAXWSProviderClassGenerator generator = new JAXWSProviderClassGenerator();
@@ -188,12 +188,12 @@
publishers.add(publisher);
}
}
- catch (final Exception ex)
- {
- throw new DeploymentException("Failed to create webservice artifact", ex);
- }
}
}
+ catch (final Exception ex)
+ {
+ throw new DeploymentException("Failed to create webservice artifact", ex);
+ }
}
private File createGatewayTempDeployment(DeploymentArchive deploymentArchive) throws ConfigurationException
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/couriers/JmsCourier.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -46,6 +46,7 @@
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
@@ -294,6 +295,12 @@
message.setStringProperty(key, kvp.getValue());
}
}
+ final Integer priority = MessageFlowContext.getMessageFlowPriority();
+ _messageProducer.setPriority(priority == null ? javax.jms.Message.DEFAULT_PRIORITY : priority.intValue()) ;
+ if ((priority != null) && _logger.isDebugEnabled())
+ {
+ _logger.debug("Producer initialised with priority " + priority) ;
+ }
} catch (final JMSException e) {
throw new CourierTransportException("Caught exception initialising properties! ",e);
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/listeners/war/HttpGatewayDeploymentFactory.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/listeners/war/HttpGatewayDeploymentFactory.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/listeners/war/HttpGatewayDeploymentFactory.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -65,12 +65,13 @@
// Set the endpoint urlPattern so as to map requests from that namespace into
// this servlet instance...
servlet.getUrlMappings().add(urlMapping);
-
+
String allowedPorts = webEndpointConfig.getAttribute(Servlet.ALLOWED_PORTS);
if(allowedPorts != null) {
servlet.getParams().add( new KeyValuePair(Servlet.ALLOWED_PORTS, allowedPorts) );
}
+ final String messageFlowPriority = getMessageFlowPriority(webEndpointConfig) ;
// Add the endpoint address of the servlet...
addEndpointAddress(webEndpointConfig, servlet, urlMapping);
@@ -80,8 +81,21 @@
// Map all config tree attributes as servlet parameters...
servlet.getParams().addAll(webEndpointConfig.attributesAsList());
servlet.getParams().add(new KeyValuePair(ListenerTagNames.DEPLOYMENT_NAME_TAG, deploymentName)) ;
+ if (messageFlowPriority != null) {
+ servlet.getParams().add(new KeyValuePair(ListenerTagNames.MESSAGE_FLOW_PRIORITY, messageFlowPriority)) ;
+ }
}
+ private String getMessageFlowPriority(final ConfigTree config) { // looks up the child "property" element named MESSAGE_FLOW_PRIORITY; null when no such property exists
+ final ConfigTree[] properties = config.getChildren("property") ;
+ for(ConfigTree property: properties) {
+ if (ListenerTagNames.MESSAGE_FLOW_PRIORITY.equals(property.getAttribute("name"))) {
+ return property.getAttribute("value") ; // first matching property wins; value is returned unparsed
+ }
+ }
+ return null ;
+ }
+
private void addEndpointAddress(ConfigTree webEndpointConfig, Servlet servlet, String urlMapping) {
String transportGuarantee = webEndpointConfig.getAttribute(TRANSPORT_GUARANTEE);
boolean secure = ( "CONFIDENTIAL".equals(transportGuarantee) || "INTEGRAL".equals(transportGuarantee) );
Added: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/util/MessageFlowContext.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/util/MessageFlowContext.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/util/MessageFlowContext.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2008, Red Hat Middleware
+ * LLC, and individual contributors by the @authors tag. See the copyright.txt
+ * in the distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.internal.soa.esb.util;
+
+import org.jboss.soa.esb.ConfigurationException;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
+
+
+/**
+ * Context holding the message flow priority for the current thread, plus helpers for parsing it from configuration.
+ * The priority is stored in a ThreadLocal; parsed values must lie in the range 0 to 9 inclusive.
+ * @author <a href="mailto:Kevin.Conner at jboss.com">Kevin Conner</a>
+ *
+ */
+public final class MessageFlowContext
+{
+ private static ThreadLocal<Integer> messageFlowPriorityTL = new ThreadLocal<Integer>() ;
+
+ private static final int MIN_PRIORITY = 0 ;
+ private static final int MAX_PRIORITY = 9 ;
+
+ /**
+ * Set the message flow priority for the current thread.
+ * @param messageFlowPriority The message flow priority, or null for the default
+ */
+ public static void setMessageFlowPriority(final Integer messageFlowPriority)
+ {
+ messageFlowPriorityTL.set(messageFlowPriority) ;
+ }
+
+ /**
+ * Get the message flow priority for the current thread.
+ * @return The message flow priority or null if default
+ */
+ public static Integer getMessageFlowPriority()
+ {
+ return messageFlowPriorityTL.get();
+ }
+
+ /**
+ * Retrieve the message flow priority from the configuration attribute named by ListenerTagNames.MESSAGE_FLOW_PRIORITY; an invalid value raises ConfigurationException.
+ * @param config The current configuration
+ * @return The message flow priority or null if not configured
+ */
+ public static Integer parseMessageFlowPriority(final ConfigTree config)
+ throws ConfigurationException
+ {
+ final String priorityValue = config.getAttribute(ListenerTagNames.MESSAGE_FLOW_PRIORITY) ;
+ if (priorityValue != null)
+ {
+ return parseMessageFlowPriority(priorityValue) ;
+ }
+ else
+ {
+ return null ;
+ }
+ }
+
+ /**
+ * Parse a message flow priority, raising ConfigurationException if it is not an integer in the range 0 to 9 inclusive.
+ * @param priorityValue The priority value.
+ * @return The parsed message flow priority, never null
+ */
+ public static Integer parseMessageFlowPriority(final String priorityValue)
+ throws ConfigurationException
+ {
+ final int priority ;
+ try
+ {
+ priority = Integer.parseInt(priorityValue) ;
+ }
+ catch (final NumberFormatException nfe)
+ {
+ throw new ConfigurationException("Could not parse message flow priority " + priorityValue) ;
+ }
+ if ((priority < MIN_PRIORITY) || (priority > MAX_PRIORITY))
+ {
+ throw new ConfigurationException("Invalid gateway priority specified: " + priority) ;
+ }
+ else
+ {
+ return Integer.valueOf(priority) ;
+ }
+ }
+}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/BaseWebService.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/BaseWebService.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/BaseWebService.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -50,6 +50,7 @@
import javax.xml.ws.addressing.Relationship;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.internal.soa.esb.util.XMLHelper;
import org.jboss.internal.soa.esb.webservice.addressing.AddressingConstants;
import org.jboss.internal.soa.esb.webservice.addressing.MAP;
@@ -109,14 +110,17 @@
protected final MessagePayloadProxy requestProxy ;
protected final MessagePayloadProxy responseProxy ;
protected final String action ;
+ protected final Integer messageFlowPriority ;
- protected BaseWebService(final String deployment, final ServiceInvoker serviceInvoker, final String requestLocation, final String responseLocation, final String action)
+ protected BaseWebService(final String deployment, final ServiceInvoker serviceInvoker, final String requestLocation, final String responseLocation, final String action,
+ final Integer messageFlowPriority)
throws MessageDeliverException
{
this.deployment = deployment ;
this.serviceInvoker = serviceInvoker ;
requestProxy = new MessagePayloadProxy(null, requestLocation) ;
responseProxy = new MessagePayloadProxy(responseLocation, null) ;
+ this.messageFlowPriority = messageFlowPriority ;
this.action = action ;
}
@@ -178,7 +182,16 @@
ExtractorUtil.addAuthRequestToMessage(authRequest, esbReq);
// We should be able to return null here but this causes JBossWS to NPE.
- final Message esbRes = deliverMessage(esbReq) ;
+ final Message esbRes ;
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ;
+ try
+ {
+ esbRes = deliverMessage(esbReq) ;
+ }
+ finally
+ {
+ MessageFlowContext.setMessageFlowPriority(null) ;
+ }
final SOAPMessage response = SOAP_MESSAGE_FACTORY.createMessage();
if (esbRes != null)
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/ESBServiceEndpointInfo.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/ESBServiceEndpointInfo.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/ESBServiceEndpointInfo.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -52,6 +52,7 @@
private final String requestLocation ;
private final String responseLocation ;
private final boolean addressing ;
+ private final Integer messageFlowPriority ;
public ESBServiceEndpointInfo(final WebserviceInfo webserviceInfo)
throws UnsupportedEncodingException {
@@ -90,6 +91,7 @@
requestLocation = webserviceInfo.getRequestLocation() ;
responseLocation = webserviceInfo.getResponseLocation() ;
addressing = webserviceInfo.isAddressing() ;
+ messageFlowPriority = webserviceInfo.getMessageFlowPriority() ;
}
public String getRequestName() {
@@ -167,6 +169,10 @@
public boolean isAddressing() {
return addressing ;
}
+
+ public Integer getMessageFlowPriority() { // value copied from WebserviceInfo.getMessageFlowPriority() in the constructor; may be null
+ return messageFlowPriority ;
+ }
static
{
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/JAXWSProviderClassGenerator.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/JAXWSProviderClassGenerator.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/JAXWSProviderClassGenerator.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -151,7 +151,7 @@
final String constructorStr = "super(" + getParamValue(deployment) + "," + siFieldName + "," +
getParamValue(epInfo.getRequestLocation()) + "," + getParamValue(epInfo.getResponseLocation()) + ", \"" +
- epInfo.getResponseAction() + "\");" ;
+ epInfo.getResponseAction() + "\", " + getIntegerValue(epInfo.getMessageFlowPriority()) + ");" ;
CtConstructor defaultConstructor = new CtConstructor(null, seiClass) ;
defaultConstructor.setBody(constructorStr) ;
seiClass.addConstructor(defaultConstructor) ;
@@ -169,7 +169,15 @@
if (value == null) {
return "null" ;
} else {
- return '"' + value + '"' ;
+ return '"' + value.toString() + '"' ;
}
}
+
+ private String getIntegerValue(final Integer value) { // renders the value as Java source text for the generated constructor body
+ if (value == null) {
+ return "null" ;
+ } else {
+ return "Integer.valueOf(" + value + ")" ;
+ }
+ }
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/OneWayBaseWebService.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/OneWayBaseWebService.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/OneWayBaseWebService.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -31,10 +31,11 @@
*/
public class OneWayBaseWebService extends BaseWebService
{
- protected OneWayBaseWebService(final String deployment, final ServiceInvoker serviceInvoker, final String requestLocation, final String responseLocation, final String action)
+ protected OneWayBaseWebService(final String deployment, final ServiceInvoker serviceInvoker, final String requestLocation, final String responseLocation, final String action,
+ final Integer messageFlowPriority)
throws MessageDeliverException
{
- super(deployment, serviceInvoker, requestLocation, responseLocation, action) ;
+ super(deployment, serviceInvoker, requestLocation, responseLocation, action, messageFlowPriority) ;
}
@Override
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/RequestResponseBaseWebService.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/RequestResponseBaseWebService.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/internal/soa/esb/webservice/RequestResponseBaseWebService.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -42,10 +42,11 @@
private static final long TIMEOUT ;
private static final Logger LOGGER = Logger.getLogger(RequestResponseBaseWebService.class) ;
- protected RequestResponseBaseWebService(final String deployment, final ServiceInvoker serviceInvoker, final String requestLocation, final String responseLocation, final String action)
+ protected RequestResponseBaseWebService(final String deployment, final ServiceInvoker serviceInvoker, final String requestLocation, final String responseLocation, final String action,
+ final Integer messageFlowPriority)
throws MessageDeliverException
{
- super(deployment, serviceInvoker, requestLocation, responseLocation, action) ;
+ super(deployment, serviceInvoker, requestLocation, responseLocation, action, messageFlowPriority) ;
}
@Override
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/client/ServiceInvoker.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -31,6 +31,7 @@
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.addressing.helpers.EPRHelper;
import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.addressing.Call;
@@ -344,6 +345,20 @@
{
message.getContext().setContext(SecurityService.AUTH_REQUEST, encryptedAuthRequest);
}
+ // Attach the message flow priority
+ final Integer flowPriority = MessageFlowContext.getMessageFlowPriority() ;
+ if (flowPriority == null)
+ {
+ message.getContext().removeContext(Environment.MESSAGE_FLOW_PRIORITY) ;
+ }
+ else
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Propagating message flow priority: " + flowPriority) ;
+ }
+ message.getContext().setContext(Environment.MESSAGE_FLOW_PRIORITY, flowPriority) ;
+ }
try {
//We are removing dead EPRs from the serviceClusterInfo. *Previous* deliveries maybe have
@@ -383,6 +398,7 @@
// remove the security context so that it is not exposed to the action pipeline.
replyMessage.getContext().removeContext(SecurityService.CONTEXT);
replyMessage.getContext().removeContext(SecurityService.AUTH_REQUEST);
+ replyMessage.getContext().removeContext(Environment.MESSAGE_FLOW_PRIORITY) ;
if (Type.isFaultMessage(replyMessage)) {
Factory.createExceptionFromFault(replyMessage) ;
@@ -438,6 +454,7 @@
} finally {
message.getContext().removeContext(SecurityService.CONTEXT);
message.getContext().removeContext(SecurityService.AUTH_REQUEST);
+ message.getContext().removeContext(Environment.MESSAGE_FLOW_PRIORITY);
}
// Throw exception if delivery failed...
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/common/Environment.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/common/Environment.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -224,6 +224,7 @@
public static final String MESSAGE_EXIT_TIME = "org.jboss.soa.esb.message.time.dod"; // time died
public static final String MESSAGE_BYTE_SIZE = "org.jboss.soa.esb.message.byte.size"; // size
public static final String MESSAGE_TIME_PROCESSED = "org.jboss.soa.esb.message.time.processed"; //
+ public static final String MESSAGE_FLOW_PRIORITY = "org.jboss.soa.esb.message.flowPriority";
public static final String MESSAGE_SERVICE_ROUTE = "org.jboss.soa.esb.message.service.route";
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/ListenerTagNames.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -62,6 +62,7 @@
public static final String GATEWAY_CLASS_TAG = "gatewayClass";
public static final String IS_GATEWAY_TAG = "is-gateway";
public static final String SERVICE_INVOKER_TIMEOUT = "serviceInvokerTimeout";
+ public static final String MESSAGE_FLOW_PRIORITY = "messageFlowPriority";
/** Listeners */
public static final String LISTENER_CLASS_TAG = "listenerClass";
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/WebGatewayBuilder.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/WebGatewayBuilder.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/WebGatewayBuilder.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -131,12 +131,12 @@
private void createWebserviceWars(WebDeploymentArchive webDeployment) throws DeploymentException
{
this.publishers = new ArrayList<ContractReferencePublisher>();
- final List<WebserviceInfo> endpointServices = model.getWebserviceServices();
- if (endpointServices != null)
+ try
{
- if (endpointServices.size() > 0)
+ final List<WebserviceInfo> endpointServices = model.getWebserviceServices();
+ if (endpointServices != null)
{
- try
+ if (endpointServices.size() > 0)
{
final JAXWSProviderClassGenerator generator = new JAXWSProviderClassGenerator();
@@ -185,12 +185,12 @@
publishers.add(publisher);
}
}
- catch (final Exception ex)
- {
- throw new DeploymentException("Failed to create webservice artifact", ex);
- }
}
}
+ catch (final Exception ex)
+ {
+ throw new DeploymentException("Failed to create webservice artifact", ex);
+ }
}
private File createGatewayTempDeployment(DeploymentArchive deploymentArchive) throws ConfigurationException
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/WebserviceInfo.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/WebserviceInfo.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/WebserviceInfo.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -64,7 +64,12 @@
* Flag indicating the endpoint requires addressing.
*/
private final boolean addressing ;
+ /**
+ * Message flow priority.
+ */
+ private final Integer messageFlowPriority ;
+
/**
* Construct the webservice information.
* @param service The associated service details.
@@ -73,11 +78,13 @@
* @param faultXsd The fault schemas.
* @param description The service description.
* @param requestResponse The service request/response flag.
+ * @param messageFlowPriority The message flow priority.
*/
public WebserviceInfo(final Service service, final String inXsd, final String outXsd,
- final String faultXsd, final String description, final boolean requestResponse)
+ final String faultXsd, final String description, final boolean requestResponse,
+ final Integer messageFlowPriority)
{
- this(service, inXsd, outXsd, faultXsd, description, requestResponse, null, null, false) ;
+ this(service, inXsd, outXsd, faultXsd, description, requestResponse, null, null, false, messageFlowPriority) ;
}
/**
@@ -91,10 +98,12 @@
* @param requestLocation The request location within the message.
* @param responseLocation The response location within the message.
* @param addressing Does the endpoint require addressing?
+ * @param messageFlowPriority The message flow priority.
*/
public WebserviceInfo(final Service service, final String inXsd, final String outXsd,
final String faultXsd, final String description, final boolean requestResponse,
- final String requestLocation, final String responseLocation, final boolean addressing)
+ final String requestLocation, final String responseLocation, final boolean addressing,
+ final Integer messageFlowPriority)
{
this.service = service ;
this.inXsd = inXsd ;
@@ -105,6 +114,7 @@
this.requestLocation = requestLocation ;
this.responseLocation = responseLocation ;
this.addressing = addressing ;
+ this.messageFlowPriority = messageFlowPriority ;
}
/**
@@ -189,6 +199,15 @@
}
/**
+ * Get the message flow priority.
+ * @return The message flow priority or null if not set.
+ */
+ public Integer getMessageFlowPriority()
+ {
+ return messageFlowPriority ;
+ }
+
+ /**
* Test for equality.
* @param obj The object to compare with
* @return true if equal, false otherwise.
@@ -204,7 +223,8 @@
(requestResponse == webserviceInfo.isRequestResponse()) &&
objEquals(requestLocation, webserviceInfo.requestLocation) &&
objEquals(responseLocation, webserviceInfo.responseLocation) &&
- (addressing == webserviceInfo.isAddressing())) ;
+ (addressing == webserviceInfo.isAddressing()) &&
+ objEquals(messageFlowPriority, webserviceInfo.messageFlowPriority)) ;
}
return false ;
@@ -217,7 +237,7 @@
public int hashCode()
{
return service.hashCode() ^ objHash(inXsd) ^ objHash(outXsd) ^ objHash(faultXsd) & objHash(description) ^ (requestResponse ? 0xdeafdeaf : 0) ^
- objHash(requestLocation) ^ objHash(responseLocation) ^ (addressing ? 0xfeedfeed : 0);
+ objHash(requestLocation) ^ objHash(responseLocation) ^ (addressing ? 0xfeedfeed : 0) ^ objHash(messageFlowPriority) ;
}
/**
@@ -228,7 +248,7 @@
{
return service + objName("inXsd", inXsd) + objName("outXsd", outXsd) + objName("faultXsd" , faultXsd) + objName("description" , description) +
(requestResponse ? ",requestResponse" : "") + objName("requestLocation", requestLocation) + objName("responseLocation", responseLocation) +
- (addressing ? ", addressing" : "") ;
+ (addressing ? ", addressing" : "") + objName("messageFlowPriority", messageFlowPriority) ;
}
/**
@@ -236,7 +256,7 @@
* @param value The object value or null.
* @return The hashcode.
*/
- private int objHash(final String value)
+ private int objHash(final Object value)
{
return (value == null ? 0 : value.hashCode()) ;
}
@@ -247,7 +267,7 @@
* @param value The object value or null.
* @return The string representation.
*/
- private String objName(final String name, final String value)
+ private String objName(final String name, final Object value)
{
if (value != null)
{
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/XMLBeansModel.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/XMLBeansModel.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers/XMLBeansModel.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -34,9 +34,11 @@
import org.jboss.internal.soa.esb.publish.ContractProvider;
import org.jboss.internal.soa.esb.publish.ContractPublisher;
import org.jboss.internal.soa.esb.publish.Publish;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.config.ServiceContract;
import org.jboss.soa.esb.listeners.config.ServicePublisher;
import org.jboss.soa.esb.listeners.config.WebserviceInfo;
@@ -507,8 +509,10 @@
/**
* Get the list of services which require a webservice endpoint.
* @return The list of services.
+ * @throws ConfigurationException
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
final List<WebserviceInfo> endpointServices = new ArrayList<WebserviceInfo>() ;
final Services services = jbossesb.getServices() ;
@@ -527,7 +531,8 @@
final WebserviceInfo webserviceInfo = new WebserviceInfo(
new org.jboss.soa.esb.Service(service.getCategory(), service.getName()),
actions.getInXsd(), actions.getOutXsd(), actions.getFaultXsd(),
- service.getDescription(), MepType.REQUEST_RESPONSE.equals(actions.getMep())) ;
+ service.getDescription(), MepType.REQUEST_RESPONSE.equals(actions.getMep()),
+ getMessageFlowPriority(service)) ;
endpointServices.add(webserviceInfo) ;
}
}
@@ -585,4 +590,18 @@
}
return serviceContractList;
}
+
+ private Integer getMessageFlowPriority(final Service service)
+ throws ConfigurationException
+ {
+ final List<Property> propertyList = service.getPropertyList() ;
+ for(Property property: propertyList)
+ {
+ if (ListenerTagNames.MESSAGE_FLOW_PRIORITY.equals(property.getName()))
+ {
+ return MessageFlowContext.parseMessageFlowPriority(property.getValue()) ;
+ }
+ }
+ return null ;
+ }
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers110/XMLBeansModel.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers110/XMLBeansModel.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers110/XMLBeansModel.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -34,9 +34,11 @@
import org.jboss.internal.soa.esb.publish.ContractProvider;
import org.jboss.internal.soa.esb.publish.ContractPublisher;
import org.jboss.internal.soa.esb.publish.Publish;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.config.ServiceContract;
import org.jboss.soa.esb.listeners.config.ServicePublisher;
import org.jboss.soa.esb.listeners.config.WebserviceInfo;
@@ -507,8 +509,10 @@
/**
* Get the list of services which require a webservice endpoint.
* @return The list of services.
+ * @throws ConfigurationException
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
final List<WebserviceInfo> endpointServices = new ArrayList<WebserviceInfo>() ;
final Services services = jbossesb.getServices() ;
@@ -528,7 +532,8 @@
new org.jboss.soa.esb.Service(service.getCategory(), service.getName()),
actions.getInXsd(), actions.getOutXsd(), actions.getFaultXsd(),
service.getDescription(), MepType.REQUEST_RESPONSE.equals(actions.getMep()),
- actions.getRequestLocation(), actions.getResponseLocation(), actions.getAddressing()) ;
+ actions.getRequestLocation(), actions.getResponseLocation(), actions.getAddressing(),
+ getMessageFlowPriority(service)) ;
endpointServices.add(webserviceInfo) ;
}
}
@@ -586,4 +591,18 @@
}
return serviceContractList;
}
+
+ private Integer getMessageFlowPriority(final Service service)
+ throws ConfigurationException
+ {
+ final List<Property> propertyList = service.getPropertyList() ;
+ for(Property property: propertyList)
+ {
+ if (ListenerTagNames.MESSAGE_FLOW_PRIORITY.equals(property.getName()))
+ {
+ return MessageFlowContext.parseMessageFlowPriority(property.getValue()) ;
+ }
+ }
+ return null ;
+ }
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/XMLBeansModel.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/XMLBeansModel.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers120/XMLBeansModel.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -34,9 +34,11 @@
import org.jboss.internal.soa.esb.publish.ContractProvider;
import org.jboss.internal.soa.esb.publish.ContractPublisher;
import org.jboss.internal.soa.esb.publish.Publish;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.config.ServiceContract;
import org.jboss.soa.esb.listeners.config.ServicePublisher;
import org.jboss.soa.esb.listeners.config.WebserviceInfo;
@@ -559,8 +561,10 @@
/**
* Get the list of services which require a webservice endpoint.
* @return The list of services.
+ * @throws ConfigurationException
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
final List<WebserviceInfo> endpointServices = new ArrayList<WebserviceInfo>() ;
final Services services = jbossesb.getServices() ;
@@ -580,7 +584,8 @@
new org.jboss.soa.esb.Service(service.getCategory(), service.getName()),
actions.getInXsd(), actions.getOutXsd(), actions.getFaultXsd(),
service.getDescription(), MepType.REQUEST_RESPONSE.equals(actions.getMep()),
- actions.getRequestLocation(), actions.getResponseLocation(), actions.getAddressing()) ;
+ actions.getRequestLocation(), actions.getResponseLocation(), actions.getAddressing(),
+ getMessageFlowPriority(service)) ;
endpointServices.add(webserviceInfo) ;
}
}
@@ -668,4 +673,18 @@
}
return null;
}
+
+ private Integer getMessageFlowPriority(final Service service)
+ throws ConfigurationException
+ {
+ final List<Property> propertyList = service.getPropertyList() ;
+ for(Property property: propertyList)
+ {
+ if (ListenerTagNames.MESSAGE_FLOW_PRIORITY.equals(property.getName()))
+ {
+ return MessageFlowContext.parseMessageFlowPriority(property.getValue()) ;
+ }
+ }
+ return null ;
+ }
}
\ No newline at end of file
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/XMLBeansModel.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/XMLBeansModel.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers130/XMLBeansModel.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -34,9 +34,11 @@
import org.jboss.internal.soa.esb.publish.ContractProvider;
import org.jboss.internal.soa.esb.publish.ContractPublisher;
import org.jboss.internal.soa.esb.publish.Publish;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.config.ServiceContract;
import org.jboss.soa.esb.listeners.config.ServicePublisher;
import org.jboss.soa.esb.listeners.config.WebserviceInfo;
@@ -559,8 +561,10 @@
/**
* Get the list of services which require a webservice endpoint.
* @return The list of services.
+ * @throws ConfigurationException
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
final List<WebserviceInfo> endpointServices = new ArrayList<WebserviceInfo>() ;
final Services services = jbossesb.getServices() ;
@@ -580,7 +584,8 @@
new org.jboss.soa.esb.Service(service.getCategory(), service.getName()),
actions.getInXsd(), actions.getOutXsd(), actions.getFaultXsd(),
service.getDescription(), MepType.REQUEST_RESPONSE.equals(actions.getMep()),
- actions.getRequestLocation(), actions.getResponseLocation(), actions.getAddressing()) ;
+ actions.getRequestLocation(), actions.getResponseLocation(), actions.getAddressing(),
+ getMessageFlowPriority(service)) ;
endpointServices.add(webserviceInfo) ;
}
}
@@ -668,4 +673,18 @@
}
return null;
}
+
+ private Integer getMessageFlowPriority(final Service service)
+ throws ConfigurationException
+ {
+ final List<Property> propertyList = service.getPropertyList() ;
+ for(Property property: propertyList)
+ {
+ if (ListenerTagNames.MESSAGE_FLOW_PRIORITY.equals(property.getName()))
+ {
+ return MessageFlowContext.parseMessageFlowPriority(property.getValue()) ;
+ }
+ }
+ return null ;
+ }
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers131/XMLBeansModel.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers131/XMLBeansModel.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/mappers131/XMLBeansModel.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -34,9 +34,11 @@
import org.jboss.internal.soa.esb.publish.ContractProvider;
import org.jboss.internal.soa.esb.publish.ContractPublisher;
import org.jboss.internal.soa.esb.publish.Publish;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
+import org.jboss.soa.esb.listeners.ListenerTagNames;
import org.jboss.soa.esb.listeners.config.ServiceContract;
import org.jboss.soa.esb.listeners.config.ServicePublisher;
import org.jboss.soa.esb.listeners.config.WebserviceInfo;
@@ -559,8 +561,10 @@
/**
* Get the list of services which require a webservice endpoint.
* @return The list of services.
+ * @throws ConfigurationException
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
final List<WebserviceInfo> endpointServices = new ArrayList<WebserviceInfo>() ;
final Services services = jbossesb.getServices() ;
@@ -580,7 +584,8 @@
new org.jboss.soa.esb.Service(service.getCategory(), service.getName()),
actions.getInXsd(), actions.getOutXsd(), actions.getFaultXsd(),
service.getDescription(), MepType.REQUEST_RESPONSE.equals(actions.getMep()),
- actions.getRequestLocation(), actions.getResponseLocation(), actions.getAddressing()) ;
+ actions.getRequestLocation(), actions.getResponseLocation(), actions.getAddressing(),
+ getMessageFlowPriority(service)) ;
endpointServices.add(webserviceInfo) ;
}
}
@@ -668,4 +673,18 @@
}
return null;
}
+
+ private Integer getMessageFlowPriority(final Service service)
+ throws ConfigurationException
+ {
+ final List<Property> propertyList = service.getPropertyList() ;
+ for(Property property: propertyList)
+ {
+ if (ListenerTagNames.MESSAGE_FLOW_PRIORITY.equals(property.getName()))
+ {
+ return MessageFlowContext.parseMessageFlowPriority(property.getValue()) ;
+ }
+ }
+ return null ;
+ }
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model101SchemaParser.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model101SchemaParser.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model101SchemaParser.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -154,6 +154,7 @@
* @return The list of services.
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
return model.getWebserviceServices() ;
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model110SchemaParser.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model110SchemaParser.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model110SchemaParser.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -153,6 +153,7 @@
* @return The list of services.
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
return model.getWebserviceServices() ;
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model120SchemaParser.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model120SchemaParser.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model120SchemaParser.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -152,6 +152,7 @@
* @return The list of services.
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
return model.getWebserviceServices() ;
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model130SchemaParser.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model130SchemaParser.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/Model130SchemaParser.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -152,6 +152,7 @@
* @return The list of services.
*/
public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException
{
return model.getWebserviceServices() ;
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/ModelAdapter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/ModelAdapter.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/config/model/ModelAdapter.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -79,7 +79,8 @@
* Get the list of services which require a webservice endpoint.
* @return The list of services.
*/
- public List<WebserviceInfo> getWebserviceServices() ;
+ public List<WebserviceInfo> getWebserviceServices()
+ throws ConfigurationException ;
/**
* Get the list of service contracts.
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/HibernateInterceptor.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/HibernateInterceptor.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/HibernateInterceptor.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -34,6 +34,7 @@
import org.hibernate.EmptyInterceptor;
import org.hibernate.Transaction;
import org.hibernate.type.Type;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.client.ServiceInvoker;
@@ -69,6 +70,10 @@
protected Object m_composer;
protected String m_composerName;
protected ConfigTree m_config;
+ /**
+ * Message flow priority.
+ */
+ private final Integer m_messageFlowPriority ;
private boolean enabled ;
@@ -94,8 +99,8 @@
public HibernateInterceptor(ConfigTree f_config, ArrayList<HibernateEventBean> f_list) throws ManagedLifecycleException {
m_config = f_config;
m_events = f_list;
-
try {
+ m_messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(f_config) ;
m_targetServiceCategory = ListenerUtil.getValue(m_config,
ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG, null);
m_targetServiceName = ListenerUtil.getValue(m_config,
@@ -124,7 +129,9 @@
+ m_targetServiceName + "> not found in registry", mde);
} catch (ManagedLifecycleException ex) {
throw ex;
- }
+ } catch (ConfigurationException ce) {
+ throw new ManagedLifecycleException("Unexpected configuration exception", ce);
+ }
try {
resolveComposerClass();
@@ -207,12 +214,15 @@
Throwable thrown = null;
String text = null;
+ MessageFlowContext.setMessageFlowPriority(m_messageFlowPriority) ;
try {
m_serviceInvoker.deliverAsync(message);
} catch (MessageDeliverException e) {
thrown = e;
text = "ServiceInvoker <" + m_targetServiceCategory + " " + m_targetServiceName+ ">.deliverAsync(Message) FAILED";
- }
+ } finally {
+ MessageFlowContext.setMessageFlowPriority(null) ;
+ }
if (null != thrown) {
m_logger.error(text);
m_logger.debug(text, thrown);
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -45,6 +45,7 @@
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPool;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsConnectionPoolContainer;
import org.jboss.internal.soa.esb.rosetta.pooling.JmsSession;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
@@ -72,11 +73,16 @@
public class JmsGatewayListener extends AbstractThreadedManagedLifecycle {
private String durableSubscriptionName;
+ /**
+ * Message flow priority.
+ */
+ private final Integer messageFlowPriority ;
public JmsGatewayListener(ConfigTree listenerConfig)
throws ConfigurationException {
super(listenerConfig);
_config = listenerConfig;
+ messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(_config) ;
checkMyParms();
} // __________________________________
@@ -153,61 +159,66 @@
+ " started on thread " + Thread.currentThread().getName());
}
- while (isRunning()) {
- javax.jms.Message msgIn = receiveOne();
-
- if (null != msgIn) {
- try {
- Object obj = _processMethod.invoke(_composer, new Object[] {msgIn});
-
- if (null == obj) {
- _logger.warn("Action class method <"
- + _processMethod.getName()
- + "> returned a null object");
- } else {
- // try to deliverAsync the composed message, using the
- // appropriate courier
- // to the target service
-
- Map<String, Object> params = new HashMap<String, Object>();
-
- params.put(Environment.GATEWAY_CONFIG, _config);
-
- obj = FilterManager.getInstance().doOutputWork((Message) obj, params);
-
- try {
- Message message = (Message) obj;
- _serviceInvoker.deliverAsync(message);
- }
- catch (ClassCastException e) {
- _logger.error("Action class method <"
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ;
+ try {
+ while (isRunning()) {
+ javax.jms.Message msgIn = receiveOne();
+
+ if (null != msgIn) {
+ try {
+ Object obj = _processMethod.invoke(_composer, new Object[] {msgIn});
+
+ if (null == obj) {
+ _logger.warn("Action class method <"
+ _processMethod.getName()
- + "> returned a non Message object", e);
-
- rollbackJMSTransaction();
+ + "> returned a null object");
+ } else {
+ // try to deliverAsync the composed message, using the
+ // appropriate courier
+ // to the target service
+
+ Map<String, Object> params = new HashMap<String, Object>();
+
+ params.put(Environment.GATEWAY_CONFIG, _config);
+
+ obj = FilterManager.getInstance().doOutputWork((Message) obj, params);
+
+ try {
+ Message message = (Message) obj;
+ _serviceInvoker.deliverAsync(message);
+ }
+ catch (ClassCastException e) {
+ _logger.error("Action class method <"
+ + _processMethod.getName()
+ + "> returned a non Message object", e);
+
+ rollbackJMSTransaction();
+ }
}
+ if (jmsSession.getTransacted()) {
+ jmsSession.commit() ;
+ } else {
+ msgIn.acknowledge() ;
+ }
}
- if (jmsSession.getTransacted()) {
- jmsSession.commit() ;
- } else {
- msgIn.acknowledge() ;
+ catch (InvocationTargetException e) {
+ _logger.error("Problems invoking method <"
+ + _processMethod.getName() + ">", e);
+ rollbackJMSTransaction();
}
+ catch (IllegalAccessException e) {
+ _logger.error("Problems invoking method <"
+ + _processMethod.getName() + ">", e);
+ rollbackJMSTransaction();
+ }
+ catch (Exception e) {
+ _logger.error("Unexpected problem", e);
+ rollbackJMSTransaction();
+ }
}
- catch (InvocationTargetException e) {
- _logger.error("Problems invoking method <"
- + _processMethod.getName() + ">", e);
- rollbackJMSTransaction();
- }
- catch (IllegalAccessException e) {
- _logger.error("Problems invoking method <"
- + _processMethod.getName() + ">", e);
- rollbackJMSTransaction();
- }
- catch (Exception e) {
- _logger.error("Unexpected problem", e);
- rollbackJMSTransaction();
- }
}
+ } finally {
+ MessageFlowContext.setMessageFlowPriority(null) ;
}
_logger.debug("run() method of " + this.getClass().getSimpleName()
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/SqlTableGatewayListener.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -48,6 +48,7 @@
import javax.sql.DataSource;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.internal.soa.esb.util.StreamUtils;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.EPR;
@@ -103,12 +104,17 @@
private TransactionStrategy transactionStrategy ;
private boolean transacted;
+ /**
+ * Message flow priority.
+ */
+ private final Integer messageFlowPriority ;
public SqlTableGatewayListener(ConfigTree config)
throws ConfigurationException {
super(config);
_config = config;
_sleepBetweenPolls = 10000;
+ messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(config) ;
checkMyParms();
} // __________________________________
@@ -163,6 +169,7 @@
+ Thread.currentThread().getName());
}
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ;
try {
do {
transactionStrategy.begin() ;
@@ -243,6 +250,8 @@
_sleepBetweenPolls));
} catch (final TransactionStrategyException tse) {
_logger.warn("Unexpected transaction strategy exception", tse) ;
+ } finally {
+ MessageFlowContext.setMessageFlowPriority(null) ;
}
if (_logger.isDebugEnabled()) {
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/CamelGateway.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/CamelGateway.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/CamelGateway.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -234,7 +234,11 @@
for (Class<?> jndiType : jndiTypes) {
typeConverterRegistry.addTypeConverter(jndiType, String.class, jndiTypeConverter);
}
- camelContext.addComponent(JBossESBComponent.JBOSSESB, new JBossESBComponent(getConfig()));
+ try {
+ camelContext.addComponent(JBossESBComponent.JBOSSESB, new JBossESBComponent(getConfig()));
+ } catch (final ConfigurationException ce) {
+ throw new ManagedLifecycleException(ce) ;
+ }
if (logger.isDebugEnabled()) {
try {
StringWriter sw = new StringWriter();
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/JBossESBComponent.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/JBossESBComponent.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/JBossESBComponent.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -27,6 +27,8 @@
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.ProcessorEndpoint;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
+import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.helpers.ConfigTree;
@@ -46,9 +48,11 @@
public static final String TIMEOUT = "timeout";
private final ConfigTree config;
+ private final Integer messageFlowPriority;
- public JBossESBComponent(ConfigTree config) {
+ public JBossESBComponent(ConfigTree config) throws ConfigurationException {
this.config = config;
+ messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(config);
}
@Override
@@ -61,7 +65,7 @@
String name = getAndRemoveParameter(parameters, NAME, String.class);
boolean async = getAndRemoveParameter(parameters, ASYNC, Boolean.class, Boolean.FALSE);
long timeout = getAndRemoveParameter(parameters, TIMEOUT, Long.class, 30000L);
- processor = new ServiceProcessor(config, new Service(category, name), async, timeout);
+ processor = new ServiceProcessor(config, new Service(category, name), async, timeout, messageFlowPriority);
}
else
{
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/ServiceProcessor.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/ServiceProcessor.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/camel/ServiceProcessor.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -20,9 +20,11 @@
package org.jboss.soa.esb.listeners.gateway.camel;
import org.apache.camel.Exchange;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;
/**
@@ -32,29 +34,35 @@
*/
public class ServiceProcessor extends JBossESBProcessor {
- private final Service service;
private final boolean async;
private final long timeout;
+ private final Integer messageFlowPriority;
+ private final ServiceInvoker invoker;
- public ServiceProcessor(ConfigTree config, Service service, boolean async, long timeout) {
+ public ServiceProcessor(ConfigTree config, Service service, boolean async, long timeout, final Integer messageFlowPriority) throws MessageDeliverException {
super(config);
- this.service = service;
this.async = async;
this.timeout = timeout;
+ this.messageFlowPriority = messageFlowPriority;
+ invoker = new ServiceInvoker(service);
}
public void process(Exchange exchange) throws Exception {
org.apache.camel.Message camelMessageIn = exchange.getIn();
Message esbMessageIn = getComposer().compose(camelMessageIn);
- ServiceInvoker invoker = new ServiceInvoker(service);
- if (async) {
- invoker.deliverAsync(esbMessageIn);
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ;
+ try {
+ if (async) {
+ invoker.deliverAsync(esbMessageIn);
+ }
+ else {
+ Message esbMessageOut = invoker.deliverSync(esbMessageIn, timeout);
+ org.apache.camel.Message camelMessageOut = getComposer().decompose(esbMessageOut, camelMessageIn);
+ exchange.setOut(camelMessageOut);
+ }
+ } finally {
+ MessageFlowContext.setMessageFlowPriority(null) ;
}
- else {
- Message esbMessageOut = invoker.deliverSync(esbMessageIn, timeout);
- org.apache.camel.Message camelMessageOut = getComposer().decompose(esbMessageOut, camelMessageIn);
- exchange.setOut(camelMessageOut);
- }
}
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/http/HttpGatewayServlet.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/http/HttpGatewayServlet.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/http/HttpGatewayServlet.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -38,6 +38,7 @@
import org.jboss.internal.soa.esb.publish.ContractInfo;
import org.jboss.internal.soa.esb.publish.ContractProvider;
import org.jboss.internal.soa.esb.publish.ContractProviderLifecycleResource;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.internal.soa.esb.util.StreamUtils;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.Service;
@@ -92,9 +93,18 @@
private Map<String, Integer> exceptionMappings;
private int[] allowedPorts = new int[0];
private String deployment ;
+ private Integer messageFlowPriority ;
public void init(ServletConfig config) throws ServletException {
deployment = config.getInitParameter(ListenerTagNames.DEPLOYMENT_NAME_TAG) ;
+ final String messageFlowPriorityParam = config.getInitParameter(ListenerTagNames.MESSAGE_FLOW_PRIORITY) ;
+ if (messageFlowPriorityParam != null) {
+ try {
+ messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(messageFlowPriorityParam) ;
+ } catch (final ConfigurationException ce) {
+ throw new ServletException(ce) ;
+ }
+ }
final ClassLoader origClassLoader = Thread.currentThread().getContextClassLoader() ;
final ClassLoader deploymentClassLoader = LifecycleResourceManager.getSingleton().getClassLoaderForDeployment(deployment) ;
@@ -230,6 +240,7 @@
}
Message outMessage;
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ;
try {
// Dispatch the message to the action pipeline...
if(!asyncInvoke) {
@@ -279,6 +290,8 @@
} else {
throw new ServletException("Failed to deliver message.", e);
}
+ } finally {
+ MessageFlowContext.setMessageFlowPriority(null) ;
}
if(outMessage != null) {
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/mina/UdpGatewayListener.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/mina/UdpGatewayListener.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/gateway/mina/UdpGatewayListener.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -23,9 +23,13 @@
import java.net.InetSocketAddress;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoAcceptor;
+import org.apache.mina.core.service.IoHandler;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.DatagramAcceptor;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.helpers.ConfigTree;
@@ -87,6 +91,10 @@
* The {@link Channel} that this gateway is bound to.
*/
private MessageHandler messageHandler;
+ /**
+ * Message flow priority.
+ */
+ private final Integer messageFlowPriority ;
/**
* Sole constructor.
@@ -102,6 +110,7 @@
serviceCategory = udpConfig.getServiceCategory();
serviceName = udpConfig.getServiceName();
messageHandler = udpConfig.getHandler();
+ messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(config) ;
}
/**
@@ -120,7 +129,7 @@
try
{
- datagramAcceptor.setHandler(messageHandler);
+ datagramAcceptor.setHandler(new MessageFlowPriorityIoAdapter(messageHandler, messageFlowPriority));
datagramAcceptor.bind(socketAddress);
}
catch (final IOException e)
@@ -197,4 +206,63 @@
throw new ManagedLifecycleException(e.getMessage(), e);
}
}
+
+ private static final class MessageFlowPriorityIoAdapter implements IoHandler // wraps another IoHandler; only messageReceived adds behaviour, every other callback is forwarded as-is
+ {
+ private final IoHandler handler ; // delegate that receives every callback
+ private final Integer messageFlowPriority ; // value handed to MessageFlowContext around messageReceived; stored as given, no null check is applied here
+
+ MessageFlowPriorityIoAdapter(final IoHandler handler, final Integer messageFlowPriority)
+ {
+ this.handler = handler ;
+ this.messageFlowPriority = messageFlowPriority ;
+ }
+
+ @Override
+ public void messageReceived(final IoSession session, final Object message)
+ throws Exception
+ {
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ; // make the configured priority visible while the delegate handles the message
+ try
+ {
+ handler.messageReceived(session, message) ;
+ }
+ finally
+ {
+ MessageFlowContext.setMessageFlowPriority(null) ; // always reset to null, also when the delegate throws; any value set before this call is not restored
+ }
+ }
+
+ public void exceptionCaught(final IoSession session, final Throwable throwable) // plain delegation
+ throws Exception
+ {
+ handler.exceptionCaught(session, throwable);
+ }
+
+ public void messageSent(final IoSession session, final Object message) throws Exception // plain delegation
+ {
+ handler.messageSent(session, message);
+ }
+
+ public void sessionClosed(final IoSession session) throws Exception // plain delegation
+ {
+ handler.sessionClosed(session);
+ }
+
+ public void sessionCreated(final IoSession session) throws Exception // plain delegation
+ {
+ handler.sessionCreated(session);
+ }
+
+ public void sessionIdle(final IoSession session, final IdleStatus idleStatus) // plain delegation
+ throws Exception
+ {
+ handler.sessionIdle(session, idleStatus);
+ }
+
+ public void sessionOpened(final IoSession session) throws Exception // plain delegation
+ {
+ handler.sessionOpened(session);
+ }
+ }
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/jca/BaseJcaInflow.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/jca/BaseJcaInflow.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/jca/BaseJcaInflow.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -26,6 +26,7 @@
import java.lang.reflect.Method;
import java.util.HashMap;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.Configurable;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.helpers.ConfigTree;
@@ -47,12 +48,13 @@
private String adapter;
private boolean isTransacted = true;
protected T bean;
+ private final Integer messageFlowPriority;
@SuppressWarnings("unchecked")
public BaseJcaInflow(ConfigTree config, Class<T> inflowInterface) throws ConfigurationException
{
super(config);
-
+ messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(config);
this.inflowInterface = inflowInterface ;
ConfigTree spec = config.getFirstChild(JcaConstants.ELEMENT_ACTIVATION_CONFIG);
for (ConfigTree configProperty : spec.getChildren(JcaConstants.ELEMENT_PROPERTY))
@@ -202,7 +204,15 @@
{
try
{
- return method.invoke(theBean, args);
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority);
+ try
+ {
+ return method.invoke(theBean, args);
+ }
+ finally
+ {
+ MessageFlowContext.setMessageFlowPriority(null);
+ }
}
catch (IllegalAccessException e)
{
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractScheduledManagedLifecycle.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractScheduledManagedLifecycle.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/lifecycle/AbstractScheduledManagedLifecycle.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -27,6 +27,7 @@
import java.util.Properties;
import org.apache.log4j.Logger;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.helpers.KeyValuePair;
@@ -51,8 +52,12 @@
* The logger for this class.
*/
private static final Logger logger = Logger.getLogger(AbstractScheduledManagedLifecycle.class) ;
-
/**
+ * Message flow priority.
+ */
+ private final Integer messageFlowPriority ;
+
+ /**
* Construct the threaded managed lifecycle.
* @param config The configuration associated with this instance.
* @throws ConfigurationException for configuration errors during initialisation.
@@ -61,11 +66,16 @@
throws ConfigurationException
{
super(config) ;
-
+ messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(config) ;
final Properties properties = extractProperties(config) ;
final SchedulerJobListener listener = new SchedulerJobListener() {
public void onSchedule() throws SchedulingException {
- AbstractScheduledManagedLifecycle.this.onSchedule() ;
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ;
+ try {
+ AbstractScheduledManagedLifecycle.this.onSchedule() ;
+ } finally {
+ MessageFlowContext.setMessageFlowPriority(null) ;
+ }
}
} ;
final String scheduleIdRef = config.getAttribute(ListenerTagNames.SCHEDULE_ID_REF) ;
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipeline.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -34,6 +34,7 @@
import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.services.security.SecurityContextPropagator;
import org.jboss.internal.soa.esb.services.security.SecurityContextPropagatorFactory;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.internal.soa.esb.util.XMLHelper;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.actions.ActionLifecycle;
@@ -616,6 +617,13 @@
{
final long start = System.nanoTime();
final Call callDetails = createCallDetails(message);
+
+ final Integer flowPriority = (Integer) message.getContext().removeContext(Environment.MESSAGE_FLOW_PRIORITY) ;
+ MessageFlowContext.setMessageFlowPriority(flowPriority) ;
+ if ((flowPriority != null) && LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Pipeline received message with priority " + flowPriority) ;
+ }
boolean result = false ;
String validationFailure = null ;
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/src/org/jboss/soa/esb/listeners/message/UncomposedMessageDeliveryAdapter.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -20,6 +20,7 @@
package org.jboss.soa.esb.listeners.message;
import org.jboss.internal.soa.esb.assertion.AssertArgument;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.couriers.FaultMessageException;
import org.jboss.soa.esb.helpers.ConfigTree;
@@ -47,6 +48,10 @@
* Message composer.
*/
private MessageComposer composer;
+ /**
+ * Message flow priority.
+ */
+ private final Integer messageFlowPriority ;
/**
* Public constructor.
@@ -59,12 +64,13 @@
* @throws org.jboss.soa.esb.services.registry.RegistryException
* Failed to lookup EPRs for the specified Service.
*/
- public UncomposedMessageDeliveryAdapter(String serviceCategory, String serviceName, MessageComposer composer) throws MessageDeliverException {
+ public UncomposedMessageDeliveryAdapter(String serviceCategory, String serviceName, MessageComposer composer, final Integer messageFlowPriority) throws MessageDeliverException {
AssertArgument.isNotNull(serviceCategory, "serviceCategory");
AssertArgument.isNotNull(serviceName, "serviceName");
AssertArgument.isNotNull(composer, "composer");
serviceInvoker = new org.jboss.soa.esb.client.ServiceInvoker(serviceCategory, serviceName);
this.composer = composer;
+ this.messageFlowPriority = messageFlowPriority ;
}
/**
@@ -104,7 +110,12 @@
Message message = composer.compose(messagePayload);
// Deliver the message...
- return serviceInvoker.deliverSync(message, timeoutMillis);
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ;
+ try {
+ return serviceInvoker.deliverSync(message, timeoutMillis);
+ } finally {
+ MessageFlowContext.setMessageFlowPriority(null) ;
+ }
}
/**
@@ -118,7 +129,12 @@
AssertArgument.isNotNull(messagePayload, "messagePayload");
Message message = composer.compose(messagePayload);
- serviceInvoker.deliverAsync(message);
+ MessageFlowContext.setMessageFlowPriority(messageFlowPriority) ;
+ try {
+ serviceInvoker.deliverAsync(message);
+ } finally {
+ MessageFlowContext.setMessageFlowPriority(null) ;
+ }
}
/**
@@ -142,6 +158,8 @@
String targetServiceCategory = gatewayConfig.getAttribute(ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG);
String targetServiceName = gatewayConfig.getAttribute(ListenerTagNames.TARGET_SERVICE_NAME_TAG);
String composerClass = gatewayConfig.getAttribute(ListenerTagNames.GATEWAY_COMPOSER_CLASS_TAG);
+
+ final Integer messageFlowPriority = MessageFlowContext.parseMessageFlowPriority(gatewayConfig) ;
if (targetServiceCategory == null || targetServiceCategory.trim().equals("")) {
throw new ConfigurationException("Invalid gateway configuration. '" + ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG + "' not specified.");
@@ -161,7 +179,7 @@
composer.setConfiguration(gatewayConfig);
- return new UncomposedMessageDeliveryAdapter(targetServiceCategory, targetServiceName, composer);
+ return new UncomposedMessageDeliveryAdapter(targetServiceCategory, targetServiceName, composer, messageFlowPriority);
} catch (MessageDeliverException e) {
throw new ConfigurationException("Remoting Listener configuration failed.", e);
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierUnitTest.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/JmsCourierUnitTest.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -38,9 +38,12 @@
import javax.jms.QueueSession;
import javax.naming.Context;
+import junit.framework.Assert;
import junit.framework.JUnit4TestAdapter;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.couriers.CourierTransportException;
import org.jboss.soa.esb.helpers.NamingContextPool;
import org.jboss.soa.esb.message.format.MessageFactory;
@@ -81,6 +84,7 @@
}
testEPR = new JMSEpr(JMSEpr.QUEUE_TYPE, QUEUE_NAME, CONNECTION_FACTORY) ;
testEPR.getAddr().addExtension(Context.INITIAL_CONTEXT_FACTORY, System.getProperty(Context.INITIAL_CONTEXT_FACTORY)) ;
+ MockFailMessageProducer.producerPriority = -1 ;
}
@After
@@ -94,6 +98,7 @@
public void testDelivery()
throws Exception
{
+ MockFailMessageProducer.sendFault = true ;
final JmsCourier courier = new JmsCourier(testEPR) ;
try
{
@@ -116,6 +121,47 @@
catch (final CourierTransportException cte) {} // expected
}
+ @Test
+ public void testDefaultMessagePriority() // with no flow priority set, the producer must be given Message.DEFAULT_PRIORITY
+ throws Exception
+ {
+ MockFailMessageProducer.sendFault = false ; // let the mock producer's send() succeed instead of throwing
+ final JmsCourier courier = new JmsCourier(testEPR) ;
+
+ MessageFlowContext.setMessageFlowPriority(null) ; // explicitly clear any priority a previous test may have left behind
+ courier.deliver(MessageFactory.getInstance().getMessage()) ;
+
+ Assert.assertEquals("Invalid producer priority", Message.DEFAULT_PRIORITY, MockFailMessageProducer.producerPriority) ; // producerPriority is whatever was last passed to the mock's setPriority
+ }
+
+ /**
+ * A flow priority of 9 set on MessageFlowContext must be the priority
+ * handed to the JMS producer when the courier delivers a message.
+ */
+ @Test
+ public void testHighMessagePriority()
+ throws Exception
+ {
+ MockFailMessageProducer.sendFault = false ; // let the mock producer's send() succeed instead of throwing
+ final JmsCourier courier = new JmsCourier(testEPR) ;
+ final int expectedPriority = 9 ;
+
+ MessageFlowContext.setMessageFlowPriority(expectedPriority) ;
+ try
+ {
+ courier.deliver(MessageFactory.getInstance().getMessage()) ;
+ }
+ finally
+ {
+ // Reset the flow priority so it cannot leak into tests that run afterwards;
+ // nothing else in this test class clears it.
+ MessageFlowContext.setMessageFlowPriority(null) ;
+ }
+
+ Assert.assertEquals("Invalid producer priority", expectedPriority, MockFailMessageProducer.producerPriority) ;
+ }
+
+ /**
+ * A flow priority of 0 set on MessageFlowContext must be the priority
+ * handed to the JMS producer when the courier delivers a message.
+ */
+ @Test
+ public void testLowMessagePriority()
+ throws Exception
+ {
+ MockFailMessageProducer.sendFault = false ; // let the mock producer's send() succeed instead of throwing
+ final JmsCourier courier = new JmsCourier(testEPR) ;
+ final int expectedPriority = 0 ;
+
+ MessageFlowContext.setMessageFlowPriority(expectedPriority) ;
+ try
+ {
+ courier.deliver(MessageFactory.getInstance().getMessage()) ;
+ }
+ finally
+ {
+ // Reset the flow priority so it cannot leak into tests that run afterwards;
+ // nothing else in this test class clears it.
+ MessageFlowContext.setMessageFlowPriority(null) ;
+ }
+
+ Assert.assertEquals("Invalid producer priority", expectedPriority, MockFailMessageProducer.producerPriority) ;
+ }
+
private static final class MockQueueConnectionFactory extends QueueConnectionFactoryImpl
{
@Override
@@ -237,6 +283,9 @@
private static final class MockFailMessageProducer implements MessageProducer
{
+ public static boolean sendFault ;
+ public static int producerPriority ;
+
public void close() throws JMSException {}
public int getDeliveryMode()
@@ -278,24 +327,37 @@
public void send(Message arg0)
throws JMSException
{
- throw new JMSException("Deliberate send exception") ;
+ if (sendFault)
+ {
+ throw new JMSException("Deliberate send exception") ;
+ }
}
public void send(Destination arg0, Message arg1)
throws JMSException
{
- throw new JMSException("Deliberate send exception") ;
+ if (sendFault)
+ {
+ throw new JMSException("Deliberate send exception") ;
+ }
}
public void send(Message arg0, int arg1, int arg2, long arg3)
throws JMSException
{
- throw new JMSException("Deliberate send exception") ;
+ if (sendFault)
+ {
+ throw new JMSException("Deliberate send exception") ;
+ }
}
public void send(Destination arg0, Message arg1, int arg2, int arg3, long arg4)
throws JMSException
{
+ if (sendFault)
+ {
+ throw new JMSException("Deliberate send exception") ;
+ }
}
public void setDeliveryMode(int arg0) throws JMSException {}
@@ -304,7 +366,11 @@
public void setDisableMessageTimestamp(boolean arg0) throws JMSException {}
- public void setPriority(int arg0) throws JMSException {}
+ public void setPriority(int arg0) // mock: remembers the requested priority instead of applying it
+ throws JMSException
+ {
+ producerPriority = arg0 ; // static, so the priority tests can read it back after courier.deliver()
+ }
public void setTimeToLive(long arg0) throws JMSException {}
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/couriers/MockCourier.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -24,6 +24,8 @@
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierTimeoutException;
import org.jboss.soa.esb.couriers.TwoWayCourier;
+import org.jboss.soa.esb.message.ByReferenceMessage;
+import org.jboss.soa.esb.message.Context;
import org.jboss.soa.esb.message.Message;
/**
@@ -35,10 +37,12 @@
public boolean deliveryResult = true;
public Message message;
public Message pickupMessage;
+ public Context context;
public boolean deliveryAttempted = false;
public CourierException courierException;
public MalformedEPRException malformedEPRException;
private boolean setPickupMessage;
+ private boolean storeReference ;
public MockCourier(boolean deliveryResult) {
this.deliveryResult = deliveryResult;
@@ -49,6 +53,12 @@
this.setPickupMessage = setPickupMessage;
}
+
+ public MockCourier(boolean deliveryResult, boolean setPickupMessage, final boolean storeReference) { // three-flag variant; courierException and malformedEPRException are left unset
+ this.deliveryResult = deliveryResult;
+ this.setPickupMessage = setPickupMessage;
+ this.storeReference = storeReference; // when true, a successful deliver keeps ((ByReferenceMessage)message).reference() rather than the message passed in
+ }
public MockCourier(CourierException courierException) {
this.courierException = courierException;
@@ -69,7 +79,11 @@
if(deliveryResult) {
// Only set the message ref if "successful"...
- this.message = message;
+ if (storeReference) {
+ this.message = ((ByReferenceMessage)message).reference() ;
+ } else {
+ this.message = message;
+ }
}
if (setPickupMessage)
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/webservice/BaseWebServiceUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/webservice/BaseWebServiceUnitTest.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/webservice/BaseWebServiceUnitTest.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -64,7 +64,7 @@
final ServiceInvoker serviceInvoker = mock(ServiceInvoker.class);
doThrow(faultException).when(serviceInvoker).deliverSync((Message)anyObject(), anyInt());
- final RequestResponseBaseWebService webService = new RequestResponseBaseWebService("deployment", serviceInvoker, null, null, "action");
+ final RequestResponseBaseWebService webService = new RequestResponseBaseWebService("deployment", serviceInvoker, null, null, "action", null);
final SOAPMessage soapMessage = createSOAPMessageWithPayload();
return webService.invoke(soapMessage);
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/GroovyGatewayUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/GroovyGatewayUnitTest.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/GroovyGatewayUnitTest.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -110,7 +110,7 @@
try {
BasicMessageComposer composer = new BasicMessageComposer();
composer.setConfiguration(getConfig());
- return new UncomposedMessageDeliveryAdapter("x", "y", composer);
+ return new UncomposedMessageDeliveryAdapter("x", "y", composer, null);
} catch (MessageDeliverException e) {
fail(e.getMessage());
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/gateway/JBossRemotingGatewayListenerUnitTest.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -209,7 +209,7 @@
listener = new JBossRemotingGatewayListener(config) {
protected UncomposedMessageDeliveryAdapter createDeliveryAdapter() throws ConfigurationException {
try {
- return new UncomposedMessageDeliveryAdapter("cat", "servicex", new JBossRemotingMessageComposer()) {
+ return new UncomposedMessageDeliveryAdapter("cat", "servicex", new JBossRemotingMessageComposer(), null) {
@SuppressWarnings("unused")
protected EPR getReplyToAddress(EPR toEpr) throws ConfigurationException {
return epr3;
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipelineUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipelineUnitTest.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/ActionProcessingPipelineUnitTest.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -679,6 +679,32 @@
checkOrder(MockActionInfo.getDestroyList()) ;
}
+ public void testPriorityPropagation() throws Exception // a priority carried in the message context must reach the action via MessageFlowContext, not via the message
+ {
+ final ConfigTree configTree = new ConfigTree("parent") ;
+ configTree.setAttribute(ListenerTagNames.MEP_ATTRIBUTE_TAG, ListenerTagNames.MEP_ONE_WAY) ;
+
+ addAction(configTree, MockPriorityProcessor.class.getName(), null, null, null) ; // single action that records both priority sources when it runs
+
+ final ActionProcessingPipeline pipeline = new ActionProcessingPipeline(configTree) ;
+ pipeline.initialise() ;
+
+ final int expectedPriority = 9 ;
+
+ final Message message = MessageFactory.getInstance().getMessage();
+ message.getContext().setContext(Environment.MESSAGE_FLOW_PRIORITY, Integer.valueOf(expectedPriority)) ; // simulate a message arriving with a priority attached
+
+ final boolean processingResult = pipeline.process(message);
+ assertTrue("Failed to process message", processingResult) ;
+
+ assertNull("Did not clear priority from message", MockPriorityProcessor.getMessagePriority()) ; // the action must not find the entry on the message context any more
+ final Integer contextPriority = MockPriorityProcessor.getContextPriority() ; // what the action read from MessageFlowContext
+ assertNotNull("No priority context present", contextPriority) ;
+ assertEquals("Priority invalid", expectedPriority, contextPriority.intValue()) ;
+
+ pipeline.destroy() ; // NOTE(review): skipped if an assertion above fails
+ }
+
public static ConfigTree addAction(final ConfigTree configTree, final String actionName) {
return addAction(configTree, actionName, null, null, null);
}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MessageDeliveryAdapterUnitTest.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -83,7 +83,7 @@
composer = new BasicMessageComposer();
composer.setConfiguration(new ConfigTree("config"));
- deliveryAdapter = new UncomposedMessageDeliveryAdapter("cat", "service", composer);
+ deliveryAdapter = new UncomposedMessageDeliveryAdapter("cat", "service", composer, null);
}
@After
public void tearDown() throws Exception {
@@ -124,7 +124,7 @@
public void test_No_EPRs() throws RegistryException, MessageDeliverException {
// Make sure there's no attempt to make a delivery when there's no
// EPRs for the service.
- deliveryAdapter = new UncomposedMessageDeliveryAdapter("x", "y", composer);
+ deliveryAdapter = new UncomposedMessageDeliveryAdapter("x", "y", composer, null);
try {
deliveryAdapter.deliverAsync(payload);
fail("Expected MessageDeliverException");
Added: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MockPriorityProcessor.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MockPriorityProcessor.java (rev 0)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/MockPriorityProcessor.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.soa.esb.listeners.message;
+
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
+import org.jboss.soa.esb.actions.ActionProcessingException;
+import org.jboss.soa.esb.common.Environment;
+import org.jboss.soa.esb.helpers.ConfigTree;
+import org.jboss.soa.esb.message.Message;
+
+public class MockPriorityProcessor // test action: records the message flow priority it observes while processing
+{
+ private static Integer messagePriority; // last value found on the message context under Environment.MESSAGE_FLOW_PRIORITY
+ private static Integer contextPriority; // last value returned by MessageFlowContext.getMessageFlowPriority(); both fields are static and never reset by this class
+
+ public MockPriorityProcessor(final ConfigTree tree) // argument is ignored; presumably this signature is what the pipeline instantiates actions with - confirm
+ {
+ }
+
+ public Message process(Message message) throws ActionProcessingException // captures both priority sources and returns the message unchanged
+ {
+ messagePriority = (Integer) message.getContext().getContext(Environment.MESSAGE_FLOW_PRIORITY) ;
+ contextPriority = MessageFlowContext.getMessageFlowPriority() ;
+ return message ;
+ }
+
+ public static Integer getMessagePriority() // priority seen on the message context during the last process() call, or null
+ {
+ return messagePriority ;
+ }
+
+ public static Integer getContextPriority() // priority seen in MessageFlowContext during the last process() call, or null
+ {
+ return contextPriority ;
+ }
+}
Modified: labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/ServiceInvokerCallUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/ServiceInvokerCallUnitTest.java 2012-05-22 10:50:50 UTC (rev 38099)
+++ labs/jbossesb/branches/JBESB_4_11_CP/product/rosetta/tests/src/org/jboss/soa/esb/listeners/message/ServiceInvokerCallUnitTest.java 2012-05-24 00:48:06 UTC (rev 38100)
@@ -20,6 +20,7 @@
package org.jboss.soa.esb.listeners.message;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@@ -30,9 +31,11 @@
import org.jboss.internal.soa.esb.couriers.MockCourier;
import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
import org.jboss.internal.soa.esb.services.registry.MockRegistry;
+import org.jboss.internal.soa.esb.util.MessageFlowContext;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.client.ServiceInvoker;
+import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.junit.After;
@@ -48,9 +51,11 @@
private EPR deliverEPR ;
private EPR failEPR ;
private EPR pickupEPR ;
+ private EPR respondEPR ;
private JMSEpr jmsDeliverEPR ;
private MockCourier deliverCourier ;
private MockCourier failCourier ;
+ private MockCourier respondCourier ;
private Message responseMessage ;
@Before
@@ -64,24 +69,29 @@
failEPR = new JMSEpr(JMSEpr.QUEUE_TYPE, "test:fail", "ConnectionFactory") ;
pickupEPR = new EPR(new URI("test:pickup")) ;
jmsDeliverEPR = new JMSEpr(JMSEpr.QUEUE_TYPE, "deliver", "ConnectionFactory") ;
+ respondEPR = new JMSEpr(JMSEpr.QUEUE_TYPE, "test:respond", "ConnectionFactory") ;
deliverCourier = new MockCourier(true);
failCourier = new MockCourier(false);
+ respondCourier = new MockCourier(true, true, true);
responseMessage = MessageFactory.getInstance().getMessage() ;
deliverCourier.pickupMessage = responseMessage ;
MockRegistry.register("test", "deliver", deliverEPR, deliverCourier);
MockRegistry.register("test", "fail", failEPR, failCourier);
MockRegistry.register("test", "jmsdeliver", jmsDeliverEPR, deliverCourier);
+ MockRegistry.register("test", "respond", respondEPR, respondCourier);
final EPR deadLetterEPR = new EPR(new URI("internal:DLQ")) ;
MockRegistry.register(ServiceInvoker.INTERNAL_SERVICE_CATEGORY, ServiceInvoker.DEAD_LETTER_SERVICE_NAME, deadLetterEPR, deliverCourier);
+ MessageFlowContext.setMessageFlowPriority(null) ;
}
@After
public void tearDown()
throws Exception
{
+ MessageFlowContext.setMessageFlowPriority(null) ;
MockRegistry.uninstall() ;
MockCourierFactory.uninstall() ;
}
@@ -275,6 +285,57 @@
assertEquals("Call ReplyTo EPR", pickupEPR, message.getHeader().getCall().getReplyTo()) ;
}
+ @Test
+ public void testPrioritySyncEPRs() // synchronous delivery: priority goes out on the delivered message only, and is absent from the caller's message and from the response
+ throws Exception
+ {
+ final Message message = MessageFactory.getInstance().getMessage() ;
+ message.getHeader().getCall().setReplyTo(pickupEPR) ;
+
+ final int expectedPriority = 9 ;
+ MessageFlowContext.setMessageFlowPriority(expectedPriority) ;
+
+ final Message pickupMessage = MessageFactory.getInstance().getMessage() ; // canned reply that deliberately carries a priority, so the last assertion is meaningful
+ pickupMessage.getContext().setContext(Environment.MESSAGE_FLOW_PRIORITY, Integer.valueOf(expectedPriority)) ;
+ respondCourier.pickupMessage = pickupMessage ;
+
+ final ServiceInvoker si = new ServiceInvoker("test", "respond") ;
+ final Message resp = si.deliverSync(message, 10) ;
+
+ // the message handed to the courier must carry the flow priority in its context
+ assertNotNull("Message was not delivered", respondCourier.message) ;
+ final Integer priority = (Integer) respondCourier.message.getContext().getContext(Environment.MESSAGE_FLOW_PRIORITY) ;
+ assertNotNull("Message priority not delivered to courier", priority) ;
+ assertEquals("Invalid message priority", expectedPriority, priority.intValue()) ;
+
+ assertNull("Message flow context not cleared from original message", message.getContext().getContext(Environment.MESSAGE_FLOW_PRIORITY)) ; // the caller's own message must not keep the entry
+
+ assertNotNull("No response received", resp) ;
+ assertNull("Message flow context not cleared from response message", resp.getContext().getContext(Environment.MESSAGE_FLOW_PRIORITY)) ; // and it must be stripped from the response
+ }
+
+ @Test
+ public void testPriorityAsyncEPRs() // asynchronous delivery: priority goes out on the delivered message and is absent from the caller's message
+ throws Exception
+ {
+ final Message message = MessageFactory.getInstance().getMessage() ;
+ message.getHeader().getCall().setReplyTo(pickupEPR) ;
+
+ final int expectedPriority = 9 ;
+ MessageFlowContext.setMessageFlowPriority(expectedPriority) ;
+
+ final ServiceInvoker si = new ServiceInvoker("test", "respond") ;
+ si.deliverAsync(message) ;
+
+ // the message handed to the courier must carry the flow priority in its context
+ assertNotNull("Message was not delivered", respondCourier.message) ;
+ final Integer priority = (Integer) respondCourier.message.getContext().getContext(Environment.MESSAGE_FLOW_PRIORITY) ;
+ assertNotNull("Message priority not delivered to courier", priority) ;
+ assertEquals("Invalid message priority", expectedPriority, priority.intValue()) ;
+
+ assertNull("Message flow context not cleared from original message", message.getContext().getContext(Environment.MESSAGE_FLOW_PRIORITY)) ; // the caller's own message must not keep the entry
+ }
+
public static junit.framework.Test suite()
{
return new JUnit4TestAdapter(ServiceInvokerCallUnitTest.class) ;
More information about the jboss-svn-commits
mailing list