Author: objectiser
Date: 2012-03-23 18:18:37 -0400 (Fri, 23 Mar 2012)
New Revision: 1501
RIFTSAW-485 - add support for WS-Security with jbossws-native.
+package org.jboss.soa.bpel.runtime.ws;
+import java.net.URL;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.UserTransaction;
+import javax.wsdl.Definition;
+import javax.wsdl.Fault;
+import javax.xml.namespace.QName;
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.MimeHeaders;
+import javax.xml.soap.SOAPFault;
+import javax.xml.soap.SOAPMessage;
+import javax.xml.ws.BindingProvider;
+import javax.xml.ws.Dispatch;
+import javax.xml.ws.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.utils.DOMUtils;
+import org.apache.ode.utils.Properties;
+import org.jboss.soa.bpel.runtime.engine.EndpointReference;
+import org.jboss.soa.bpel.runtime.engine.PartnerChannel;
+import org.jboss.soa.bpel.runtime.engine.ode.BPELEngineImpl;
+import org.jboss.soa.bpel.runtime.engine.ode.ExecutionEnvironment;
+import org.jboss.soa.bpel.runtime.engine.ode.UDDIClientFactory;
+import org.jboss.soa.bpel.runtime.engine.ode.UDDIRegistration;
+import org.jboss.soa.dsp.EndpointMetaData;
+import org.jboss.soa.dsp.server.ServerConfig;
+import org.jboss.soa.bpel.runtime.JBossDSPFactory;
+import org.jboss.soa.dsp.ws.DOMWriter;
+import org.jboss.soa.dsp.ws.JavaUtils;
+import org.jboss.soa.dsp.ws.SOAPMessageAdapter;
+import org.jboss.soa.dsp.ws.WSDLReference;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+/**
+ * Represents a WS invocation.
+ * Delegates to a JAX-WS {@link javax.xml.ws.Dispatch} implementation.
+ *
+ * @author Heiko.Braun <heiko.braun(a)jboss.com>
+ */
+public class WebServiceClient implements PartnerChannel
+ // Name of the ODE configuration property holding the JAXWSInitializer implementation class.
+ private static final String JAXWS_CLIENT_INITIALIZER = "jaxws.client.initializer.impl";
+ protected final Log log = LogFactory.getLog(getClass());
+ // Set at the end of the synchronized initialize(), but tested by invoke() without holding
+ // the lock. Declared volatile so a thread that observes 'true' also observes the WSDL
+ // definition, WSDL URL and message adapter that were assigned before the flag was set.
+ private volatile boolean isInitialized=false;
+ private BPELEngineImpl engine;
+ // Taken from the execution environment in the constructor; may be null (see setEndpointFromUDDI).
+ private UDDIRegistration uddiRegistration = null;
+ // WSDL location and parsed definition, both resolved by initialize().
+ private URL wsdlUrl;
+ private String id;
+ // Created on first use by getDispatcher() and reused afterwards.
+ private Dispatch<SOAPMessage> dispatcher = null;
+ private EndpointMetaData metaData = null;
+ private Definition wsdlDefinition;
+ private final QName serviceName;
+ // Port QName: the service's namespace combined with the port name from the meta data.
+ private final QName port;
+ private SOAPMessageAdapter messageAdapter;
+ private ExecutionEnvironment executionEnvironment;
+ private java.net.URI baseURI;
+ // Receive timeout applied to two-way invocations; overridden by the endpoint's
+ // PROP_MEX_TIMEOUT property when that property parses as a long.
+ private long timeout=Properties.DEFAULT_MEX_TIMEOUT;
+ public WebServiceClient(EndpointMetaData metaData, ExecutionEnvironment env,
+ BPELEngineImpl engine, ProcessConf pconf)
+ {
+ this.engine = engine;
+ this.executionEnvironment = env;
+ this.metaData = metaData;
+ this.uddiRegistration = executionEnvironment.getUDDIRegistration();
+ this.id = metaData.getEndpointId();
+ this.serviceName = metaData.getServiceName();
+ this.port = new QName(serviceName.getNamespaceURI(), metaData.getPortName());
+ this.baseURI = pconf.getBaseURI();
+ log.debug("Web Service Client: Base URI="+pconf.getBaseURI());
+ String mextimeout = pconf.getEndpointProperties(serviceName, metaData.getPortName()).get(Properties.PROP_MEX_TIMEOUT);
+ if (mextimeout != null) {
+ try {
+ this.timeout = Long.parseLong(mextimeout);
+ log.debug("Web Service Client: Timeout="+timeout);
+ } catch (NumberFormatException e) {
+ if(log.isWarnEnabled()) log.warn("Mal-formatted Property: ["+ Properties.PROP_MEX_TIMEOUT+"="+mextimeout+"] Default value ("+Properties.DEFAULT_MEX_TIMEOUT+") will be used");
+ }
+ }
+ }
+ /**
+  * Resolves the WSDL reference for this endpoint and builds the SOAP message adapter from it.
+  * <p>
+  * Callers test {@code isInitialized} before taking this method's lock, so several threads
+  * can arrive here for the same client. The flag is therefore re-checked under the lock and
+  * the work is done only once.
+  */
+ private synchronized void initialize() {
+   if (isInitialized) {
+     // Another thread completed the initialization while this one waited for the lock.
+     return;
+   }
+   WSDLReference wsdlReference = new WSDLHelper().createWSDLReference(engine,
+       metaData.getProcessId(), metaData.getServiceName(), metaData.getPortName());
+   wsdlDefinition = wsdlReference.getDefinition();
+   wsdlUrl = wsdlReference.getWsdlURL();
+   this.messageAdapter = new SOAPMessageAdapter(this.wsdlDefinition, serviceName, port.getLocalPart());
+   // Set last, so the flag is only raised once all dependent state has been assigned.
+   isInitialized=true;
+ }
+ /**
+  * Returns a new client endpoint reference built from this client's endpoint id.
+  *
+  * @return a fresh {@code ClientEndpointReference} for the endpoint id
+  */
+ public EndpointReference getEndpointReference()
+ {
+   final EndpointReference reference = new ClientEndpointReference(this.id);
+   return reference;
+ }
+ /**
+  * Schedules the partner invocation for the given message exchange.
+  * <p>
+  * The web service call is not made here. A callable for the exchange id is registered with
+  * the scheduler and submitted to the executor service only after the current transaction
+  * has completed successfully. Request/response exchanges are then answered with
+  * {@code replyAsync()}, all other exchanges with {@code replyOneWayOk()}.
+  *
+  * @param mex the partner role message exchange to invoke
+  */
+ public void invoke(final PartnerRoleMessageExchange mex)
+ {
+   if (!isInitialized) initialize();
+   log.debug("Invoking dispatcher "+this.id);
+   boolean isTwoWay = mex.getMessageExchangePattern()
+       == org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
+   if(isTwoWay)
+   {
+     // Defer the invoke until the transaction commits.
+     submitAfterCommit(new TwoWayCallable(mex.getMessageExchangeId()));
+     mex.replyAsync();
+   }
+   else
+   {
+     // one-way invocation, also defer the invoke until the transaction commits.
+     submitAfterCommit(new OneWayCallable(mex.getMessageExchangeId()));
+     mex.replyOneWayOk();
+   }
+ }
+ /**
+  * Registers a synchronizer that hands the invocation to the executor service once the
+  * current transaction has completed successfully.
+  *
+  * @param invocation the deferred web service call
+  */
+ private void submitAfterCommit(final Callable<Object> invocation)
+ {
+   Scheduler scheduler = executionEnvironment.getScheduler();
+   scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
+     public void afterCompletion(boolean success)
+     {
+       // If the TX is rolled back, then we don't send the request.
+       if (!success) return;
+       // The invocation must happen in a separate thread: doing the call inside
+       // afterCompletion would block other operations registered there as well.
+       ExecutorService executorService = executionEnvironment.getExecutorService();
+       executorService.submit(invocation);
+     }
+     public void beforeCompletion(){}
+   });
+ }
+ /**
+  * Performs a request/response partner invocation for one message exchange.
+  * <p>
+  * The work runs inside a scheduler transaction. The exchange is re-read by id, and its
+  * request is converted to a SOAP message and sent through the JAX-WS dispatcher. The SOAP
+  * response or fault is then converted back and set as the reply on the exchange. Any
+  * {@link Throwable} raised on the way is logged and reported on the exchange as a
+  * communication error.
+  */
+ private class TwoWayCallable implements Callable<Object> {
+   private String mexId;
+   public TwoWayCallable(String mexId) {
+     this.mexId = mexId;
+   }
+   /** Creates a new document with a single unqualified {@code message} root and returns that root. */
+   private Element newFaultMessageElement() {
+     Document odeMsg = DOMUtils.newDocument();
+     Element odeMsgEl = odeMsg.createElementNS(null, "message");
+     odeMsg.appendChild(odeMsgEl);
+     return odeMsgEl;
+   }
+   public Object call() throws Exception {
+     return executionEnvironment.getScheduler().execTransaction(new Callable<Object>(){
+       public Object call() throws Exception {
+         //We need to get a new mex from persistence layer.
+         PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange) engine.getBpelServer().getEngine().getMessageExchange(mexId);
+         try
+         {
+           // Translate ODE's URL endpoint (if any) into a JAX-WS endpoint reference.
+           org.apache.ode.bpel.iapi.EndpointReference odeepr=mex.getEndpointReference();
+           javax.xml.ws.EndpointReference epr=null;
+           if (odeepr != null) {
+             if (odeepr instanceof org.apache.ode.bpel.epr.URLEndpoint) {
+               org.apache.ode.bpel.epr.URLEndpoint url=(org.apache.ode.bpel.epr.URLEndpoint)odeepr;
+               javax.xml.ws.wsaddressing.W3CEndpointReferenceBuilder builder=
+                   new javax.xml.ws.wsaddressing.W3CEndpointReferenceBuilder();
+               epr = builder.address(url.getUrl())
+                   .serviceName(serviceName)
+                   .endpointName(port)
+                   .build();
+               if (log.isDebugEnabled()) {
+                 log.debug("EPR = " + epr);
+               }
+             }
+           } else if (log.isDebugEnabled()) {
+             log.debug("ODE has no EPR for this message exchange");
+           }
+           // The actual WS invocation
+           Dispatch<SOAPMessage> proxy = getDispatcher(epr, port);
+           setEndpointFromUDDI(proxy);
+           // Create SOAPMessage
+           SOAPMessage soapRequestMessage = MessageFactory.newInstance().createMessage();
+           if(log.isDebugEnabled())
+             log.debug( "ODE outbound message: \n" +DOMWriter.printNode(mex.getRequest().getMessage(), true) );
+           String soapAction=messageAdapter.createSoapRequest(soapRequestMessage,
+               new ODEMessageAdapter(mex.getRequest()), mex.getOperation());
+           if (log.isDebugEnabled()) {
+             log.debug("Riftsaw soap request message: \n" + JavaUtils.getSoapMessageASString(soapRequestMessage));
+           }
+           try {
+             // Set SOAPAction
+             if (soapAction != null) {
+               // Setting soap action using both approaches, as CXF needs one and JBossWS-Native the other
+               proxy.getRequestContext().put(Dispatch.SOAPACTION_URI_PROPERTY, soapAction);
+               MimeHeaders hd = soapRequestMessage.getMimeHeaders();
+               hd.addHeader("SOAPAction", soapAction);
+             } else if (log.isDebugEnabled()) {
+               log.debug("SOAPAction not set");
+             }
+             // Set client side timeout for the invocation
+             proxy.getRequestContext().put("javax.xml.ws.client.receiveTimeout", Long.toString(timeout));
+             SOAPMessage soapResponseMessage = proxy.invoke(soapRequestMessage);
+             if (log.isDebugEnabled()) {
+               log.debug("Riftsaw soap response message: \n" + JavaUtils.getSoapMessageASString(soapResponseMessage));
+             }
+             // Create ODE response
+             Message odeResponse = mex.createMessage(mex.getOperation().getOutput().getMessage().getQName());
+             if(soapResponseMessage.getSOAPBody().hasFault())
+             {
+               // fault handling
+               Element odeMsgEl = newFaultMessageElement();
+               Fault fault = messageAdapter.parseSoapFault(odeMsgEl, soapResponseMessage, mex.getOperation());
+               handleFault(mex, fault, soapResponseMessage.getSOAPBody().getFault(), odeMsgEl);
+             }
+             else
+             {
+               messageAdapter.parseSoapResponse(new ODEMessageAdapter(odeResponse),
+                   soapResponseMessage,mex.getOperation());
+               mex.reply(odeResponse);
+               // The inbound message is logged only here: on the fault path odeResponse is never
+               // populated, so printing it there does not show what was received.
+               // NOTE(review): if getMessage() returns null for an unpopulated message, logging it
+               // on the fault path could throw and trigger replyWithFailure after the fault reply - confirm.
+               if(log.isDebugEnabled())
+                 log.debug( "ODE inbound message: \n" +DOMWriter.printNode(odeResponse.getMessage(), true) );
+             }
+           } catch(javax.xml.ws.soap.SOAPFaultException fe) {
+             // fault handling for faults surfaced by the dispatcher as an exception
+             Element odeMsgEl = newFaultMessageElement();
+             if (log.isDebugEnabled()) {
+               log.debug("Riftsaw soap fault: \n" + DOMWriter.printNode(fe.getFault(), true));
+             }
+             Fault fault=messageAdapter.parseSoapFault(odeMsgEl, fe.getFault(),
+                 mex.getOperation());
+             handleFault(mex, fault, fe.getFault(), odeMsgEl);
+           }
+         }
+         catch (Throwable e)
+         {
+           // Anything else is reported on the exchange instead of being propagated.
+           log.error("WS invocation failed", e);
+           mex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, e.getMessage(), null);
+         }
+         return null;
+       }
+     });
+   }
+ }
+ private class OneWayCallable implements Callable<Object> {
+ private String mexId;
+ public OneWayCallable(String mexId) {
+ this.mexId = mexId;
+ }
+ public Object call() throws Exception {
+ return executionEnvironment.getScheduler().execTransaction(new Callable<Object>(){
+ public Object call() throws Exception {
+ //We need to get a new mex from persistence layer.
+ PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange) engine.getBpelServer().getEngine().getMessageExchange(mexId);
+ try
+ {
+ org.apache.ode.bpel.iapi.EndpointReference odeepr=mex.getEndpointReference();
+ javax.xml.ws.EndpointReference epr=null;
+ if (odeepr != null) {
+ if (odeepr instanceof org.apache.ode.bpel.epr.URLEndpoint) {
+ org.apache.ode.bpel.epr.URLEndpoint url=(org.apache.ode.bpel.epr.URLEndpoint)odeepr;
+ javax.xml.ws.wsaddressing.W3CEndpointReferenceBuilder builder=
+ new javax.xml.ws.wsaddressing.W3CEndpointReferenceBuilder();
+ epr = builder.address(url.getUrl())
+ .serviceName(serviceName)
+ .endpointName(port)
+ .build();
+ if (log.isDebugEnabled()) {
+ log.debug("EPR = " + epr);
+ }
+ }
+ } else if (log.isDebugEnabled()) {
+ log.debug("ODE has no EPR for this message exchange");
+ }
+ // The actual WS invocation
+ Dispatch<SOAPMessage> proxy = getDispatcher(epr, port);
+ setEndpointFromUDDI(proxy);
+ // Create SOAPMessage
+ SOAPMessage soapRequestMessage = MessageFactory.newInstance().createMessage();
+ if(log.isDebugEnabled())
+ log.debug( "ODE outbound message: \n" +DOMWriter.printNode(mex.getRequest().getMessage(), true) );
+ String soapAction=messageAdapter.createSoapRequest(soapRequestMessage,
+ new ODEMessageAdapter(mex.getRequest()), mex.getOperation());
+ if (log.isDebugEnabled()) {
+ log.debug("Riftsaw soap request message: \n" + JavaUtils.getSoapMessageASString(soapRequestMessage));
+ }
+ // Set SOAPAction
+ if (soapAction != null) {
+ // Setting soap action using both approaches, as CXF needs one and JBossWS-Native the other
+ proxy.getRequestContext().put(Dispatch.SOAPACTION_URI_PROPERTY, soapAction);
+ MimeHeaders hd = soapRequestMessage.getMimeHeaders();
+ hd.addHeader("SOAPAction", soapAction);
+ } else if (log.isDebugEnabled()) {
+ log.debug("SOAPAction not set");
+ }
+ proxy.invokeOneWay(soapRequestMessage);
+ }
+ catch (Throwable e)
+ {
+ log.error("WS invocation failed", e);
+ mex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, e.getMessage(), null);
+ }
+ return null;
+ }
+ });
+ }
+ }
+ private void handleFault(PartnerRoleMessageExchange mex, Fault fault, SOAPFault soapFault,
+ Element odeMsgEl) {
+ if (fault != null)
+ {
+ if (log.isWarnEnabled())
+ log.warn("Fault response: faultName="
+ + fault.getName() + " faultType=" + fault.getMessage().getQName()
+ + "\n" + DOMWriter.printNode(odeMsgEl, true));
+ QName faultType = fault.getMessage().getQName();
+ QName faultName = new QName(wsdlDefinition.getTargetNamespace(), fault.getName());
+ Message response = mex.createMessage(faultType);
+ response.setMessage(odeMsgEl);
+ mex.replyWithFault(faultName, response);
+ }
+ else
+ {
+ if (log.isWarnEnabled())
+ log.warn("Fault response: faultType=(unknown)");
+ mex.replyWithFailure(MessageExchange.FailureType.OTHER, "Unspecified", soapFault.getDetail());
+ }
+ }
+ /**
+  * Looks up the {@code UserTransaction} bound in JNDI under the name "UserTransaction".
+  * <p>
+  * The naming context opened for the lookup is closed again before returning. A failure to
+  * close it is only logged, so that it cannot hide the lookup result or the lookup's own exception.
+  *
+  * @return the user transaction bound in JNDI
+  * @throws NamingException if the context cannot be created or the lookup fails
+  */
+ private UserTransaction getUserTransaction()
+   throws NamingException
+ {
+   InitialContext ctx = new InitialContext();
+   try {
+     return (UserTransaction)ctx.lookup("UserTransaction");
+   } finally {
+     try {
+       ctx.close();
+     } catch (NamingException closeFailure) {
+       log.debug("Unable to close JNDI context", closeFailure);
+     }
+   }
+ }
+ /**
+  * Not supported by this channel; always throws.
+  *
+  * @param operation ignored
+  * @param mesg ignored
+  * @return never returns normally
+  * @throws UnsupportedOperationException always (a {@link RuntimeException}, as before)
+  */
+ public Element invoke(String operation, Element mesg) throws Exception
+ {
+   throw new UnsupportedOperationException("Not implemented. Should be removed from interface");
+ }
+ private synchronized Dispatch<SOAPMessage> getDispatcher(javax.xml.ws.EndpointReference epr, QName portName)
+ {
+ if(null==dispatcher) {
+ log.debug("Creating Dispatcher ("+this.id+") on " + wsdlUrl + ": "+serviceName);
+ JAXWSInitializer initializer=initializeStack(portName);
+ Service service = Service.create(this.wsdlUrl, serviceName);
+ if (epr != null) {
+ dispatcher = service.createDispatch(
+ epr,
+ SOAPMessage.class,
+ Service.Mode.MESSAGE,
+ new javax.xml.ws.soap.AddressingFeature()
+ );
+ } else {
+ dispatcher = service.createDispatch(
+ portName,
+ SOAPMessage.class,
+ Service.Mode.MESSAGE
+ );
+ }
+ if (initializer != null) {
+ initializer.serviceCreated(this.serviceName, portName, this.baseURI, dispatcher);
+ }
+ }
+ return dispatcher;
+ }
+ private synchronized JAXWSInitializer initializeStack(QName portName) {
+ JAXWSInitializer initializer=null;
+ String impl=this.executionEnvironment.getOdeConfig().getProperty(JAXWS_CLIENT_INITIALIZER);
+ if (impl != null && impl.trim().length() > 0) {
+ try {
+ Class<?> cls=Class.forName(impl);
+ initializer = (JAXWSInitializer)cls.newInstance();
+ } catch(Exception e) {
+ log.error("Unable to obtain JAXWS Initializer class '"+impl+"'", e);
+ }
+ if (initializer != null) {
+ initializer.initializeStack(this.serviceName, portName, this.baseURI);
+ }
+ } else {
+ log.debug("JAXWS Client Initializer not defined");
+ }
+ return(initializer);
+ }
+ private void setEndpointFromUDDI(Dispatch<SOAPMessage> proxy) {
+ if (uddiRegistration!=null) {
+ String endpoint = uddiRegistration.lookupEndpoint(serviceName, port.getLocalPart());
+ if (endpoint!=null) {
+ proxy.getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, endpoint);
+ } else {
+ log.debug("Could not find endpoint in UDDI registry for service " + serviceName.getLocalPart());
+ }
+ }
+ }
+ /**
+  * Describes this client by its service and port names.
+  *
+  * @return a string of the form {@code WebServiceClient {service=...,port=...}}
+  */
+ public String toString()
+ {
+   final StringBuilder description = new StringBuilder("WebServiceClient {service=");
+   description.append(serviceName);
+   description.append(",port=");
+   description.append(port);
+   description.append("}");
+   return description.toString();
+ }
