Author: richard.opalka(a)jboss.com
Date: 2007-11-30 16:50:09 -0500 (Fri, 30 Nov 2007)
New Revision: 5161
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/core/CommonClient.java
stack/native/trunk/src/main/java/org/jboss/ws/core/client/RemotingConnectionImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientHandler.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceManager.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/api/RMProvider.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMetadata.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java
stack/native/trunk/src/main/java/org/jboss/ws/metadata/builder/jaxws/JAXWSClientMetaDataBuilder.java
stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/emulator/EndpointEmulator.java
stack/native/trunk/src/test/resources/jaxws/wsrm/emulator/reqres/config/req-res-service-config.xml
Log:
adding support for sequence acknowledgement
Modified: stack/native/trunk/src/main/java/org/jboss/ws/core/CommonClient.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/core/CommonClient.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/core/CommonClient.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -355,7 +355,9 @@
// Get the return object
Object retObj = null;
boolean isWsrmMessage = msgContext.get(RMConstant.REQUEST_CONTEXT) != null;
- if ((oneway == false && handlerPass) || isWsrmMessage)
+ Boolean wsrmOneWay = (Boolean)((Map<String,
Object>)msgContext.get(RMConstant.REQUEST_CONTEXT)).get(RMConstant.ONE_WAY_OPERATION);
+ wsrmOneWay = wsrmOneWay == null ? Boolean.FALSE : wsrmOneWay.booleanValue();
+ if ((oneway == false && handlerPass) || (wsrmOneWay == false))
{
// Verify
if (binding instanceof CommonSOAPBinding)
Modified: stack/native/trunk/src/main/java/org/jboss/ws/core/client/RemotingConnectionImpl.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/core/client/RemotingConnectionImpl.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/core/client/RemotingConnectionImpl.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -196,7 +196,7 @@
{
if (RMTransportHelper.isRMMessage(callProps))
{
- RMMetadata rmMetadata = new RMMetadata(targetAddress, oneway, marshaller,
unmarshaller, callProps, metadata, clientConfig);
+ RMMetadata rmMetadata = new RMMetadata(targetAddress, marshaller,
unmarshaller, callProps, metadata, clientConfig);
return RM_CHANNEL.send(reqMessage, rmMetadata);
}
else
Modified: stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/core/jaxws/client/ClientImpl.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -23,7 +23,6 @@
// $Id$
-import java.net.URI;
import java.rmi.RemoteException;
import java.util.HashMap;
import java.util.HashSet;
@@ -73,7 +72,6 @@
import org.jboss.ws.extensions.wsrm.RMSequenceImpl;
import org.jboss.ws.extensions.wsrm.api.RMAddressingType;
import org.jboss.ws.extensions.wsrm.api.RMException;
-import org.jboss.ws.extensions.wsrm.api.RMSequenceType;
import org.jboss.ws.extensions.wsrm.common.RMHelper;
import org.jboss.ws.extensions.wsrm.spi.RMConstants;
import org.jboss.ws.extensions.wsrm.spi.RMProvider;
@@ -270,7 +268,7 @@
public Object invoke(QName opName, Object[] args, Map<String, Object>
resContext) throws RemoteException
{
this.wsrmLock.lock();
-
+
try
{
// Associate a message context with the current thread
@@ -339,23 +337,26 @@
if (RMConstant.PROTOCOL_OPERATION_QNAMES.contains(opName) == false)
{
Map<String, Object> wsrmResCtx = (Map<String, Object>)
msgContext.get(RMConstant.RESPONSE_CONTEXT);
- RMConstants wsrmConstants = RMProvider.get().getConstants();
- Map<QName, RMSerializable> mapping = (Map<QName,
RMSerializable>)wsrmResCtx.get(RMConstant.PROTOCOL_MESSAGES_MAPPING);
- QName seqAck = wsrmConstants.getSequenceAcknowledgementQName();
- if (mapping.keySet().contains(seqAck))
+ if (wsrmResCtx != null)
{
-
RMHelper.handleSequenceAcknowledgementHeader((RMSequenceAcknowledgement)mapping.get(seqAck),
this.wsrmSequence);
+ RMConstants wsrmConstants = RMProvider.get().getConstants();
+ Map<QName, RMSerializable> mapping = (Map<QName,
RMSerializable>)wsrmResCtx.get(RMConstant.PROTOCOL_MESSAGES_MAPPING);
+ QName seq = wsrmConstants.getSequenceQName();
+ if (mapping.keySet().contains(seq))
+ {
+ RMHelper.handleSequenceHeader((RMSequence)mapping.get(seq),
this.wsrmSequence);
+ }
+ QName seqAck = wsrmConstants.getSequenceAcknowledgementQName();
+ if (mapping.keySet().contains(seqAck))
+ {
+
RMHelper.handleSequenceAcknowledgementHeader((RMSequenceAcknowledgement)mapping.get(seqAck),
this.wsrmSequence);
+ }
+ QName ackReq = wsrmConstants.getAckRequestedQName();
+ if (mapping.keySet().contains(ackReq))
+ {
+
RMHelper.handleAckRequestedHeader((RMAckRequested)mapping.get(ackReq),
this.wsrmSequence);
+ }
}
- QName seq = wsrmConstants.getSequenceQName();
- if (mapping.keySet().contains(seq))
- {
- RMHelper.handleSequenceHeader((RMSequence)mapping.get(seq),
this.wsrmSequence);
- }
- QName ackReq = wsrmConstants.getAckRequestedQName();
- if (mapping.keySet().contains(ackReq))
- {
-
RMHelper.handleAckRequestedHeader((RMAckRequested)mapping.get(ackReq),
this.wsrmSequence);
- }
}
}
}
@@ -526,22 +527,20 @@
// WS-RM support //
///////////////////
@SuppressWarnings("unchecked")
- public org.jboss.ws.extensions.wsrm.api.RMSequence createSequence(RMAddressingType
addrType, RMSequenceType seqType) throws RMException
+ public org.jboss.ws.extensions.wsrm.api.RMSequence createSequence(RMAddressingType
addrType) throws RMException
{
this.getWSRMLock().lock();
try
{
if (this.wsrmSequence != null)
throw new IllegalStateException("Sequence already registered with proxy
instance");
- if (seqType == null)
- throw new IllegalArgumentException("Sequence type cannot be
null");
if (addrType == null)
throw new IllegalArgumentException("Addressing type cannot be
null");
try
{
// set up addressing data
- RMSequenceImpl candidateSequence = new RMSequenceImpl(addrType, seqType,
getEndpointMetaData().getConfig().getRMMetaData());
+ RMSequenceImpl candidateSequence = new RMSequenceImpl(addrType,
getEndpointMetaData().getConfig().getRMMetaData());
String address = getEndpointMetaData().getEndpointAddress();
String action = RMConstant.CREATE_SEQUENCE_WSA_ACTION;
AddressingProperties addressingProps = null;
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientHandler.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientHandler.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMClientHandler.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -22,6 +22,7 @@
package org.jboss.ws.extensions.wsrm;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -167,11 +168,31 @@
// try to serialize terminateSequenceResponse to message
RMTerminateSequenceResponse terminateSequenceResponse =
rmFactory.newTerminateSequenceResponse();
terminateSequenceResponse.setIdentifier(sequenceImpl.getOutboundId());
+ terminateSequenceResponse.serializeTo(soapMessage);
data.put(msgQName, terminateSequenceResponse);
log.debug(msgQName.getLocalPart() + " WSRM message was serialized to
payload");
}
- // TODO: implement SequenceAcknowledgement handler part
+ msgQName = rmConstants.getSequenceAcknowledgementQName();
+ if (outMsgs.contains(msgQName))
+ {
+ // try to serialize SequenceAcknowledgement to message
+ RMSequenceAcknowledgement sequenceAcknowledgement =
rmFactory.newSequenceAcknowledgement();
+ sequenceAcknowledgement.setIdentifier(sequenceImpl.getInboundId());
+ Iterator<Long> receivedInboudMessages =
sequenceImpl.getReceivedInboundMessages().iterator();
+ while (receivedInboudMessages.hasNext())
+ {
+ long messageNo = receivedInboudMessages.next();
+ RMSequenceAcknowledgement.RMAcknowledgementRange range =
sequenceAcknowledgement.newAcknowledgementRange();
+ range.setLower(messageNo);
+ range.setUpper(messageNo);
+ sequenceAcknowledgement.addAcknowledgementRange(range);
+ }
+ sequenceAcknowledgement.serializeTo(soapMessage);
+ data.put(msgQName, sequenceAcknowledgement);
+ log.debug(msgQName.getLocalPart() + " WSRM message was serialized to
payload");
+ }
+
// TODO: implement SequenceFault serialization
return true;
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMConstant.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -45,7 +45,7 @@
RMConstants constants = RMProvider.get().getConstants();
temp.add(constants.getSequenceQName());
temp.add(constants.getSequenceFaultQName());
- temp.add(constants.getAcknowledgementRangeQName());
+ temp.add(constants.getSequenceAcknowledgementQName());
temp.add(constants.getAckRequestedQName());
temp.add(constants.getCreateSequenceQName());
temp.add(constants.getCreateSequenceResponseQName());
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceImpl.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -23,6 +23,7 @@
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -32,10 +33,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.xml.namespace.QName;
+import javax.xml.ws.BindingProvider;
import javax.xml.ws.addressing.AddressingBuilder;
import javax.xml.ws.addressing.AddressingProperties;
import javax.xml.ws.addressing.JAXWSAConstants;
@@ -43,11 +43,10 @@
import org.jboss.logging.Logger;
import org.jboss.ws.core.jaxws.client.ClientImpl;
import org.jboss.ws.extensions.addressing.AddressingClientUtil;
+import org.jboss.ws.extensions.wsrm.config.RMConfig;
import org.jboss.ws.extensions.wsrm.api.RMAddressingType;
import org.jboss.ws.extensions.wsrm.api.RMException;
import org.jboss.ws.extensions.wsrm.api.RMSequence;
-import org.jboss.ws.extensions.wsrm.api.RMSequenceType;
-import org.jboss.ws.extensions.wsrm.config.RMConfig;
import org.jboss.ws.extensions.wsrm.spi.RMConstants;
import org.jboss.ws.extensions.wsrm.spi.RMProvider;
import org.jboss.ws.extensions.wsrm.spi.protocol.RMIncompleteSequenceBehavior;
@@ -68,7 +67,6 @@
private static final RMConstants wsrmConstants = RMProvider.get().getConstants();
private final RMConfig wsrmConfig;
- private final RMSequenceType sequenceType;
private final RMAddressingType addrType;
private final Set<Long> acknowledgedOutboundMessages = new
TreeSet<Long>();
private final Set<Long> receivedInboundMessages = new TreeSet<Long>();
@@ -85,7 +83,7 @@
private boolean isFinal;
private boolean inboundMessageAckRequested;
private AtomicLong messageNumber = new AtomicLong();
- private final Lock objectLock = new ReentrantLock();
+ private final Object lock = new Object();
private AtomicInteger countOfUnassignedMessagesAvailable = new AtomicInteger();
public void unassignedMessageReceived()
@@ -96,14 +94,13 @@
logger.debug("Unassigned message available in callback handler");
}
- public RMSequenceImpl(RMAddressingType addrType, RMSequenceType sequenceType, RMConfig
wsrmConfig)
+ public RMSequenceImpl(RMAddressingType addrType, RMConfig wsrmConfig)
{
super();
- if ((addrType == null) || (sequenceType == null) || (wsrmConfig == null))
+ if ((addrType == null) || (wsrmConfig == null))
throw new IllegalArgumentException();
this.addrType = addrType;
- this.sequenceType = sequenceType;
this.wsrmConfig = wsrmConfig;
try
{
@@ -115,135 +112,132 @@
}
}
+ public final Set<Long> getReceivedInboundMessages()
+ {
+ synchronized (lock)
+ {
+ return Collections.unmodifiableSet(this.receivedInboundMessages);
+ }
+ }
+
+ public final BindingProvider getBindingProvider()
+ {
+ synchronized (lock)
+ {
+ return (BindingProvider)this.client;
+ }
+ }
+
public final void setFinal()
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
this.isFinal = true;
logger.debug("Sequence " + this.outgoingSequenceId + " state
changed to final");
}
- finally
+ }
+
+ public final void ackRequested(boolean requested)
+ {
+ synchronized (lock)
{
- this.objectLock.unlock();
+ this.inboundMessageAckRequested = requested;
+ logger.debug("Inbound Sequence: " + this.incomingSequenceId + ",
ack requested. Messages in the queue: " + this.receivedInboundMessages);
}
}
- public final void ackRequested()
+ public final boolean isAckRequested()
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
- this.inboundMessageAckRequested = true;
- logger.debug("Sequence " + this.incomingSequenceId + " ack
requested. Messages in the queue: " + this.receivedInboundMessages);
+ return this.inboundMessageAckRequested;
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final void addReceivedInboundMessage(long messageId)
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
this.receivedInboundMessages.add(messageId);
logger.debug("Inbound Sequence: " + this.incomingSequenceId + ",
received message no. " + messageId);
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final void addReceivedMessage(long messageId)
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
this.acknowledgedOutboundMessages.add(messageId);
- logger.debug("Message no. " + messageId + " of sequence: " +
this.outgoingSequenceId + " acknowledged by server");
+ logger.debug("Outbound Sequence: " + this.outgoingSequenceId + ",
message no. " + messageId + " acknowledged by server");
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final void setOutboundId(String outboundId)
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
this.outgoingSequenceId = outboundId;
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final void setInboundId(String inboundId)
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
this.incomingSequenceId = inboundId;
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final void setClient(ClientImpl client)
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
this.client = client;
RMSequenceManager.getInstance().register(this);
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final void setDuration(long duration)
{
- if (duration > 0)
+ synchronized (lock)
{
- this.creationTime = System.currentTimeMillis();
- this.duration = duration;
+ if (duration > 0)
+ {
+ this.creationTime = System.currentTimeMillis();
+ this.duration = duration;
+ }
}
}
public final long getDuration()
{
- return -1L;
+ synchronized (lock)
+ {
+ return this.duration;
+ }
}
public final URI getBackPort()
{
+ // no need for synchronization
return (this.addrType == RMAddressingType.ADDRESSABLE) ? this.backPort : null;
}
public final long newMessageNumber()
{
+ // no need for synchronization
return this.messageNumber.incrementAndGet();
}
public final long getLastMessageNumber()
{
+ // no need for synchronization
return this.messageNumber.get();
}
public final void discard() throws RMException
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
this.client.getWSRMLock().lock();
try
@@ -257,23 +251,17 @@
this.client.getWSRMLock().unlock();
}
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final void close() throws RMException
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
if (this.terminated)
return;
this.terminated = true;
- client.getWSRMLock().lock();
try
{
sendCloseMessage();
@@ -281,20 +269,17 @@
}
finally
{
+ client.getWSRMLock().lock();
this.client.setWSRMSequence(null); // TODO: do not remove this
this.client.getWSRMLock().unlock();
}
- }
- finally
- {
- this.objectLock.unlock();
}
}
/**
* Sets up terminated flag to true.
*/
- public final void sendMessage(String action, QName operationQName) throws RMException
+ private void sendMessage(String action, QName operationQName) throws RMException
{
try
{
@@ -311,18 +296,23 @@
props = AddressingClientUtil.createAnonymousProps(action, address);
}
// prepare WS-RM request context
- Map rmRequestContext = new HashMap();
+ Map requestContext = client.getBindingProvider().getRequestContext();
+ Map rmRequestContext = (Map)requestContext.get(RMConstant.REQUEST_CONTEXT);
+ if (rmRequestContext == null)
+ {
+ rmRequestContext = new HashMap();
+ }
List outMsgs = new LinkedList();
outMsgs.add(operationQName);
rmRequestContext.put(RMConstant.PROTOCOL_MESSAGES, outMsgs);
rmRequestContext.put(RMConstant.SEQUENCE_REFERENCE, this);
// set up method invocation context
- Map requestContext = client.getBindingProvider().getRequestContext();
requestContext.put(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES_OUTBOUND,
props);
requestContext.put(RMConstant.REQUEST_CONTEXT, rmRequestContext);
+ requestContext.put(RMConstant.REQUEST_CONTEXT, rmRequestContext);
// call stub method
this.client.invoke(operationQName, new Object[] {},
client.getBindingProvider().getResponseContext());
- RMSequenceManager.getInstance().unregister(this);
+ //RMSequenceManager.getInstance().unregister(this); // TODO: each sequence will
be unregistered by sequence manager
}
catch (Exception e)
{
@@ -330,35 +320,49 @@
}
}
- private void sendCloseMessage()
+ public final void sendCloseMessage()
{
+ /*
+ synchronized (lock)
+ {
+ while (this.isAckRequested())
+ {
+ try
+ {
+ logger.debug("Waiting till all inbound sequence acknowledgements will
be sent");
+ lock.wait(100);
+ }
+ catch (InterruptedException ie)
+ {
+ logger.warn(ie.getMessage(), ie);
+ }
+ }
+ }*/
+ Map<String, Object> wsrmReqCtx = new HashMap<String, Object>();
+ wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, false);
+ this.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
sendMessage(RMConstant.CLOSE_SEQUENCE_WSA_ACTION,
wsrmConstants.getCloseSequenceQName());
}
- private void sendTerminateMessage()
+ public final void sendTerminateMessage()
{
sendMessage(RMConstant.TERMINATE_SEQUENCE_WSA_ACTION,
wsrmConstants.getTerminateSequenceQName());
}
- private void sendSequenceAcknowledgementMessage()
+ public final void sendSequenceAcknowledgementMessage()
{
sendMessage(RMConstant.SEQUENCE_ACKNOWLEDGEMENT_WSA_ACTION,
wsrmConstants.getSequenceAcknowledgementQName());
}
public final void setBehavior(RMIncompleteSequenceBehavior behavior)
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
if (behavior != null)
{
this.behavior = behavior;
}
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final boolean isCompleted()
@@ -383,27 +387,17 @@
public final boolean isClosed()
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
return this.terminated;
}
- finally
- {
- this.objectLock.unlock();
- }
}
public final boolean isDiscarded()
{
- this.objectLock.lock();
- try
+ synchronized (lock)
{
return this.discarded;
}
- finally
- {
- this.objectLock.unlock();
- }
}
}
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceManager.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceManager.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/RMSequenceManager.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -21,8 +21,10 @@
*/
package org.jboss.ws.extensions.wsrm;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -93,9 +95,20 @@
{
while (this.destroyed == false)
{
- for (int i = 0; i < sequences.size(); i++)
+ for (RMSequenceImpl sequence : sequences)
{
- logger.debug("processing sequence " +
sequences.get(i).getOutboundId());
+ logger.debug("Processing outbound sequence " +
sequence.getOutboundId());
+ if (sequence.isAckRequested())
+ {
+ /*
+ logger.debug("Sending ack for inbound sequence " +
sequence.getInboundId());
+ Map<String, Object> wsrmReqCtx = new HashMap<String,
Object>();
+ wsrmReqCtx.put(RMConstant.ONE_WAY_OPERATION, true);
+
sequence.getBindingProvider().getRequestContext().put(RMConstant.REQUEST_CONTEXT,
wsrmReqCtx);
+ sequence.sendSequenceAcknowledgementMessage();
+ sequence.ackRequested(false);
+ */
+ }
}
try
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/api/RMProvider.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/api/RMProvider.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/api/RMProvider.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -36,5 +36,5 @@
* @return created sequence
* @throws RMException
*/
- RMSequence createSequence(RMAddressingType addrType, RMSequenceType seqType) throws
RMException;
+ RMSequence createSequence(RMAddressingType addrType);
}
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -95,7 +95,7 @@
{
for (long i = range.getLower(); i <= range.getUpper(); i++)
{
- sequence.addReceivedInboundMessage(i);
+ sequence.addReceivedMessage(i);
}
}
if (seqAckHeader.isFinal())
@@ -119,7 +119,7 @@
throw new RMException("Expected inbound sequenceId:" +
sequence.getInboundId() + " , but was: " + inboundSeqId);
}
- sequence.ackRequested();
+ sequence.ackRequested(true);
}
public static void handleSequenceHeader(RMSequence seqHeader, RMSequenceImpl
sequence)
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannel.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -93,12 +93,12 @@
return rmMessage;
}
- private MessageAbstraction createResponse(RMMessage rmResponse, RMMetadata rmMetadata)
throws Throwable
+ private MessageAbstraction createResponse(RMMessage rmRequest, RMMessage rmResponse,
RMMetadata rmMetadata) throws Throwable
{
Map<String, Object> invocationContext =
rmMetadata.getContext(INVOCATION_CONTEXT);
- boolean oneWay =
(Boolean)rmMetadata.getContext(INVOCATION_CONTEXT).get(ONE_WAY_OPERATION);
+ boolean oneWay = RMTransportHelper.isOneWayOperation(rmRequest);
MessageAbstraction response = null;
- //if (!oneWay)
+ if (!oneWay)
{
byte[] payload = rmResponse.getPayload();
InputStream is = payload == null ? null : new
ByteArrayInputStream(rmResponse.getPayload());
@@ -117,7 +117,7 @@
{
RMMessage rmRequest = createRMMessage(request, rmMetadata);
RMMessage rmResponse = sendToChannel(rmRequest);
- return createResponse(rmResponse, rmMetadata);
+ return createResponse(rmRequest, rmResponse, rmMetadata);
}
private RMMessage sendToChannel(RMMessage request) throws Throwable
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMChannelRequest.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -83,7 +83,7 @@
callbackHandler.addUnassignedMessageListener(sequence);
}
}
- boolean oneWay = /*RMTransportHelper.isOneWayOperation(rmRequest) &&
*/(backPort != null); // TODO: backport support
+ boolean oneWay = (backPort != null); // TODO: backport support
Client client = new Client(locator, JBOSSWS_SUBSYSTEM,
rmRequest.getMetadata().getContext(REMOTING_CONFIGURATION_CONTEXT));
client.connect();
@@ -119,7 +119,7 @@
if (backPort != null) // TODO: backport support
{
- if (messageId != null)
+ if ((null != messageId) && (false ==
RMTransportHelper.isOneWayOperation(rmRequest)))
{
// register callbacks only for outbound messages with messageId
return new RMChannelResponse(callbackHandler, messageId);
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMetadata.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMetadata.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMMetadata.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -12,7 +12,6 @@
public RMMetadata(
String targetAddress,
- boolean oneWay,
Marshaller marshaller,
UnMarshaller unmarshaller,
Map<String, Object> invocationContext,
@@ -23,7 +22,6 @@
throw new IllegalArgumentException("Target address cannot be null");
invocationContext.put(RMConstant.TARGET_ADDRESS, targetAddress);
- invocationContext.put(RMConstant.ONE_WAY_OPERATION, oneWay);
setContext(RMConstant.INVOCATION_CONTEXT, invocationContext);
if (marshaller == null || unmarshaller == null)
Modified: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/transport/RMTransportHelper.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -5,11 +5,15 @@
import java.net.URI;
import java.util.Map;
+import org.jboss.logging.Logger;
import org.jboss.ws.extensions.wsrm.RMConstant;
import org.jboss.ws.extensions.wsrm.RMSequenceImpl;
public final class RMTransportHelper
{
+
+ private static Logger logger = Logger.getLogger(RMTransportHelper.class);
+
private RMTransportHelper()
{
// no instances
@@ -43,7 +47,15 @@
public static boolean isOneWayOperation(RMMessage rmRequest)
{
- return
(Boolean)rmRequest.getMetadata().getContext(RMConstant.INVOCATION_CONTEXT).get(ONE_WAY_OPERATION);
+ RMMetadata meta = rmRequest.getMetadata();
+ if (meta == null) throw new RuntimeException("Unable to obtain wsrm
metadata");
+ Map<String, Object> invCtx = meta.getContext(RMConstant.INVOCATION_CONTEXT);
+ if (invCtx == null) throw new RuntimeException("Unable to obtain invocation
context");
+ Map<String, Object> wsrmReqCtx = (Map<String,
Object>)invCtx.get(RMConstant.REQUEST_CONTEXT);
+ Boolean isOneWay = (Boolean)wsrmReqCtx.get(ONE_WAY_OPERATION);
+ logger.debug("oneWayMessage == " + (isOneWay == null ? false :
isOneWay.booleanValue()));
+ logger.debug("messages == " + wsrmReqCtx.get(PROTOCOL_MESSAGES));
+ return isOneWay == null ? false : isOneWay.booleanValue();
}
}
Modified: stack/native/trunk/src/main/java/org/jboss/ws/metadata/builder/jaxws/JAXWSClientMetaDataBuilder.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/metadata/builder/jaxws/JAXWSClientMetaDataBuilder.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/main/java/org/jboss/ws/metadata/builder/jaxws/JAXWSClientMetaDataBuilder.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -124,6 +124,12 @@
createSequenceMD.setOneWay(false);
endpointMD.addOperation(createSequenceMD);
+ // register sequenceAcknowledgement method
+ QName sequenceAcknowledgementQName =
rmProvider.getConstants().getSequenceAcknowledgementQName();
+ OperationMetaData sequenceAcknowledgementMD = new OperationMetaData(endpointMD,
sequenceAcknowledgementQName, "sequenceAcknowledgement");
+ sequenceAcknowledgementMD.setOneWay(true);
+ endpointMD.addOperation(sequenceAcknowledgementMD);
+
// register closeSequence method
QName closeSequenceQName = rmProvider.getConstants().getCloseSequenceQName();
OperationMetaData closeSequenceMD = new OperationMetaData(endpointMD,
closeSequenceQName, "closeSequence");
Modified: stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/emulator/EndpointEmulator.java
===================================================================
--- stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/emulator/EndpointEmulator.java 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/test/java/org/jboss/test/ws/jaxws/wsrm/emulator/EndpointEmulator.java 2007-11-30 21:50:09 UTC (rev 5161)
@@ -100,7 +100,11 @@
ctx.log(configFile + SEPARATOR + view.getId());
Map<String, String> resProperties = view.getResponse().getProperties();
Map<String, String> reqProperties = view.getRequest().getProperties();
- String responseMessage = Util.getResourceAsString(ctx,
view.getResponse().getResource());
+ String responseMessage = null;
+ if (view.getResponse().getResource() != null)
+ {
+ responseMessage = Util.getResourceAsString(ctx,
view.getResponse().getResource());
+ }
String responseTo = null;
if (resProperties.size() > 0)
@@ -112,7 +116,10 @@
}
Map<String, String> replaceMap =
Util.prepareReplaceMap(initializedReqProperties, resProperties);
- responseMessage = Util.replaceAll(responseMessage, replaceMap);
+ if (responseMessage != null)
+ {
+ responseMessage = Util.replaceAll(responseMessage, replaceMap);
+ }
responseTo = replaceMap.get(RESPONSE_TO);
if (ADDRESSING_ANONYMOUS_URI.equals(responseTo))
{
@@ -123,23 +130,32 @@
if (responseTo == null)
{
ctx.log("Sending response through ServletResponse");
- resp.setContentType(view.getResponse().getContentType());
+ if (view.getResponse().getContentType() != null)
+ {
+ resp.setContentType(view.getResponse().getContentType());
+ }
resp.setStatus(Integer.valueOf(view.getResponse().getStatusCode()));
- PrintWriter writer = resp.getWriter();
- writer.print(responseMessage);
- writer.flush();
- writer.close();
+ if (responseMessage != null)
+ {
+ PrintWriter writer = resp.getWriter();
+ writer.print(responseMessage);
+ writer.flush();
+ writer.close();
+ }
}
else
{
- ctx.log("Sending response through new socket connection");
- URL url = new URL(responseTo);
- Socket socket = new Socket(url.getHost(), url.getPort());
- OutputStream out = socket.getOutputStream();
- out.write(Util.createHTTPHeaders(url, responseMessage.length(),
view.getResponse().getContentType()));
- out.write(responseMessage.getBytes());
- out.flush();
- out.close();
+ if (responseMessage != null)
+ {
+ ctx.log("Sending response through new socket connection");
+ URL url = new URL(responseTo);
+ Socket socket = new Socket(url.getHost(), url.getPort());
+ OutputStream out = socket.getOutputStream();
+ out.write(Util.createHTTPHeaders(url, responseMessage.length(),
view.getResponse().getContentType()));
+ out.write(responseMessage.getBytes());
+ out.flush();
+ out.close();
+ }
}
}
Modified: stack/native/trunk/src/test/resources/jaxws/wsrm/emulator/reqres/config/req-res-service-config.xml
===================================================================
--- stack/native/trunk/src/test/resources/jaxws/wsrm/emulator/reqres/config/req-res-service-config.xml 2007-11-30 17:28:08 UTC (rev 5160)
+++ stack/native/trunk/src/test/resources/jaxws/wsrm/emulator/reqres/config/req-res-service-config.xml 2007-11-30 21:50:09 UTC (rev 5161)
@@ -29,7 +29,7 @@
</response>
</view>
- <view id="sequence.acknowledgement">
+ <view id="server.sequence.acknowledgement">
<request httpMethod="POST" pathInfo="/ReqResService">
<contains>
<node
name="|{${soap}}Envelope|{${soap}}Header|{${wsrm}}Sequence"/>
@@ -50,6 +50,15 @@
</response>
</view>
+ <view id="client.sequence.acknowledgement">
+ <request httpMethod="POST" pathInfo="/ReqResService">
+ <contains>
+ <node
name="|{${soap}}Envelope|{${soap}}Header|{${wsrm}}SequenceAcknowledgement"/>
+ </contains>
+ </request>
+ <response statusCode="200"/>
+ </view>
+
<view id="close.sequence">
<request httpMethod="POST" pathInfo="/ReqResService">
<contains>