Author: richard.opalka(a)jboss.com
Date: 2007-12-12 15:57:26 -0500 (Wed, 12 Dec 2007)
New Revision: 5282
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequence.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java
Removed:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequenceImpl.java
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/core/CommonSOAPBinding.java
stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceIface.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/jaxws/RMHandler.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMDeploymentAspect.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMInvocationHandler.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java
Log:
implemented sequence lifecycle protocol messages on server side
Modified: stack/native/trunk/src/main/java/org/jboss/ws/core/CommonSOAPBinding.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/core/CommonSOAPBinding.java 2007-12-12
17:50:58 UTC (rev 5281)
+++ stack/native/trunk/src/main/java/org/jboss/ws/core/CommonSOAPBinding.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -437,16 +437,19 @@
{
QName opQName = opMetaData.getResponseName();
- Name opName = new NameImpl(namespaceRegistry.registerQName(opQName));
- soapBodyElement = new SOAPBodyElementRpc(opName);
- soapBodyElement =
(SOAPBodyElement)soapBody.addChildElement(soapBodyElement);
+ if (false == RMHelper.isRMOperation(opQName)) // RM hack
+ {
+ Name opName = new NameImpl(namespaceRegistry.registerQName(opQName));
+ soapBodyElement = new SOAPBodyElementRpc(opName);
+ soapBodyElement =
(SOAPBodyElement)soapBody.addChildElement(soapBodyElement);
- // Add soap encodingStyle
- if (opMetaData.getUse() == Use.ENCODED)
- {
- String envURI = soapEnvelope.getNamespaceURI();
- String envPrefix = soapEnvelope.getPrefix();
- soapBodyElement.setAttributeNS(envURI, envPrefix +
":encodingStyle", Constants.URI_SOAP11_ENC);
+ // Add soap encodingStyle
+ if (opMetaData.getUse() == Use.ENCODED)
+ {
+ String envURI = soapEnvelope.getNamespaceURI();
+ String envPrefix = soapEnvelope.getPrefix();
+ soapBodyElement.setAttributeNS(envURI, envPrefix +
":encodingStyle", Constants.URI_SOAP11_ENC);
+ }
}
}
Modified: stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -67,7 +67,7 @@
import org.jboss.ws.core.soap.MessageContextAssociation;
import org.jboss.ws.extensions.addressing.AddressingClientUtil;
import org.jboss.ws.extensions.wsrm.RMConstant;
-import org.jboss.ws.extensions.wsrm.RMClientSequenceImpl;
+import org.jboss.ws.extensions.wsrm.RMClientSequence;
import org.jboss.ws.extensions.wsrm.api.RMException;
import org.jboss.ws.extensions.wsrm.common.RMHelper;
import org.jboss.ws.extensions.wsrm.spi.RMConstants;
@@ -105,14 +105,14 @@
private static HandlerType[] HANDLER_TYPES = new HandlerType[] { HandlerType.PRE,
HandlerType.ENDPOINT, HandlerType.POST };
// WS-RM sequence associated with the proxy
- private RMClientSequenceImpl wsrmSequence;
+ private RMClientSequence wsrmSequence;
- public final void setWSRMSequence(RMClientSequenceImpl wsrmSequence)
+ public final void setWSRMSequence(RMClientSequence wsrmSequence)
{
this.wsrmSequence = wsrmSequence;
}
- public final RMClientSequenceImpl getWSRMSequence()
+ public final RMClientSequence getWSRMSequence()
{
return this.wsrmSequence;
}
@@ -513,7 +513,7 @@
try
{
// set up addressing data
- RMClientSequenceImpl candidateSequence = new
RMClientSequenceImpl(addressableClient,
getEndpointMetaData().getConfig().getRMMetaData());
+ RMClientSequence candidateSequence = new RMClientSequence(addressableClient,
getEndpointMetaData().getConfig().getRMMetaData());
String address = getEndpointMetaData().getEndpointAddress();
String action = RMConstant.CREATE_SEQUENCE_WSA_ACTION;
AddressingProperties addressingProps = null;
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequence.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequence.java
(rev 0)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequence.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -0,0 +1,326 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.xml.namespace.QName;
+import javax.xml.ws.BindingProvider;
+import javax.xml.ws.addressing.AddressingBuilder;
+import javax.xml.ws.addressing.AddressingProperties;
+import javax.xml.ws.addressing.JAXWSAConstants;
+
+import org.jboss.logging.Logger;
+import org.jboss.ws.core.jaxws.client.ClientImpl;
+import org.jboss.ws.core.utils.UUIDGenerator;
+import org.jboss.ws.extensions.addressing.AddressingClientUtil;
+import org.jboss.ws.extensions.wsrm.config.RMConfig;
+import org.jboss.ws.extensions.wsrm.api.RMException;
+import org.jboss.ws.extensions.wsrm.spi.RMConstants;
+import org.jboss.ws.extensions.wsrm.spi.RMProvider;
+import org.jboss.ws.extensions.wsrm.spi.protocol.RMIncompleteSequenceBehavior;
+import org.jboss.ws.extensions.wsrm.transport.RMUnassignedMessageListener;
+
+/**
+ * Client side implementation of the RM sequence
+ *
+ * @author richard.opalka(a)jboss.com
+ *
+ * @since Oct 25, 2007
+ */
+@SuppressWarnings("unchecked")
+public final class RMClientSequence implements RMSequenceIface,
RMUnassignedMessageListener
+{
+ private static final Logger logger = Logger.getLogger(RMClientSequence.class);
+ private static final String PATH_PREFIX = "/temporary_listen_address/";
+ private static final RMConstants wsrmConstants = RMProvider.get().getConstants();
+
+ private final RMConfig wsrmConfig;
+ private final boolean addressableClient;
+ private final Set<Long> acknowledgedOutboundMessages = new
TreeSet<Long>();
+ private final Set<Long> receivedInboundMessages = new TreeSet<Long>();
+ private RMIncompleteSequenceBehavior behavior =
RMIncompleteSequenceBehavior.NO_DISCARD;
+ private String incomingSequenceId;
+ private String outgoingSequenceId;
+ private long duration = -1;
+ private long creationTime;
+ private URI backPort;
+ private ClientImpl client;
+ private boolean isFinal;
+ private AtomicBoolean inboundMessageAckRequested = new AtomicBoolean();
+ private AtomicLong messageNumber = new AtomicLong();
+ private AtomicInteger countOfUnassignedMessagesAvailable = new AtomicInteger();
+
+ public RMClientSequence(boolean addrType, RMConfig wsrmConfig)
+ {
+ super();
+ if (wsrmConfig == null)
+ throw new RMException("WS-RM configuration missing");
+ if (wsrmConfig.getBackPortsServer() == null)
+ throw new RMException("WS-RM backports server configuration
missing");
+
+ this.addressableClient = addrType;
+ this.wsrmConfig = wsrmConfig;
+ try
+ {
+ String host = wsrmConfig.getBackPortsServer().getHost();
+ if (host == null)
+ {
+ host = InetAddress.getLocalHost().getCanonicalHostName();
+ logger.debug("Backports server configuration omits host configuration -
using autodetected " + host);
+ }
+ String port = wsrmConfig.getBackPortsServer().getPort();
+ String path = PATH_PREFIX + UUIDGenerator.generateRandomUUIDString();
+ this.backPort = new URI("http://" + host + ":" + port +
path);
+ }
+ catch (URISyntaxException use)
+ {
+ logger.warn(use);
+ throw new RMException(use.getMessage(), use);
+ }
+ catch (UnknownHostException uhe)
+ {
+ logger.warn(uhe);
+ throw new RMException(uhe.getMessage(), uhe);
+ }
+ }
+
+ public void unassignedMessageReceived()
+ {
+ // we can't use objectLock in the method - possible deadlock
+ this.countOfUnassignedMessagesAvailable.addAndGet(1);
+ logger.debug("Expected sequence expiration in " +
((System.currentTimeMillis() - this.creationTime) / 1000) + "seconds");
+ logger.debug("Unassigned message available in callback handler");
+ }
+
+ public final RMConfig getRMConfig()
+ {
+ return this.wsrmConfig;
+ }
+
+ public final Set<Long> getReceivedInboundMessages()
+ {
+ return this.receivedInboundMessages;
+ }
+
+ public final BindingProvider getBindingProvider()
+ {
+ return (BindingProvider)this.client;
+ }
+
+ public final void setFinal()
+ {
+ this.isFinal = true;
+ logger.debug("Sequence " + this.outgoingSequenceId + " state changed
to final");
+ }
+
+ public final void ackRequested(boolean requested)
+ {
+ this.inboundMessageAckRequested.set(requested);
+ logger.debug("Inbound Sequence: " + this.incomingSequenceId + ", ack
requested. Messages in the queue: " + this.receivedInboundMessages);
+ }
+
+ public final boolean isAckRequested()
+ {
+ return this.inboundMessageAckRequested.get();
+ }
+
+ public final void addReceivedInboundMessage(long messageId)
+ {
+ this.receivedInboundMessages.add(messageId);
+ logger.debug("Inbound Sequence: " + this.incomingSequenceId + ",
received message no. " + messageId);
+ }
+
+ public final void addReceivedOutboundMessage(long messageId)
+ {
+ this.acknowledgedOutboundMessages.add(messageId);
+ logger.debug("Outbound Sequence: " + this.outgoingSequenceId + ",
message no. " + messageId + " acknowledged by server");
+ }
+
+ public final void setOutboundId(String outboundId)
+ {
+ this.outgoingSequenceId = outboundId;
+ }
+
+ public final void setInboundId(String inboundId)
+ {
+ this.incomingSequenceId = inboundId;
+ }
+
+ public final void setClient(ClientImpl client)
+ {
+ this.client = client;
+ }
+
+ public final void setDuration(long duration)
+ {
+ if (duration > 0)
+ {
+ this.creationTime = System.currentTimeMillis();
+ this.duration = duration;
+ }
+ }
+
+ public final long getDuration()
+ {
+ return this.duration;
+ }
+
+ public final URI getBackPort()
+ {
+ // no need for synchronization
+ return (this.addressableClient) ? this.backPort : null;
+ }
+
+ public final long newMessageNumber()
+ {
+ // no need for synchronization
+ return this.messageNumber.incrementAndGet();
+ }
+
+ public final long getLastMessageNumber()
+ {
+ // no need for synchronization
+ return this.messageNumber.get();
+ }
+
+ public final void close() throws RMException
+ {
+ try
+ {
+ sendCloseMessage();
+ sendTerminateMessage();
+ }
+ finally
+ {
+ this.client.setWSRMSequence(null);
+ }
+ }
+
+ /**
+ * Sends the given WS-RM protocol messages by invoking the specified operation on the client stub.
+ */
+ private void sendMessage(String action, QName operationQName, List protocolMessages)
throws RMException
+ {
+ try
+ {
+ // set up addressing properties
+ String address = client.getEndpointMetaData().getEndpointAddress();
+ AddressingProperties props = null;
+ if (this.client.getWSRMSequence().getBackPort() != null)
+ {
+ props = AddressingClientUtil.createDefaultProps(action, address);
+
props.setReplyTo(AddressingBuilder.getAddressingBuilder().newEndpointReference(this.client.getWSRMSequence().getBackPort()));
+ }
+ else
+ {
+ props = AddressingClientUtil.createAnonymousProps(action, address);
+ }
+ // prepare WS-RM request context
+ Map requestContext = client.getBindingProvider().getRequestContext();
+ Map rmRequestContext = (Map)requestContext.get(RMConstant.REQUEST_CONTEXT);
+ if (rmRequestContext == null)
+ {
+ rmRequestContext = new HashMap();
+ }
+ rmRequestContext.put(RMConstant.PROTOCOL_MESSAGES, protocolMessages);
+ rmRequestContext.put(RMConstant.SEQUENCE_REFERENCE, this);
+ // set up method invocation context
+ requestContext.put(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND,
props);
+ requestContext.put(RMConstant.REQUEST_CONTEXT, rmRequestContext);
+ // call stub method
+ this.client.invoke(operationQName, new Object[] {},
client.getBindingProvider().getResponseContext());
+ }
+ catch (Exception e)
+ {
+ throw new RMException("Unable to terminate WSRM sequence", e);
+ }
+ }
+
+ public final void sendCloseMessage()
+ {
+ while (this.isAckRequested())
+ {
+ logger.debug("Waiting till all inbound sequence acknowledgements will be
sent");
+ sendSequenceAcknowledgementMessage();
+ }
+ Map<String, Object> wsrmReqCtx = new HashMap<String, Object>();
+ wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, false);
+ this.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
+ List msgs = new LinkedList();
+ msgs.add(wsrmConstants.getCloseSequenceQName());
+ sendMessage(RMConstant.CLOSE_SEQUENCE_WSA_ACTION,
wsrmConstants.getCloseSequenceQName(), msgs);
+ }
+
+ public final void sendTerminateMessage()
+ {
+ List msgs = new LinkedList();
+ msgs.add(wsrmConstants.getTerminateSequenceQName());
+ if (this.getInboundId() != null)
+ {
+ msgs.add(wsrmConstants.getSequenceAcknowledgementQName());
+ }
+ sendMessage(RMConstant.TERMINATE_SEQUENCE_WSA_ACTION,
wsrmConstants.getTerminateSequenceQName(), msgs);
+ }
+
+ public final void sendSequenceAcknowledgementMessage()
+ {
+ Map<String, Object> wsrmReqCtx = new HashMap<String, Object>();
+ wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, true);
+ this.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
+ ackRequested(false);
+ List msgs = new LinkedList();
+ msgs.add(wsrmConstants.getSequenceAcknowledgementQName());
+ sendMessage(RMConstant.SEQUENCE_ACKNOWLEDGEMENT_WSA_ACTION,
wsrmConstants.getSequenceAcknowledgementQName(), msgs);
+ }
+
+ public final void setBehavior(RMIncompleteSequenceBehavior behavior)
+ {
+ if (behavior != null)
+ {
+ this.behavior = behavior;
+ }
+ }
+
+ public final String getOutboundId()
+ {
+ return outgoingSequenceId;
+ }
+
+ public final String getInboundId()
+ {
+ return incomingSequenceId;
+ }
+
+}
Property changes on:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequence.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequenceImpl.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequenceImpl.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientSequenceImpl.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -1,326 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.ws.extensions.wsrm;
-
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.xml.namespace.QName;
-import javax.xml.ws.BindingProvider;
-import javax.xml.ws.addressing.AddressingBuilder;
-import javax.xml.ws.addressing.AddressingProperties;
-import javax.xml.ws.addressing.JAXWSAConstants;
-
-import org.jboss.logging.Logger;
-import org.jboss.ws.core.jaxws.client.ClientImpl;
-import org.jboss.ws.core.utils.UUIDGenerator;
-import org.jboss.ws.extensions.addressing.AddressingClientUtil;
-import org.jboss.ws.extensions.wsrm.config.RMConfig;
-import org.jboss.ws.extensions.wsrm.api.RMException;
-import org.jboss.ws.extensions.wsrm.spi.RMConstants;
-import org.jboss.ws.extensions.wsrm.spi.RMProvider;
-import org.jboss.ws.extensions.wsrm.spi.protocol.RMIncompleteSequenceBehavior;
-import org.jboss.ws.extensions.wsrm.transport.RMUnassignedMessageListener;
-
-/**
- * Client side implementation of the RM sequence
- *
- * @author richard.opalka(a)jboss.com
- *
- * @since Oct 25, 2007
- */
-@SuppressWarnings("unchecked")
-public final class RMClientSequenceImpl implements RMSequenceIface,
RMUnassignedMessageListener
-{
- private static final Logger logger = Logger.getLogger(RMClientSequenceImpl.class);
- private static final String PATH_PREFIX = "/temporary_listen_address/";
- private static final RMConstants wsrmConstants = RMProvider.get().getConstants();
-
- private final RMConfig wsrmConfig;
- private final boolean addressableClient;
- private final Set<Long> acknowledgedOutboundMessages = new
TreeSet<Long>();
- private final Set<Long> receivedInboundMessages = new TreeSet<Long>();
- private RMIncompleteSequenceBehavior behavior =
RMIncompleteSequenceBehavior.NO_DISCARD;
- private String incomingSequenceId;
- private String outgoingSequenceId;
- private long duration = -1;
- private long creationTime;
- private URI backPort;
- private ClientImpl client;
- private boolean isFinal;
- private AtomicBoolean inboundMessageAckRequested = new AtomicBoolean();
- private AtomicLong messageNumber = new AtomicLong();
- private AtomicInteger countOfUnassignedMessagesAvailable = new AtomicInteger();
-
- public RMClientSequenceImpl(boolean addrType, RMConfig wsrmConfig)
- {
- super();
- if (wsrmConfig == null)
- throw new RMException("WS-RM configuration missing");
- if (wsrmConfig.getBackPortsServer() == null)
- throw new RMException("WS-RM backports server configuration
missing");
-
- this.addressableClient = addrType;
- this.wsrmConfig = wsrmConfig;
- try
- {
- String host = wsrmConfig.getBackPortsServer().getHost();
- if (host == null)
- {
- host = InetAddress.getLocalHost().getCanonicalHostName();
- logger.debug("Backports server configuration omits host configuration -
using autodetected " + host);
- }
- String port = wsrmConfig.getBackPortsServer().getPort();
- String path = PATH_PREFIX + UUIDGenerator.generateRandomUUIDString();
- this.backPort = new URI("http://" + host + ":" + port +
path);
- }
- catch (URISyntaxException use)
- {
- logger.warn(use);
- throw new RMException(use.getMessage(), use);
- }
- catch (UnknownHostException uhe)
- {
- logger.warn(uhe);
- throw new RMException(uhe.getMessage(), uhe);
- }
- }
-
- public void unassignedMessageReceived()
- {
- // we can't use objectLock in the method - possible deadlock
- this.countOfUnassignedMessagesAvailable.addAndGet(1);
- logger.debug("Expected sequence expiration in " +
((System.currentTimeMillis() - this.creationTime) / 1000) + "seconds");
- logger.debug("Unassigned message available in callback handler");
- }
-
- public final RMConfig getRMConfig()
- {
- return this.wsrmConfig;
- }
-
- public final Set<Long> getReceivedInboundMessages()
- {
- return this.receivedInboundMessages;
- }
-
- public final BindingProvider getBindingProvider()
- {
- return (BindingProvider)this.client;
- }
-
- public final void setFinal()
- {
- this.isFinal = true;
- logger.debug("Sequence " + this.outgoingSequenceId + " state changed
to final");
- }
-
- public final void ackRequested(boolean requested)
- {
- this.inboundMessageAckRequested.set(requested);
- logger.debug("Inbound Sequence: " + this.incomingSequenceId + ", ack
requested. Messages in the queue: " + this.receivedInboundMessages);
- }
-
- public final boolean isAckRequested()
- {
- return this.inboundMessageAckRequested.get();
- }
-
- public final void addReceivedInboundMessage(long messageId)
- {
- this.receivedInboundMessages.add(messageId);
- logger.debug("Inbound Sequence: " + this.incomingSequenceId + ",
received message no. " + messageId);
- }
-
- public final void addReceivedOutboundMessage(long messageId)
- {
- this.acknowledgedOutboundMessages.add(messageId);
- logger.debug("Outbound Sequence: " + this.outgoingSequenceId + ",
message no. " + messageId + " acknowledged by server");
- }
-
- public final void setOutboundId(String outboundId)
- {
- this.outgoingSequenceId = outboundId;
- }
-
- public final void setInboundId(String inboundId)
- {
- this.incomingSequenceId = inboundId;
- }
-
- public final void setClient(ClientImpl client)
- {
- this.client = client;
- }
-
- public final void setDuration(long duration)
- {
- if (duration > 0)
- {
- this.creationTime = System.currentTimeMillis();
- this.duration = duration;
- }
- }
-
- public final long getDuration()
- {
- return this.duration;
- }
-
- public final URI getBackPort()
- {
- // no need for synchronization
- return (this.addressableClient) ? this.backPort : null;
- }
-
- public final long newMessageNumber()
- {
- // no need for synchronization
- return this.messageNumber.incrementAndGet();
- }
-
- public final long getLastMessageNumber()
- {
- // no need for synchronization
- return this.messageNumber.get();
- }
-
- public final void close() throws RMException
- {
- try
- {
- sendCloseMessage();
- sendTerminateMessage();
- }
- finally
- {
- this.client.setWSRMSequence(null);
- }
- }
-
- /**
- * Sets up terminated flag to true.
- */
- private void sendMessage(String action, QName operationQName, List protocolMessages)
throws RMException
- {
- try
- {
- // set up addressing properties
- String address = client.getEndpointMetaData().getEndpointAddress();
- AddressingProperties props = null;
- if (this.client.getWSRMSequence().getBackPort() != null)
- {
- props = AddressingClientUtil.createDefaultProps(action, address);
-
props.setReplyTo(AddressingBuilder.getAddressingBuilder().newEndpointReference(this.client.getWSRMSequence().getBackPort()));
- }
- else
- {
- props = AddressingClientUtil.createAnonymousProps(action, address);
- }
- // prepare WS-RM request context
- Map requestContext = client.getBindingProvider().getRequestContext();
- Map rmRequestContext = (Map)requestContext.get(RMConstant.REQUEST_CONTEXT);
- if (rmRequestContext == null)
- {
- rmRequestContext = new HashMap();
- }
- rmRequestContext.put(RMConstant.PROTOCOL_MESSAGES, protocolMessages);
- rmRequestContext.put(RMConstant.SEQUENCE_REFERENCE, this);
- // set up method invocation context
- requestContext.put(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND,
props);
- requestContext.put(RMConstant.REQUEST_CONTEXT, rmRequestContext);
- // call stub method
- this.client.invoke(operationQName, new Object[] {},
client.getBindingProvider().getResponseContext());
- }
- catch (Exception e)
- {
- throw new RMException("Unable to terminate WSRM sequence", e);
- }
- }
-
- public final void sendCloseMessage()
- {
- while (this.isAckRequested())
- {
- logger.debug("Waiting till all inbound sequence acknowledgements will be
sent");
- sendSequenceAcknowledgementMessage();
- }
- Map<String, Object> wsrmReqCtx = new HashMap<String, Object>();
- wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, false);
- this.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
- List msgs = new LinkedList();
- msgs.add(wsrmConstants.getCloseSequenceQName());
- sendMessage(RMConstant.CLOSE_SEQUENCE_WSA_ACTION,
wsrmConstants.getCloseSequenceQName(), msgs);
- }
-
- public final void sendTerminateMessage()
- {
- List msgs = new LinkedList();
- msgs.add(wsrmConstants.getTerminateSequenceQName());
- if (this.getInboundId() != null)
- {
- msgs.add(wsrmConstants.getSequenceAcknowledgementQName());
- }
- sendMessage(RMConstant.TERMINATE_SEQUENCE_WSA_ACTION,
wsrmConstants.getTerminateSequenceQName(), msgs);
- }
-
- public final void sendSequenceAcknowledgementMessage()
- {
- Map<String, Object> wsrmReqCtx = new HashMap<String, Object>();
- wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, true);
- this.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
- ackRequested(false);
- List msgs = new LinkedList();
- msgs.add(wsrmConstants.getSequenceAcknowledgementQName());
- sendMessage(RMConstant.SEQUENCE_ACKNOWLEDGEMENT_WSA_ACTION,
wsrmConstants.getSequenceAcknowledgementQName(), msgs);
- }
-
- public final void setBehavior(RMIncompleteSequenceBehavior behavior)
- {
- if (behavior != null)
- {
- this.behavior = behavior;
- }
- }
-
- public final String getOutboundId()
- {
- return outgoingSequenceId;
- }
-
- public final String getInboundId()
- {
- return incomingSequenceId;
- }
-
-}
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -30,9 +30,12 @@
public static final String PROTOCOL_MESSAGES_MAPPING = PREFIX +
".protocolMessagesMapping";
// WS-Addressing related actions
public static final String CREATE_SEQUENCE_WSA_ACTION;
+ public static final String CREATE_SEQUENCE_RESPONSE_WSA_ACTION;
public static final String CLOSE_SEQUENCE_WSA_ACTION;
+ public static final String CLOSE_SEQUENCE_RESPONSE_WSA_ACTION;
public static final String SEQUENCE_ACKNOWLEDGEMENT_WSA_ACTION;
public static final String TERMINATE_SEQUENCE_WSA_ACTION;
+ public static final String TERMINATE_SEQUENCE_RESPONSE_WSA_ACTION;
public static final Set<QName> PROTOCOL_OPERATION_QNAMES;
@@ -56,9 +59,12 @@
String namespaceURI = RMProvider.get().getConstants().getNamespaceURI();
PROTOCOL_OPERATION_QNAMES = Collections.unmodifiableSet(temp);
CREATE_SEQUENCE_WSA_ACTION = namespaceURI + "/CreateSequence";
+ CREATE_SEQUENCE_RESPONSE_WSA_ACTION = namespaceURI +
"/CreateSequenceResponse";
CLOSE_SEQUENCE_WSA_ACTION = namespaceURI + "/CloseSequence";
+ CLOSE_SEQUENCE_RESPONSE_WSA_ACTION = namespaceURI +
"/CloseSequenceResponse";
SEQUENCE_ACKNOWLEDGEMENT_WSA_ACTION = namespaceURI +
"/SequenceAcknowledgement";
TERMINATE_SEQUENCE_WSA_ACTION = namespaceURI + "/TerminateSequence";
+ TERMINATE_SEQUENCE_RESPONSE_WSA_ACTION = namespaceURI +
"/TerminateSequenceResponse";
}
private RMConstant()
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceIface.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceIface.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceIface.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -38,5 +38,6 @@
long newMessageNumber();
long getLastMessageNumber();
String getInboundId();
+ long getDuration();
Set<Long> getReceivedInboundMessages();
}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -23,17 +23,26 @@
import java.util.Date;
import java.util.List;
+import java.util.Map;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.Duration;
import javax.xml.namespace.QName;
+import javax.xml.ws.addressing.AddressingBuilder;
+import javax.xml.ws.addressing.AddressingProperties;
+import javax.xml.ws.addressing.Relationship;
import org.jboss.logging.Logger;
+import org.jboss.ws.extensions.addressing.AddressingClientUtil;
+import org.jboss.ws.extensions.addressing.AddressingPropertiesImpl;
+import org.jboss.ws.extensions.addressing.metadata.AddressingOpMetaExt;
import org.jboss.ws.extensions.wsrm.RMConstant;
-import org.jboss.ws.extensions.wsrm.RMClientSequenceImpl;
+import org.jboss.ws.extensions.wsrm.RMClientSequence;
import org.jboss.ws.extensions.wsrm.api.RMException;
import org.jboss.ws.extensions.wsrm.jaxws.RMHandler;
+import org.jboss.ws.extensions.wsrm.server.RMServerSequence;
+import org.jboss.ws.extensions.wsrm.spi.RMConstants;
import org.jboss.ws.extensions.wsrm.spi.RMProvider;
import org.jboss.ws.extensions.wsrm.spi.protocol.RMAckRequested;
import org.jboss.ws.extensions.wsrm.spi.protocol.RMSequence;
@@ -52,6 +61,7 @@
{
private static final Logger logger = Logger.getLogger(RMHelper.class);
+ private static final RMConstants rmConstants = RMProvider.get().getConstants();
private RMHelper()
{
@@ -72,11 +82,63 @@
}
}
+ public static RMServerSequence getServerSequence(String seqId,
List<RMServerSequence> sequences)
+ {
+ for (RMServerSequence seq : sequences)
+ {
+ if (seq.getInboundId().equals(seqId))
+ {
+ return seq;
+ }
+ }
+
+ return null;
+ }
+
+ public static AddressingProperties createAddressingProperties(AddressingProperties
reqAddrProps, List<QName> rmMessages)
+ {
+ AddressingProperties retVal = null;
+
+ if (rmMessages.contains(rmConstants.getCreateSequenceQName()))
+ {
+ String wsaTo = reqAddrProps.getReplyTo().getAddress().getURI().toString();
+ retVal =
AddressingClientUtil.createDefaultProps(RMConstant.CREATE_SEQUENCE_RESPONSE_WSA_ACTION,
wsaTo);
+ retVal.setMessageID(null);
+ AddressingBuilder addrBuilder = AddressingBuilder.getAddressingBuilder();
+ retVal.setRelatesTo(new Relationship[]
{addrBuilder.newRelationship(addrBuilder.newURI(reqAddrProps.getMessageID().getURI()).getURI())});
+ }
+
+ return retVal;
+ }
+
+ public static boolean isCreateSequence(Map<String, Object> rmMsgContext)
+ {
+ List<QName> protocolMessages =
(List<QName>)rmMsgContext.get(RMConstant.PROTOCOL_MESSAGES);
+ return protocolMessages.contains(rmConstants.getCreateSequenceQName());
+ }
+
+ public static boolean isCloseSequence(Map<String, Object> rmMsgContext)
+ {
+ List<QName> protocolMessages =
(List<QName>)rmMsgContext.get(RMConstant.PROTOCOL_MESSAGES);
+ return protocolMessages.contains(rmConstants.getCloseSequenceQName());
+ }
+
+ public static boolean isTerminateSequence(Map<String, Object> rmMsgContext)
+ {
+ List<QName> protocolMessages =
(List<QName>)rmMsgContext.get(RMConstant.PROTOCOL_MESSAGES);
+ return protocolMessages.contains(rmConstants.getTerminateSequenceQName());
+ }
+
public static Duration stringToDuration(String s)
{
return factory.newDuration(s);
}
+ public static Duration longToDuration(long l)
+ {
+ return factory.newDuration(l);
+ }
+
public static String durationToString(Duration d)
{
return d.toString();
@@ -90,7 +152,7 @@
return d.getTimeInMillis(new Date());
}
- public static void handleSequenceAcknowledgementHeader(RMSequenceAcknowledgement
seqAckHeader, RMClientSequenceImpl sequence)
+ public static void handleSequenceAcknowledgementHeader(RMSequenceAcknowledgement
seqAckHeader, RMClientSequence sequence)
{
String seqId = seqAckHeader.getIdentifier();
if (sequence.getOutboundId().equals(seqId))
@@ -115,7 +177,7 @@
}
}
- public static void handleAckRequestedHeader(RMAckRequested ackReqHeader,
RMClientSequenceImpl sequence)
+ public static void handleAckRequestedHeader(RMAckRequested ackReqHeader,
RMClientSequence sequence)
{
String inboundSeqId = ackReqHeader.getIdentifier();
if (false == sequence.getInboundId().equals(inboundSeqId))
@@ -127,7 +189,7 @@
sequence.ackRequested(true);
}
- public static void handleSequenceHeader(RMSequence seqHeader, RMClientSequenceImpl
sequence)
+ public static void handleSequenceHeader(RMSequence seqHeader, RMClientSequence
sequence)
{
String inboundSeqId = seqHeader.getIdentifier();
if (null == sequence.getInboundId())
@@ -153,24 +215,47 @@
QName createSequenceQName = rmProvider.getConstants().getCreateSequenceQName();
OperationMetaData createSequenceMD = new OperationMetaData(endpointMD,
createSequenceQName, "createSequence");
createSequenceMD.setOneWay(false);
+ // setup addressing related data
+ AddressingOpMetaExt createSequenceAddrExt = new AddressingOpMetaExt(new
AddressingPropertiesImpl().getNamespaceURI());
+ createSequenceAddrExt.setInboundAction(RMConstant.CREATE_SEQUENCE_WSA_ACTION);
+
createSequenceAddrExt.setOutboundAction(RMConstant.CREATE_SEQUENCE_RESPONSE_WSA_ACTION);
+ createSequenceMD.addExtension(createSequenceAddrExt);
+ // register operation metadata with endpoint metadata
endpointMD.addOperation(createSequenceMD);
// register sequenceAcknowledgement method
QName sequenceAcknowledgementQName =
rmProvider.getConstants().getSequenceAcknowledgementQName();
OperationMetaData sequenceAcknowledgementMD = new OperationMetaData(endpointMD,
sequenceAcknowledgementQName, "sequenceAcknowledgement");
sequenceAcknowledgementMD.setOneWay(true);
+ // setup addressing related data
+ AddressingOpMetaExt sequenceAcknowledgementAddrExt = new AddressingOpMetaExt(new
AddressingPropertiesImpl().getNamespaceURI());
+
sequenceAcknowledgementAddrExt.setInboundAction(RMConstant.SEQUENCE_ACKNOWLEDGEMENT_WSA_ACTION);
+ sequenceAcknowledgementMD.addExtension(sequenceAcknowledgementAddrExt);
+ // register operation metadata with endpoint metadata
endpointMD.addOperation(sequenceAcknowledgementMD);
// register closeSequence method
QName closeSequenceQName = rmProvider.getConstants().getCloseSequenceQName();
OperationMetaData closeSequenceMD = new OperationMetaData(endpointMD,
closeSequenceQName, "closeSequence");
closeSequenceMD.setOneWay(false);
+ // setup addressing related data
+ AddressingOpMetaExt closeSequenceAddrExt = new AddressingOpMetaExt(new
AddressingPropertiesImpl().getNamespaceURI());
+ closeSequenceAddrExt.setInboundAction(RMConstant.CLOSE_SEQUENCE_WSA_ACTION);
+
closeSequenceAddrExt.setOutboundAction(RMConstant.CLOSE_SEQUENCE_RESPONSE_WSA_ACTION);
+ closeSequenceMD.addExtension(closeSequenceAddrExt);
+ // register operation metadata with endpoint metadata
endpointMD.addOperation(closeSequenceMD);
// register terminateSequence method
QName terminateSequenceQName =
rmProvider.getConstants().getTerminateSequenceQName();
OperationMetaData terminateSequenceMD = new OperationMetaData(endpointMD,
terminateSequenceQName, "terminateSequence");
terminateSequenceMD.setOneWay(false);
+ // setup addressing related data
+ AddressingOpMetaExt terminateSequenceAddrExt = new AddressingOpMetaExt(new
AddressingPropertiesImpl().getNamespaceURI());
+
terminateSequenceAddrExt.setInboundAction(RMConstant.TERMINATE_SEQUENCE_WSA_ACTION);
+
terminateSequenceAddrExt.setOutboundAction(RMConstant.TERMINATE_SEQUENCE_RESPONSE_WSA_ACTION);
+ terminateSequenceMD.addExtension(terminateSequenceAddrExt);
+ // register operation metadata with endpoint metadata
endpointMD.addOperation(terminateSequenceMD);
}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/jaxws/RMHandler.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/jaxws/RMHandler.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/jaxws/RMHandler.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -42,6 +42,7 @@
import org.jboss.ws.extensions.wsrm.RMConstant;
import org.jboss.ws.extensions.wsrm.RMSequenceIface;
import org.jboss.ws.extensions.wsrm.api.RMException;
+import org.jboss.ws.extensions.wsrm.common.RMHelper;
import org.jboss.ws.extensions.wsrm.spi.RMConstants;
import org.jboss.ws.extensions.wsrm.spi.RMMessageFactory;
import org.jboss.ws.extensions.wsrm.spi.RMProvider;
@@ -57,6 +58,8 @@
import org.jboss.ws.extensions.wsrm.spi.protocol.RMTerminateSequenceResponse;
/**
+ * TODO: refactor this shared handler code to server/client handlers
+ *
* RM generic JAX-WS handler
*
* @author richard.opalka(a)jboss.com
@@ -89,14 +92,15 @@
if (addrProps == null)
throw new RMException("WS-Addressing properties not found in message
context");
- Map rmRequestContext = (Map)commonMsgContext.get(RMConstant.REQUEST_CONTEXT);
- List<QName> outMsgs =
(List<QName>)rmRequestContext.get(RMConstant.PROTOCOL_MESSAGES);
+ String rmContext = isClientSide() ? RMConstant.REQUEST_CONTEXT :
RMConstant.RESPONSE_CONTEXT;
+ Map rmOutboundContext = (Map)commonMsgContext.get(rmContext);
+ List<QName> outMsgs =
(List<QName>)rmOutboundContext.get(RMConstant.PROTOCOL_MESSAGES);
Map<QName, RMSerializable> data = new HashMap<QName,
RMSerializable>();
String optionalMessageId = (addrProps.getMessageID() != null) ?
addrProps.getMessageID().getURI().toString() : null;
- rmRequestContext.put(RMConstant.WSA_MESSAGE_ID, optionalMessageId);
- rmRequestContext.put(RMConstant.PROTOCOL_MESSAGES_MAPPING, data);
+ rmOutboundContext.put(RMConstant.WSA_MESSAGE_ID, optionalMessageId);
+ rmOutboundContext.put(RMConstant.PROTOCOL_MESSAGES_MAPPING, data);
SOAPMessage soapMessage = ((SOAPMessageContext)commonMsgContext).getMessage();
- RMSequenceIface sequenceImpl =
(RMSequenceIface)rmRequestContext.get(RMConstant.SEQUENCE_REFERENCE);
+ RMSequenceIface sequenceImpl =
(RMSequenceIface)rmOutboundContext.get(RMConstant.SEQUENCE_REFERENCE);
QName msgQName = null;
@@ -115,6 +119,21 @@
}
}
+ if (isServerSide())
+ {
+ msgQName = rmConstants.getCreateSequenceResponseQName();
+ if (outMsgs.contains(msgQName))
+ {
+ // try to serialize CreateSequenceResponse to message
+ RMCreateSequenceResponse createSequenceResponse =
rmFactory.newCreateSequenceResponse();
+ createSequenceResponse.setIdentifier(sequenceImpl.getInboundId());
+
createSequenceResponse.setExpires(RMHelper.longToDuration(sequenceImpl.getDuration()));
+ createSequenceResponse.serializeTo(soapMessage);
+ data.put(msgQName, createSequenceResponse);
+ log.debug(msgQName.getLocalPart() + " WSRM message was serialized to
payload");
+ }
+ }
+
msgQName = rmConstants.getSequenceQName();
if (outMsgs.contains(msgQName))
{
@@ -147,7 +166,10 @@
// try to serialize CloseSequence to message
RMCloseSequence closeSequence = rmFactory.newCloseSequence();
closeSequence.setIdentifier(sequenceImpl.getOutboundId());
- closeSequence.setLastMsgNumber(sequenceImpl.getLastMessageNumber());
+ if (sequenceImpl.getLastMessageNumber() > 0)
+ {
+ closeSequence.setLastMsgNumber(sequenceImpl.getLastMessageNumber());
+ }
closeSequence.serializeTo(soapMessage);
data.put(msgQName, closeSequence);
log.debug(msgQName.getLocalPart() + " WSRM message was serialized to
payload");
@@ -161,7 +183,8 @@
{
// try to serialize CloseSequenceResponse to message
RMCloseSequenceResponse closeSequenceResponse =
rmFactory.newCloseSequenceResponse();
- closeSequenceResponse.setIdentifier(sequenceImpl.getOutboundId());
+ closeSequenceResponse.setIdentifier(sequenceImpl.getInboundId());
+ closeSequenceResponse.serializeTo(soapMessage);
data.put(msgQName, closeSequenceResponse);
log.debug(msgQName.getLocalPart() + " WSRM message was serialized to
payload");
}
@@ -175,7 +198,10 @@
// try to serialize TerminateSequence to message
RMTerminateSequence terminateSequence = rmFactory.newTerminateSequence();
terminateSequence.setIdentifier(sequenceImpl.getOutboundId());
- terminateSequence.setLastMsgNumber(sequenceImpl.getLastMessageNumber());
+ if (sequenceImpl.getLastMessageNumber() > 0)
+ {
+ terminateSequence.setLastMsgNumber(sequenceImpl.getLastMessageNumber());
+ }
terminateSequence.serializeTo(soapMessage);
data.put(msgQName, terminateSequence);
log.debug(msgQName.getLocalPart() + " WSRM message was serialized to
payload");
@@ -189,7 +215,7 @@
{
// try to serialize terminateSequenceResponse to message
RMTerminateSequenceResponse terminateSequenceResponse =
rmFactory.newTerminateSequenceResponse();
- terminateSequenceResponse.setIdentifier(sequenceImpl.getOutboundId());
+ terminateSequenceResponse.setIdentifier(sequenceImpl.getInboundId());
terminateSequenceResponse.serializeTo(soapMessage);
data.put(msgQName, terminateSequenceResponse);
log.debug(msgQName.getLocalPart() + " WSRM message was serialized to
payload");
@@ -203,14 +229,21 @@
RMSequenceAcknowledgement sequenceAcknowledgement =
rmFactory.newSequenceAcknowledgement();
sequenceAcknowledgement.setIdentifier(sequenceImpl.getInboundId());
Iterator<Long> receivedInboudMessages =
sequenceImpl.getReceivedInboundMessages().iterator();
- while (receivedInboudMessages.hasNext())
+ if (false == receivedInboudMessages.hasNext())
{
- long messageNo = receivedInboudMessages.next();
- RMSequenceAcknowledgement.RMAcknowledgementRange range =
sequenceAcknowledgement.newAcknowledgementRange();
- range.setLower(messageNo);
- range.setUpper(messageNo);
- sequenceAcknowledgement.addAcknowledgementRange(range);
+ sequenceAcknowledgement.setNone();
}
+ else
+ {
+ while (receivedInboudMessages.hasNext())
+ {
+ long messageNo = receivedInboudMessages.next();
+ RMSequenceAcknowledgement.RMAcknowledgementRange range =
sequenceAcknowledgement.newAcknowledgementRange();
+ range.setLower(messageNo);
+ range.setUpper(messageNo);
+ sequenceAcknowledgement.addAcknowledgementRange(range);
+ }
+ }
sequenceAcknowledgement.serializeTo(soapMessage);
data.put(msgQName, sequenceAcknowledgement);
log.debug(msgQName.getLocalPart() + " WSRM message was serialized to
payload");
@@ -234,8 +267,9 @@
rmResponseContext.put(RMConstant.PROTOCOL_MESSAGES, messages);
Map<QName, RMSerializable> data = new HashMap<QName,
RMSerializable>();
rmResponseContext.put(RMConstant.PROTOCOL_MESSAGES_MAPPING, data);
- msgContext.put(RMConstant.RESPONSE_CONTEXT, rmResponseContext);
- msgContext.setScope(RMConstant.RESPONSE_CONTEXT, Scope.APPLICATION);
+ String rmContext = isClientSide() ? RMConstant.RESPONSE_CONTEXT :
RMConstant.REQUEST_CONTEXT;
+ msgContext.put(rmContext, rmResponseContext);
+ msgContext.setScope(rmContext, Scope.APPLICATION);
if (isServerSide())
{
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMDeploymentAspect.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMDeploymentAspect.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMDeploymentAspect.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -21,6 +21,8 @@
*/
package org.jboss.ws.extensions.wsrm.server;
+import java.util.LinkedList;
+
import org.jboss.ws.extensions.wsrm.common.RMHelper;
import org.jboss.ws.metadata.umdm.ServerEndpointMetaData;
import org.jboss.wsf.spi.deployment.Deployment;
@@ -52,6 +54,7 @@
InvocationHandler origInvHandler = ep.getInvocationHandler();
InvocationHandler wsrmInvHandler = new RMInvocationHandler(origInvHandler);
ep.setInvocationHandler(wsrmInvHandler);
+ ep.addAttachment(RMServerSequence.class, new
LinkedList<RMServerSequence>());
RMHelper.setupRMOperations(sepMetaData);
}
}
@@ -71,6 +74,7 @@
{
RMInvocationHandler rmInvHandler = (RMInvocationHandler)invHandler;
ep.setInvocationHandler(rmInvHandler.getDelegate());
+ ep.removeAttachment(RMServerSequence.class);
}
}
}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMInvocationHandler.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMInvocationHandler.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMInvocationHandler.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -21,7 +21,28 @@
*/
package org.jboss.ws.extensions.wsrm.server;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+import javax.xml.ws.addressing.AddressingProperties;
+import javax.xml.ws.addressing.JAXWSAConstants;
+
import org.jboss.logging.Logger;
+import org.jboss.util.NotImplementedException;
+import org.jboss.ws.core.CommonMessageContext;
+import org.jboss.ws.core.soap.MessageContextAssociation;
+import org.jboss.ws.extensions.wsrm.RMConstant;
+import org.jboss.ws.extensions.wsrm.api.RMException;
+import org.jboss.ws.extensions.wsrm.api.RMSequence;
+import org.jboss.ws.extensions.wsrm.common.RMHelper;
+import org.jboss.ws.extensions.wsrm.spi.RMConstants;
+import org.jboss.ws.extensions.wsrm.spi.RMProvider;
+import org.jboss.ws.extensions.wsrm.spi.protocol.RMCloseSequence;
+import org.jboss.ws.extensions.wsrm.spi.protocol.RMSerializable;
+import org.jboss.ws.extensions.wsrm.spi.protocol.RMTerminateSequence;
import org.jboss.wsf.spi.deployment.Endpoint;
import org.jboss.wsf.spi.invocation.Invocation;
import org.jboss.wsf.spi.invocation.InvocationHandler;
@@ -37,6 +58,7 @@
{
private static final Logger logger = Logger.getLogger(RMInvocationHandler.class);
+ private static final RMConstants rmConstants = RMProvider.get().getConstants();
private final InvocationHandler delegate;
@@ -63,11 +85,96 @@
{
this.delegate.init(ep);
}
+
+ /**
+ * Do RM stuff before endpoint invocation
+ * @param ep endpoint
+ * @param inv invocation
+ * @throws RMException if WS-Addressing properties or WS-RM data are missing from the request
+ */
+ private void beforeEndpointInvocation(Endpoint ep, Invocation inv)
+ {
+ CommonMessageContext msgContext = MessageContextAssociation.peekMessageContext();
+ AddressingProperties addrProps =
(AddressingProperties)msgContext.get(JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND);
+ if (addrProps == null)
+ throw new RMException("WS-Addressing properties not found in server
request");
+
+ Map<String, Object> rmReqProps = (Map<String,
Object>)msgContext.get(RMConstant.REQUEST_CONTEXT);
+ if (rmReqProps == null)
+ throw new RMException("WS-RM specific data not found in server
request");
+ Map<String, Object> rmResProps = new HashMap<String, Object>();
+ List<QName> protocolMessages = new LinkedList<QName>();
+ Map<String, Object> rmResponseContext = new HashMap<String, Object>();
+ AddressingProperties respAddrProps = null;
+ List<RMServerSequence> sequences =
(List<RMServerSequence>)ep.getAttachment(RMServerSequence.class);
+ RMServerSequence sequence = null;
+
+ if (RMHelper.isCreateSequence(rmReqProps))
+ {
+ sequence = new RMServerSequence();
+ sequences.add(sequence);
+ protocolMessages.add(rmConstants.getCreateSequenceResponseQName());
+ rmResponseContext.put(RMConstant.PROTOCOL_MESSAGES, protocolMessages);
+ rmResponseContext.put(RMConstant.SEQUENCE_REFERENCE, sequence);
+ msgContext.put(RMConstant.RESPONSE_CONTEXT, rmResponseContext);
+ }
+
+ if (RMHelper.isCloseSequence(rmReqProps))
+ {
+ Map<QName, RMSerializable> data = (Map<QName,
RMSerializable>)rmReqProps.get(RMConstant.PROTOCOL_MESSAGES_MAPPING);
+ RMCloseSequence payload =
(RMCloseSequence)data.get(rmConstants.getCloseSequenceQName());
+ String seqIdentifier = payload.getIdentifier();
+ sequence = RMHelper.getServerSequence(seqIdentifier, sequences);
+ if (sequence == null)
+ {
+ throw new NotImplementedException("TODO: implement unknown sequence
fault generation");
+ }
+ sequence.close();
+ protocolMessages.add(rmConstants.getCloseSequenceResponseQName());
+ protocolMessages.add(rmConstants.getSequenceAcknowledgementQName());
+ rmResponseContext.put(RMConstant.PROTOCOL_MESSAGES, protocolMessages);
+ rmResponseContext.put(RMConstant.SEQUENCE_REFERENCE, sequence);
+ msgContext.put(RMConstant.RESPONSE_CONTEXT, rmResponseContext);
+ }
+
+ if (RMHelper.isTerminateSequence(rmReqProps))
+ {
+ Map<QName, RMSerializable> data = (Map<QName,
RMSerializable>)rmReqProps.get(RMConstant.PROTOCOL_MESSAGES_MAPPING);
+ RMTerminateSequence payload =
(RMTerminateSequence)data.get(rmConstants.getTerminateSequenceQName());
+ String seqIdentifier = payload.getIdentifier();
+ sequence = RMHelper.getServerSequence(seqIdentifier, sequences);
+ if (sequence == null)
+ {
+ throw new NotImplementedException("TODO: implement unknown sequence
fault generation");
+ }
+
+ sequences.remove(sequence);
+ protocolMessages.add(rmConstants.getTerminateSequenceResponseQName());
+ protocolMessages.add(rmConstants.getSequenceAcknowledgementQName());
+ rmResponseContext.put(RMConstant.PROTOCOL_MESSAGES, protocolMessages);
+ rmResponseContext.put(RMConstant.SEQUENCE_REFERENCE, sequence);
+ msgContext.put(RMConstant.RESPONSE_CONTEXT, rmResponseContext);
+ }
+
+ // TODO: implement
+ }
+
+ /**
+ * Do RM stuff after endpoint invocation
+ * @param ep endpoint
+ * @param inv invocation
+ */
+ private void afterEndpointInvocation(Endpoint ep, Invocation inv)
+ {
+ // TODO: implement
+ }
+
@Override
public final void invoke(Endpoint ep, Invocation inv) throws Exception
{
- // TODO: do WS-RM magic here (such as create, close or terminate sequence
+ beforeEndpointInvocation(ep, inv);
+
if (inv.getJavaMethod() != null)
{
logger.debug("Invoking method: " + inv.getJavaMethod().getName());
@@ -75,8 +182,10 @@
}
else
{
- logger.debug("RM protocol method detected");
+ logger.debug("RM lifecycle protocol method detected");
}
+
+ afterEndpointInvocation(ep, inv);
}
public final InvocationHandler getDelegate()
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java
(rev 0)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -0,0 +1,83 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm.server;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.jboss.ws.extensions.addressing.AddressingClientUtil;
+import org.jboss.ws.extensions.wsrm.RMSequenceIface;
+
+/**
+ * Server side implementation of the RM sequence
+ *
+ * @author richard.opalka(a)jboss.com
+ *
+ * @since Dec 12, 2007
+ */
+public class RMServerSequence implements RMSequenceIface
+{
+
+ private final String inboundId = AddressingClientUtil.generateMessageID().toString();
+ private final String outboundId =
AddressingClientUtil.generateMessageID().toString();
+ private final long duration = 10 * 60 * 1000L; // 10 minutes duration
+ private final Set receivedInboundMessages = new TreeSet<Long>();
+ private boolean closed;
+
+ public String getInboundId()
+ {
+ return this.inboundId;
+ }
+
+ public long getLastMessageNumber()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public long getDuration()
+ {
+ return this.duration;
+ }
+
+ public String getOutboundId()
+ {
+ return this.outboundId;
+ }
+
+ public Set<Long> getReceivedInboundMessages()
+ {
+ return this.receivedInboundMessages;
+ }
+
+ public long newMessageNumber()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ public void close()
+ {
+ this.closed = true;
+ }
+
+}
Property changes on:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelTask.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -33,7 +33,7 @@
import org.jboss.remoting.Client;
import org.jboss.remoting.InvokerLocator;
import org.jboss.ws.core.MessageTrace;
-import org.jboss.ws.extensions.wsrm.RMClientSequenceImpl;
+import org.jboss.ws.extensions.wsrm.RMClientSequence;
import org.jboss.ws.extensions.wsrm.transport.backchannel.RMCallbackHandler;
import org.jboss.ws.extensions.wsrm.transport.backchannel.RMCallbackHandlerFactory;
@@ -78,7 +78,7 @@
if (backPort != null)
{
callbackHandler = RMCallbackHandlerFactory.getCallbackHandler(backPort);
- RMClientSequenceImpl sequence = RMTransportHelper.getSequence(rmRequest);
+ RMClientSequence sequence = RMTransportHelper.getSequence(rmRequest);
if (sequence != null)
{
callbackHandler.addUnassignedMessageListener(sequence);
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java 2007-12-12
17:50:58 UTC (rev 5281)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java 2007-12-12
20:57:26 UTC (rev 5282)
@@ -7,7 +7,7 @@
import org.jboss.logging.Logger;
import org.jboss.ws.extensions.wsrm.RMConstant;
-import org.jboss.ws.extensions.wsrm.RMClientSequenceImpl;
+import org.jboss.ws.extensions.wsrm.RMClientSequence;
/**
* Utility class heavily used in this transport implementation
@@ -45,9 +45,9 @@
return (Map<String, Object>)invocationCtx.get(REQUEST_CONTEXT);
}
- public static RMClientSequenceImpl getSequence(RMMessage rmRequest)
+ public static RMClientSequence getSequence(RMMessage rmRequest)
{
- return
(RMClientSequenceImpl)getWsrmRequestContext(rmRequest).get(SEQUENCE_REFERENCE);
+ return (RMClientSequence)getWsrmRequestContext(rmRequest).get(SEQUENCE_REFERENCE);
}
public static boolean isOneWayOperation(RMMessage rmRequest)