Author: alessio.soldano(a)jboss.com
Date: 2009-08-07 04:31:48 -0400 (Fri, 07 Aug 2009)
New Revision: 10498
Added:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyTransportOutputStream.java
Modified:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/WSResponseHandler.java
Log:
[JBWS-2554] Improving message marshalling process to save memory (do not buffer the whole
message in memory before starting sending when using chunked encoding)
Modified:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java 2009-08-06
13:30:09 UTC (rev 10497)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyClient.java 2009-08-07
08:31:48 UTC (rev 10498)
@@ -1,6 +1,6 @@
/*
* JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
@@ -26,6 +26,7 @@
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
+import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -43,9 +44,7 @@
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
-import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMessage;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -186,12 +185,23 @@
setActualChunkedLength(request, callProps);
setAuthorization(request, callProps);
- ChannelFuture writeFuture = writeRequest(channel, request, reqMessage);
+ ChannelFuture writeFuture = null;
+ try
+ {
+ writeFuture = writeRequest(channel, request, reqMessage);
+ }
+ catch (ClosedChannelException cce)
+ {
+ log.debug("Channel closed by remote peer while sending message");
+ }
if (!waitForResponse)
{
//No need to wait for the connection to be closed, just wait for the write to be completed.
- writeFuture.awaitUninterruptibly();
+ if (writeFuture != null)
+ {
+ writeFuture.awaitUninterruptibly();
+ }
return null;
}
//Wait for the server to close the connection
@@ -267,32 +277,25 @@
}
else
{
- ChannelBuffer content = ChannelBuffers.dynamicBuffer();
- OutputStream os = new ChannelBufferOutputStream(content);
- marshaller.write(reqMessage, os);
-
int cs = chunkSize;
if (cs > 0) //chunked encoding
{
- os.flush();
request.addHeader(HttpHeaders.Names.TRANSFER_ENCODING,
HttpHeaders.Values.CHUNKED);
//write headers
channel.write(request);
//write content chunks
- int size = content.readableBytes();
- int cur = 0;
- while (cur < size)
- {
- int to = Math.min(cur + cs, size);
- HttpChunk chunk = new DefaultHttpChunk(content.slice(cur, to - cur));
- channel.write(chunk);
- cur = to;
- }
- //write last chunk
- return channel.write(HttpChunk.LAST_CHUNK);
+ NettyTransportOutputStream os = new NettyTransportOutputStream(channel, cs);
+ marshaller.write(reqMessage, os);
+ os.flush();
+ os.close();
+ return os.getChannelFuture();
}
else
{
+ ChannelBuffer content = ChannelBuffers.dynamicBuffer();
+ OutputStream os = new ChannelBufferOutputStream(content);
+ marshaller.write(reqMessage, os);
+ os.flush();
request.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(content.readableBytes()));
request.setContent(content);
return channel.write(request);
Added:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyTransportOutputStream.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyTransportOutputStream.java
(rev 0)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyTransportOutputStream.java 2009-08-07
08:31:48 UTC (rev 10498)
@@ -0,0 +1,160 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ws.core.client;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+
+/**
+ * An output stream that sends messages using Netty.
+ * This basically adapts the output stream interface
+ * to the Netty API, to allow for marshalling and sending
+ * soap messages in one step only saving memory
+ * (especially when dealing with attachments)
+ *
+ * @author alessio.soldano(a)jboss.com
+ * @since 06-Aug-2009
+ *
+ */
+/**
+ * An {@link OutputStream} that sends everything written to it over a Netty
+ * channel as a sequence of HTTP chunks.
+ * <p>
+ * Bytes are collected in an internal buffer whose size is the chunk size given
+ * at construction time. Whenever the buffer has to make room for more data its
+ * content is written to the channel as one {@link HttpChunk}. Closing the
+ * stream writes any buffered bytes followed by {@link HttpChunk#LAST_CHUNK}.
+ * <p>
+ * All public methods are synchronized on this instance.
+ *
+ * @author alessio.soldano(a)jboss.com
+ * @since 06-Aug-2009
+ */
+public class NettyTransportOutputStream extends OutputStream
+{
+   /** The Netty channel the chunks are written to. */
+   private final Channel channel;
+   /** Staging area for the chunk currently being assembled; its length is the chunk size. */
+   private final byte[] buffer;
+   /** Number of valid bytes currently held in {@link #buffer}. */
+   private int cur;
+   /** Future of the most recent write issued on the channel, or null if nothing was written yet. */
+   private ChannelFuture future;
+   /** Set once {@link #close()} has written the last chunk. */
+   private boolean closed = false;
+
+   /**
+    * Constructor
+    *
+    * @param channel The Netty channel to send the message on (must not be null)
+    * @param chunkSize The chunk size (bytes) for chunked encoding (must be > 0)
+    * @throws IllegalArgumentException if the channel is null or the chunk size is not positive
+    */
+   public NettyTransportOutputStream(Channel channel, int chunkSize)
+   {
+      if (channel == null)
+      {
+         // Fail here rather than with a NullPointerException on the first flush.
+         throw new IllegalArgumentException("Invalid channel (must not be null)");
+      }
+      if (chunkSize <= 0)
+      {
+         throw new IllegalArgumentException("Invalid chunk size (must be greater than 0)");
+      }
+      this.channel = channel;
+      this.cur = 0;
+      this.buffer = new byte[chunkSize];
+   }
+
+   /**
+    * Buffers a single byte, sending the buffered chunk first if the buffer is full.
+    *
+    * @param b the byte to write (only the low-order eight bits are used)
+    * @throws IOException if the stream has already been closed
+    */
+   @Override
+   public synchronized void write(int b) throws IOException
+   {
+      ensureOpen();
+      this.internalWrite(b);
+   }
+
+   /**
+    * Buffers {@code len} bytes of {@code b} starting at {@code off}, sending
+    * chunks on the channel whenever the buffer has to be emptied.
+    *
+    * @param b the source array
+    * @param off the index of the first byte to write
+    * @param len the number of bytes to write
+    * @throws IOException if the stream has already been closed
+    * @throws NullPointerException if {@code b} is null
+    * @throws IndexOutOfBoundsException if {@code off} / {@code len} do not denote a valid range of {@code b}
+    */
+   @Override
+   public synchronized void write(byte b[], int off, int len) throws IOException
+   {
+      ensureOpen();
+      if (b == null)
+      {
+         throw new NullPointerException();
+      }
+      // Validate before touching the buffer so that a bad range never causes a partial send.
+      if (off < 0 || len < 0 || len > b.length - off)
+      {
+         throw new IndexOutOfBoundsException();
+      }
+      if (len == 0)
+      {
+         return;
+      }
+      if (len >= buffer.length)
+      {
+         /* The request does not fit in one chunk: send what is pending, then
+            move the data through the buffer one block at a time. A full buffer
+            is only sent once more data arrives, so the last block stays
+            buffered until the next write or close. */
+         flushBuffer();
+         int pos = off;
+         int remaining = len;
+         while (remaining > 0)
+         {
+            if (cur >= buffer.length)
+            {
+               flushBuffer();
+            }
+            int n = Math.min(remaining, buffer.length - cur);
+            System.arraycopy(b, pos, buffer, cur, n);
+            cur += n;
+            pos += n;
+            remaining -= n;
+         }
+         return;
+      }
+      if (len > buffer.length - cur)
+      {
+         // Not enough room left: send the pending bytes and start a new chunk.
+         flushBuffer();
+      }
+      System.arraycopy(b, off, buffer, cur, len);
+      cur += len;
+   }
+
+   /**
+    * Appends one byte to the buffer, emptying it first when it is full.
+    */
+   private void internalWrite(int b) throws IOException
+   {
+      if (cur >= buffer.length)
+      {
+         flushBuffer();
+      }
+      buffer[cur++] = (byte)b;
+   }
+
+   /**
+    * Rejects output operations on a stream that has already sent its last chunk.
+    */
+   private void ensureOpen() throws IOException
+   {
+      if (closed)
+      {
+         throw new IOException("Stream closed");
+      }
+   }
+
+   /**
+    * Flush the internal buffer: writes the buffered bytes (if any) to the
+    * channel as a single chunk and resets the buffer.
+    */
+   private void flushBuffer() throws IOException
+   {
+      if (cur > 0)
+      {
+         // The backing array is reused for the next chunk, so hand Netty a copy.
+         ChannelBuffer content = ChannelBuffers.copiedBuffer(buffer, 0, cur);
+         HttpChunk chunk = new DefaultHttpChunk(content);
+         if (future != null)
+         {
+            // Wait for the previous write to complete before issuing the next one.
+            // NOTE(review): the outcome of the awaited future is not inspected here;
+            // confirm that write failures are reported elsewhere (e.g. by the response handler).
+            future.awaitUninterruptibly();
+         }
+         future = channel.write(chunk);
+         cur = 0;
+      }
+   }
+
+   /**
+    * Does nothing: buffered bytes are only sent when the buffer has to be
+    * emptied or the stream is closed.
+    */
+   @Override
+   public synchronized void flush() throws IOException
+   {
+      //NOOP: we do not flush the buffer as that would mean sending out many messages with size under the chunkSize
+   }
+
+   /**
+    * Close the stream causing the last chunk to be sent on the channel (includes flushing).
+    * Calling this method on an already closed stream has no effect.
+    */
+   @Override
+   public synchronized void close() throws IOException
+   {
+      if (closed)
+      {
+         // The last chunk must be written only once.
+         return;
+      }
+      flushBuffer();
+      if (future != null)
+      {
+         future.awaitUninterruptibly();
+      }
+      future = channel.write(HttpChunk.LAST_CHUNK);
+      closed = true;
+   }
+
+   /**
+    * Get the Netty channel future for the last message sent.
+    *
+    * @return The Netty channel future for the last message sent.
+    * @throws IllegalStateException if the stream has not been closed yet
+    */
+   public synchronized ChannelFuture getChannelFuture()
+   {
+      if (!closed)
+      {
+         throw new IllegalStateException("Cannot get channel future before closing the stream.");
+      }
+      return future;
+   }
+
+}
Property changes on:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/NettyTransportOutputStream.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/WSResponseHandler.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/WSResponseHandler.java 2009-08-06
13:30:09 UTC (rev 10497)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/WSResponseHandler.java 2009-08-07
08:31:48 UTC (rev 10498)
@@ -28,6 +28,7 @@
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.HttpResponse;
@@ -86,6 +87,15 @@
}
}
+   /**
+    * Records the cause of an exception raised on the channel so that it is
+    * available through this handler's error field.
+    * <p>
+    * Only the first failure is kept: the cause is stored when no error has
+    * been recorded yet. The field is null after {@code reset()}, so the check
+    * must be for null, otherwise the cause would never be stored.
+    *
+    * @param ctx the channel handler context (not used)
+    * @param e the event carrying the exception cause
+    */
+   @Override
+   public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
+   {
+      if (this.error == null)
+      {
+         this.error = e.getCause();
+      }
+   }
+
private void reset()
{
this.error = null;