Author: alessio.soldano(a)jboss.com
Date: 2009-06-29 06:05:41 -0400 (Mon, 29 Jun 2009)
New Revision: 10249
Added:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java
Modified:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/HTTPRemotingConnection.java
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSResponseHandler.java
Log:
Refactoring, new NettyClient
Modified:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/HTTPRemotingConnection.java
===================================================================
---
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/HTTPRemotingConnection.java 2009-06-29
09:43:28 UTC (rev 10248)
+++
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/HTTPRemotingConnection.java 2009-06-29
10:05:41 UTC (rev 10249)
@@ -22,51 +22,19 @@
package org.jboss.ws.core.client;
import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.Executors;
-import javax.xml.rpc.Stub;
import javax.xml.soap.MimeHeader;
import javax.xml.soap.MimeHeaders;
-import javax.xml.ws.BindingProvider;
import javax.xml.ws.addressing.EndpointReference;
import org.jboss.logging.Logger;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
-import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
-import org.jboss.netty.handler.codec.http.HttpChunk;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpMessage;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpVersion;
-import org.jboss.security.Base64Encoder;
-import org.jboss.ws.core.CommonMessageContext;
import org.jboss.ws.core.MessageAbstraction;
import org.jboss.ws.core.MessageTrace;
-import org.jboss.ws.core.StubExt;
-import org.jboss.ws.core.WSTimeoutException;
-import org.jboss.ws.core.soap.MessageContextAssociation;
import org.jboss.ws.extensions.wsrm.transport.RMChannel;
import org.jboss.ws.extensions.wsrm.transport.RMTransportHelper;
-import org.jboss.ws.feature.FastInfosetFeature;
-import org.jboss.ws.metadata.config.CommonConfig;
-import org.jboss.ws.metadata.config.EndpointProperty;
-import org.jboss.ws.metadata.umdm.EndpointMetaData;
/**
* SOAPConnection implementation.
@@ -78,6 +46,7 @@
*
* @author Thomas.Diesler(a)jboss.org
* @author <a href="mailto:jason@stacksmash.com">Jason T.
Greene</a>
+ * @author alessio.soldano(a)jboss.com
*
* @since 02-Feb-2005
*/
@@ -86,7 +55,7 @@
// provide logging
private static Logger log = Logger.getLogger(HTTPRemotingConnection.class);
- private static final int DEFAULT_CHUNK_SIZE = 1024;
+// private static final int DEFAULT_CHUNK_SIZE = 1024;
// private Map<String, Object> clientConfig = new HashMap<String,
Object>();
@@ -116,7 +85,7 @@
// }
private boolean closed;
- private Integer chunkSize = new Integer(DEFAULT_CHUNK_SIZE);
+ private Integer chunkSize;
private static final RMChannel RM_CHANNEL = RMChannel.getInstance();
@@ -163,7 +132,6 @@
if (closed)
throw new IOException("Connection is already closed");
- Long timeout = null;
String targetAddress;
Map<String, Object> callProps = new HashMap<String, Object>();
@@ -172,12 +140,6 @@
EndpointInfo epInfo = (EndpointInfo)endpoint;
targetAddress = epInfo.getTargetAddress();
callProps = epInfo.getProperties();
-
- if (callProps.containsKey(StubExt.PROPERTY_CLIENT_TIMEOUT))
- {
- timeout = new
Long(callProps.get(StubExt.PROPERTY_CLIENT_TIMEOUT).toString());
- }
-
}
else if (endpoint instanceof EndpointReference)
{
@@ -189,285 +151,42 @@
targetAddress = endpoint.toString();
}
- //Netty client
- UnMarshaller unmarshaller = getUnmarshaller();
-
- ChannelFactory factory = new
NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
- // ChannelFactory factory = new NioClientSocketChannelFactory(clientExecutor,
clientExecutor);
-
- ClientBootstrap bootstrap = new ClientBootstrap(factory);
- WSClientPipelineFactory channelPipelineFactory = new WSClientPipelineFactory();
- WSResponseHandler responseHandler = null;
- if (!oneway || maintainSession)
- {
- responseHandler = new WSResponseHandler(unmarshaller);
- channelPipelineFactory.setResponseHandler(responseHandler);
- }
- bootstrap.setPipelineFactory(channelPipelineFactory);
-
if (RMTransportHelper.isRMMessage(callProps))
{
- // try
- // {
- // RMMetadata rmMetadata = new RMMetadata(null, targetAddress,
marshaller, unmarshaller, callProps, metadata, null); //TODO!! remoting version, client
config, etc.
- // return RM_CHANNEL.send(reqMessage, rmMetadata);
- // }
- // catch (Throwable t)
- // {
- // IOException io = new IOException();
- // io.initCause(t);
- // throw io;
- // }
+// try
+// {
+// RMMetadata rmMetadata = new RMMetadata(null, targetAddress,
marshaller, unmarshaller, callProps, metadata, null); //TODO!! remoting version, client
config, etc.
+// return RM_CHANNEL.send(reqMessage, rmMetadata);
+// }
+// catch (Throwable t)
+// {
+// IOException io = new IOException();
+// io.initCause(t);
+// throw io;
+// }
return null; //TODO!!!
}
else
{
- Channel channel = null;
- try
+ NettyClient client = new NettyClient(getMarshaller(), getUnmarshaller());
+ if (chunkSize != null)
{
- //Start the connection attempt
- URL target;
- try
- {
- target = new URL(targetAddress);
- }
- catch (MalformedURLException e)
- {
- throw new RuntimeException("Invalid address: " + targetAddress,
e);
- }
- ChannelFuture future = bootstrap.connect(getSocketAddress(target));
-
- //Wait until the connection attempt succeeds or fails
- awaitUninterruptibly(future, timeout);
- if (!future.isSuccess())
- {
- IOException io = new IOException("Could not connect to " +
target.getHost());
- io.initCause(future.getCause());
- factory.releaseExternalResources();
- throw io;
- }
- channel = future.getChannel();
-
- //Trace the outgoing message
- MessageTrace.traceMessage("Outgoing Request Message", reqMessage);
-
- //Send the HTTP request
- HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, reqMessage
!= null ? HttpMethod.POST : HttpMethod.GET, targetAddress);
- request.addHeader(HttpHeaders.Names.HOST, target.getHost());
- request.addHeader(HttpHeaders.Names.CONNECTION,
HttpHeaders.Values.KEEP_ALIVE);
- Map<String, Object> additionalHeaders = new HashMap<String,
Object>();
- populateHeaders(reqMessage, additionalHeaders);
- setAdditionalHeaders(request, additionalHeaders);
- setActualChunkedLength(request);
- setAuthorization(request, callProps);
-
- writeRequest(channel, request, reqMessage);
-
- if (oneway && !maintainSession)
- {
- //No need to wait for the connection to be closed
- return null;
- }
- //Wait for the server to close the connection
- ChannelFuture closeFuture = channel.getCloseFuture();
- awaitUninterruptibly(closeFuture, timeout);
- if (responseHandler.getError() != null)
- {
- throw responseHandler.getError();
- }
- MessageAbstraction resMessage = null;
- Map<String, Object> resHeaders = null;
- if (!oneway)
- {
- //Get the response
- resMessage = responseHandler.getResponseMessage();
- resHeaders = responseHandler.getResponseHeaders();
- }
- //Update props with response headers (required to maintain session using
cookies)
- callProps.clear();
- if (resHeaders != null)
- {
- callProps.putAll(resHeaders);
- }
-
- //Trace the incoming response message
- MessageTrace.traceMessage("Incoming Response Message",
resMessage);
- return resMessage;
+ client.setChunkSize(chunkSize);
}
- catch (IOException ioe)
- {
- throw ioe;
- }
- catch (WSTimeoutException toe)
- {
- throw toe;
- }
- catch (Throwable t)
- {
- IOException io = new IOException("Could not transmit message");
- io.initCause(t);
- throw io;
- }
- finally
- {
- if (channel != null)
- {
- channel.close();
- }
- //Shut down executor threads to exit
- factory.releaseExternalResources();
- }
+
+ Map<String, Object> additionalHeaders = new HashMap<String,
Object>();
+ populateHeaders(reqMessage, additionalHeaders);
+ //Trace the outgoing message
+ MessageTrace.traceMessage("Outgoing Request Message", reqMessage);
+ MessageAbstraction resMessage = (MessageAbstraction)client.invoke(reqMessage,
targetAddress, !oneway || maintainSession, additionalHeaders, callProps);
+ //Trace the incoming response message
+ MessageTrace.traceMessage("Incoming Response Message", resMessage);
+ return resMessage;
}
}
- private InetSocketAddress getSocketAddress(URL target)
- {
- int port = target.getPort();
- if (port < 0)
- {
- //use default port
- String protocol = target.getProtocol();
- if ("http".equalsIgnoreCase(protocol))
- {
- port = 80;
- }
- else if ("https".equalsIgnoreCase(protocol))
- {
- port = 443;
- }
- }
- return new InetSocketAddress(target.getHost(), port);
- }
- private void writeRequest(Channel channel, HttpRequest request, MessageAbstraction
reqMessage) throws IOException
- {
- if (reqMessage == null)
- {
- channel.write(request);
- }
- else
- {
- ChannelBuffer content = ChannelBuffers.dynamicBuffer();
- OutputStream os = new ChannelBufferOutputStream(content);
- getMarshaller().write(reqMessage, os);
-
- int cs = chunkSize;
- if (cs > 0) //chunked encoding
- {
- request.addHeader(HttpHeaders.Names.TRANSFER_ENCODING,
HttpHeaders.Values.CHUNKED);
- //write headers
- channel.write(request);
- //write content chunks
- int size = content.readableBytes();
- int cur = 0;
- while (cur < size)
- {
- int to = Math.min(cur + cs, size);
- HttpChunk chunk = new DefaultHttpChunk(content.slice(cur, to - cur));
- channel.write(chunk);
- cur = to;
- }
- //write last chunk
- channel.write(HttpChunk.LAST_CHUNK);
- }
- else
- {
- request.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(content.readableBytes()));
- request.setContent(content);
- channel.write(request);
- }
- }
- }
- /**
- * Utility method for awaiting with or without timeout (timeout == null or <=0
implies not timeout)
- *
- * @param future
- * @param timeout
- * @throws WSTimeoutException
- */
- private static void awaitUninterruptibly(ChannelFuture future, Long timeout) throws
WSTimeoutException
- {
- if (timeout != null && timeout.longValue() > 0)
- {
- boolean bool = future.awaitUninterruptibly(timeout);
- if (!bool)
- {
- throw new WSTimeoutException("Timeout after: " + timeout +
"ms", timeout);
- }
- }
- else
- {
- future.awaitUninterruptibly();
- }
- }
-
- protected void setActualChunkedLength(HttpRequest message)
- {
- if (HttpMethod.POST.equals(message.getMethod()))
- {
- CommonMessageContext msgContext =
MessageContextAssociation.peekMessageContext();
- //We always use chunked transfer encoding
-// int chunkSizeValue = (chunkSize != null ? chunkSize : 1024);
- // Overwrite, through endpoint config
- if (msgContext != null)
- {
- EndpointMetaData epMetaData = msgContext.getEndpointMetaData();
- CommonConfig config = epMetaData.getConfig();
-
- String sizeValue =
config.getProperty(EndpointProperty.CHUNKED_ENCODING_SIZE);
- if (sizeValue != null)
- chunkSize = Integer.valueOf(sizeValue);
- if (epMetaData.isFeatureEnabled(FastInfosetFeature.class))
- chunkSize = 0;
- }
- }
- }
-
- protected void setAuthorization(HttpMessage message, Map callProps) throws
IOException
- {
- //Get authentication type, default to BASIC authetication
- String authType = (String)callProps.get(StubExt.PROPERTY_AUTH_TYPE);
- if (authType == null)
- authType = StubExt.PROPERTY_AUTH_TYPE_BASIC;
- String username = (String)callProps.get(Stub.USERNAME_PROPERTY);
- String password = (String)callProps.get(Stub.PASSWORD_PROPERTY);
- if (username == null || password == null)
- {
- username = (String)callProps.get(BindingProvider.USERNAME_PROPERTY);
- password = (String)callProps.get(BindingProvider.PASSWORD_PROPERTY);
- }
- if (username != null && password != null)
- {
- if (authType.equals(StubExt.PROPERTY_AUTH_TYPE_BASIC))
- {
- message.addHeader(HttpHeaders.Names.AUTHORIZATION,
getBasicAuthHeader(username, password));
- }
- }
- }
-
- private static String getBasicAuthHeader(String username, String password) throws
IOException
- {
- return "Basic " + Base64Encoder.encode(username + ":" +
password);
- }
-
- protected void setAdditionalHeaders(HttpMessage message, Map<String, Object>
headers)
- {
- for (String key : headers.keySet())
- {
- try
- {
- String header = (String)headers.get(key);
- message.addHeader(key, header.replaceAll("[\r\n\f]", "
"));
- }
- catch (Exception e)
- {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
- }
-
protected void populateHeaders(MessageAbstraction reqMessage, Map<String,
Object> metadata)
{
if (reqMessage != null)
Added:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java
===================================================================
---
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java
(rev 0)
+++
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java 2009-06-29
10:05:41 UTC (rev 10249)
@@ -0,0 +1,442 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.core.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+import javax.xml.rpc.Stub;
+import javax.xml.ws.BindingProvider;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
+import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMessage;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.security.Base64Encoder;
+import org.jboss.ws.core.CommonMessageContext;
+import org.jboss.ws.core.StubExt;
+import org.jboss.ws.core.WSTimeoutException;
+import org.jboss.ws.core.soap.MessageContextAssociation;
+import org.jboss.ws.feature.FastInfosetFeature;
+import org.jboss.ws.metadata.config.CommonConfig;
+import org.jboss.ws.metadata.config.EndpointProperty;
+import org.jboss.ws.metadata.umdm.EndpointMetaData;
+
+/**
+ * An HTTP client using Netty
+ *
+ * @author alessio.soldano(a)jboss.com
+ * @since 29-Jun-2009
+ *
+ */
+public class NettyClient
+{
+ private Marshaller marshaller;
+ private UnMarshaller unmarshaller;
+ private Long timeout;
+ private static final int DEFAULT_CHUNK_SIZE = 1024;
+ //We always use chunked transfer encoding unless explicitly disabled by user
+ private Integer chunkSize = new Integer(DEFAULT_CHUNK_SIZE);
+ private Executor bossExecutor;
+ private Executor workerExecutor;
+
+ /**
+ * Construct a Netty client with the provided marshaller/unmarshaller.
+ *
+ * @param marshaller
+ * @param unmarshaller
+ */
+ public NettyClient(Marshaller marshaller, UnMarshaller unmarshaller)
+ {
+ this.marshaller = marshaller;
+ this.unmarshaller = unmarshaller;
+ this.bossExecutor = Executors.newCachedThreadPool();
+ this.workerExecutor = Executors.newCachedThreadPool();
+ }
+
+ /**
+ * Construct a Netty client with the provided marshaller/unmarshaller and executors.
+ *
+ * @param marshaller
+ * @param unmarshaller
+ * @param bossExecutor
+ * @param workerExecutor
+ */
+ public NettyClient(Marshaller marshaller, UnMarshaller unmarshaller, Executor
bossExecutor, Executor workerExecutor)
+ {
+ this.marshaller = marshaller;
+ this.unmarshaller = unmarshaller;
+ this.bossExecutor = bossExecutor;
+ this.workerExecutor = workerExecutor;
+ }
+
+ /**
+ * Performs the invocation; an HTTP GET is performed when the reqMessage is null,
otherwise an HTTP POST is performed.
+ *
+ * @param reqMessage The request message
+ * @param targetAddress The target address
+ * @param waitForResponse A boolean saying if the method should wait for the
results before returning. Waiting is required for two-way invocations
+ * and when maintaining sessions using cookies.
+ * @param additionalHeaders Additional http headers to be added to the request
+ * @param callProps
+ * @return
+ * @throws IOException
+ */
+ public Object invoke(Object reqMessage, String targetAddress, boolean waitForResponse,
Map<String, Object> additionalHeaders, Map<String, Object> callProps)
+ throws IOException
+ {
+ ChannelFactory factory = new NioClientSocketChannelFactory(bossExecutor,
workerExecutor);
+
+ ClientBootstrap bootstrap = new ClientBootstrap(factory);
+ WSClientPipelineFactory channelPipelineFactory = new WSClientPipelineFactory();
+ WSResponseHandler responseHandler = null;
+ if (waitForResponse)
+ {
+ responseHandler = new WSResponseHandler(unmarshaller);
+ channelPipelineFactory.setResponseHandler(responseHandler);
+ }
+ bootstrap.setPipelineFactory(channelPipelineFactory);
+
+ Channel channel = null;
+ try
+ {
+ setActualTimeout(callProps);
+ //Start the connection attempt
+ URL target;
+ try
+ {
+ target = new URL(targetAddress);
+ }
+ catch (MalformedURLException e)
+ {
+ throw new RuntimeException("Invalid address: " + targetAddress,
e);
+ }
+ ChannelFuture future = bootstrap.connect(getSocketAddress(target));
+
+ //Wait until the connection attempt succeeds or fails
+ awaitUninterruptibly(future, timeout);
+ if (!future.isSuccess())
+ {
+ IOException io = new IOException("Could not connect to " +
target.getHost());
+ io.initCause(future.getCause());
+ factory.releaseExternalResources();
+ throw io;
+ }
+ channel = future.getChannel();
+
+ //Send the HTTP request
+ HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, reqMessage !=
null ? HttpMethod.POST : HttpMethod.GET, targetAddress);
+ request.addHeader(HttpHeaders.Names.HOST, target.getHost());
+ request.addHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ setAdditionalHeaders(request, additionalHeaders);
+ setActualChunkedLength(request);
+ setAuthorization(request, callProps);
+
+ writeRequest(channel, request, reqMessage);
+
+ if (!waitForResponse)
+ {
+ //No need to wait for the connection to be closed
+ return null;
+ }
+ //Wait for the server to close the connection
+ ChannelFuture closeFuture = channel.getCloseFuture();
+ awaitUninterruptibly(closeFuture, timeout);
+ if (responseHandler.getError() != null)
+ {
+ throw responseHandler.getError();
+ }
+ Object resMessage = null;
+ Map<String, Object> resHeaders = null;
+ //Get the response
+ resMessage = responseHandler.getResponseMessage();
+ resHeaders = responseHandler.getResponseHeaders();
+ //Update props with response headers (required to maintain session using
cookies)
+ callProps.clear();
+ if (resHeaders != null)
+ {
+ callProps.putAll(resHeaders);
+ }
+
+ return resMessage;
+ }
+ catch (IOException ioe)
+ {
+ throw ioe;
+ }
+ catch (WSTimeoutException toe)
+ {
+ throw toe;
+ }
+ catch (Throwable t)
+ {
+ IOException io = new IOException("Could not transmit message");
+ io.initCause(t);
+ throw io;
+ }
+ finally
+ {
+ if (channel != null)
+ {
+ channel.close();
+ }
+ //Shut down executor threads to exit
+ factory.releaseExternalResources();
+ }
+ }
+
+ private InetSocketAddress getSocketAddress(URL target)
+ {
+ int port = target.getPort();
+ if (port < 0)
+ {
+ //use default port
+ String protocol = target.getProtocol();
+ if ("http".equalsIgnoreCase(protocol))
+ {
+ port = 80;
+ }
+ else if ("https".equalsIgnoreCase(protocol))
+ {
+ port = 443;
+ }
+ }
+ return new InetSocketAddress(target.getHost(), port);
+ }
+
+ private void writeRequest(Channel channel, HttpRequest request, Object reqMessage)
throws IOException
+ {
+ if (reqMessage == null)
+ {
+ channel.write(request);
+ }
+ else
+ {
+ ChannelBuffer content = ChannelBuffers.dynamicBuffer();
+ OutputStream os = new ChannelBufferOutputStream(content);
+ marshaller.write(reqMessage, os);
+
+ int cs = chunkSize;
+ if (cs > 0) //chunked encoding
+ {
+ os.flush();
+ request.addHeader(HttpHeaders.Names.TRANSFER_ENCODING,
HttpHeaders.Values.CHUNKED);
+ //write headers
+ channel.write(request);
+ //write content chunks
+ int size = content.readableBytes();
+ int cur = 0;
+ while (cur < size)
+ {
+ int to = Math.min(cur + cs, size);
+ HttpChunk chunk = new DefaultHttpChunk(content.slice(cur, to - cur));
+ channel.write(chunk);
+ cur = to;
+ }
+ //write last chunk
+ channel.write(HttpChunk.LAST_CHUNK);
+ }
+ else
+ {
+ request.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(content.readableBytes()));
+ request.setContent(content);
+ channel.write(request);
+ }
+ }
+ }
+
+ /**
+ * Utility method for awaiting with or without timeout (timeout == null or <=0
implies no timeout)
+ *
+ * @param future
+ * @param timeout
+ * @throws WSTimeoutException
+ */
+ private static void awaitUninterruptibly(ChannelFuture future, Long timeout) throws
WSTimeoutException
+ {
+ if (timeout != null && timeout.longValue() > 0)
+ {
+ boolean bool = future.awaitUninterruptibly(timeout);
+ if (!bool)
+ {
+ throw new WSTimeoutException("Timeout after: " + timeout +
"ms", timeout);
+ }
+ }
+ else
+ {
+ future.awaitUninterruptibly();
+ }
+ }
+
+ /**
+ * Set the actual chunk size according to the endpoint config override and/or
configured features.
+ *
+ * @param message
+ */
+ protected void setActualChunkedLength(HttpRequest message)
+ {
+ if (HttpMethod.POST.equals(message.getMethod()))
+ {
+ CommonMessageContext msgContext =
MessageContextAssociation.peekMessageContext();
+ //Overwrite, through endpoint config
+ if (msgContext != null)
+ {
+ EndpointMetaData epMetaData = msgContext.getEndpointMetaData();
+ CommonConfig config = epMetaData.getConfig();
+
+ String sizeValue =
config.getProperty(EndpointProperty.CHUNKED_ENCODING_SIZE);
+ if (sizeValue != null)
+ chunkSize = Integer.valueOf(sizeValue);
+ if (epMetaData.isFeatureEnabled(FastInfosetFeature.class))
+ chunkSize = 0;
+ }
+ }
+ }
+
+ /**
+ * Set the actual timeout according to specified caller properties
+ *
+ * @param callProps
+ */
+ protected void setActualTimeout(Map<String, Object> callProps)
+ {
+ if (callProps.containsKey(StubExt.PROPERTY_CLIENT_TIMEOUT))
+ {
+ timeout = new Long(callProps.get(StubExt.PROPERTY_CLIENT_TIMEOUT).toString());
+ }
+ }
+
+ /**
+ * Set the required headers in the Netty's HttpMessage to allow for proper
authorization.
+ *
+ * @param message
+ * @param callProps
+ * @throws IOException
+ */
+ protected void setAuthorization(HttpMessage message, Map<String, Object>
callProps) throws IOException
+ {
+ //Get authentication type, default to BASIC authentication
+ String authType = (String)callProps.get(StubExt.PROPERTY_AUTH_TYPE);
+ if (authType == null)
+ authType = StubExt.PROPERTY_AUTH_TYPE_BASIC;
+ String username = (String)callProps.get(Stub.USERNAME_PROPERTY);
+ String password = (String)callProps.get(Stub.PASSWORD_PROPERTY);
+ if (username == null || password == null)
+ {
+ username = (String)callProps.get(BindingProvider.USERNAME_PROPERTY);
+ password = (String)callProps.get(BindingProvider.PASSWORD_PROPERTY);
+ }
+ if (username != null && password != null)
+ {
+ if (authType.equals(StubExt.PROPERTY_AUTH_TYPE_BASIC))
+ {
+ message.addHeader(HttpHeaders.Names.AUTHORIZATION,
getBasicAuthHeader(username, password));
+ }
+ }
+ }
+
+ private static String getBasicAuthHeader(String username, String password) throws
IOException
+ {
+ return "Basic " + Base64Encoder.encode(username + ":" +
password);
+ }
+
+ /**
+ * Copy the provided additional headers to the Netty's HttpMessage.
+ *
+ * @param message
+ * @param headers
+ */
+ protected void setAdditionalHeaders(HttpMessage message, Map<String, Object>
headers)
+ {
+ for (String key : headers.keySet())
+ {
+ try
+ {
+ String header = (String)headers.get(key);
+ message.addHeader(key, header.replaceAll("[\r\n\f]", "
"));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Set the Netty boss executor
+ *
+ * @param bossExecutor
+ */
+ public void setBossExecutor(Executor bossExecutor)
+ {
+ this.bossExecutor = bossExecutor;
+ }
+
+ /**
+ * Set the Netty worker executor
+ *
+ * @param workerExecutor
+ */
+ public void setWorkerExecutor(Executor workerExecutor)
+ {
+ this.workerExecutor = workerExecutor;
+ }
+
+ /**
+ *
+ * @return The current chunk size
+ */
+ public Integer getChunkSize()
+ {
+ return chunkSize;
+ }
+
+ /**
+ * Set the chunk size for chunked transfer encoding.
+ * The default chunk size is 1024 bytes.
+ *
+ * @param chunkSize
+ */
+ public void setChunkSize(Integer chunkSize)
+ {
+ this.chunkSize = chunkSize;
+ }
+}
Property changes on:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSResponseHandler.java
===================================================================
---
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSResponseHandler.java 2009-06-29
09:43:28 UTC (rev 10248)
+++
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSResponseHandler.java 2009-06-29
10:05:41 UTC (rev 10249)
@@ -31,7 +31,6 @@
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.ws.core.MessageAbstraction;
/**
* A Netty channel upstream handler that receives MessageEvent
@@ -45,7 +44,7 @@
public class WSResponseHandler extends SimpleChannelUpstreamHandler
{
private UnMarshaller unmarshaller;
- private MessageAbstraction responseMessage;
+ private Object responseMessage;
private Map<String, Object> responseHeaders;
private Throwable error;
@@ -72,7 +71,7 @@
}
ChannelBuffer content = response.getContent();
- this.responseMessage = (MessageAbstraction)unmarshaller.read(content.readable()
? new ChannelBufferInputStream(content) : null, responseHeaders);
+ this.responseMessage = unmarshaller.read(content.readable() ? new
ChannelBufferInputStream(content) : null, responseHeaders);
}
catch (Throwable t)
{
@@ -91,7 +90,7 @@
this.responseHeaders = new HashMap<String, Object>();
}
- public MessageAbstraction getResponseMessage()
+ public Object getResponseMessage()
{
return this.responseMessage;
}