Author: remy.maucherat(a)jboss.com
Date: 2013-10-15 13:30:15 -0400 (Tue, 15 Oct 2013)
New Revision: 2279
Modified:
branches/7.4.x/src/main/java/org/apache/catalina/core/StandardWrapperValve.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/AbstractInternalOutputBuffer.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/Constants.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Log:
- Refactor the non blocking mode of the NIO2 connector.
- Some debug code remains.
Modified: branches/7.4.x/src/main/java/org/apache/catalina/core/StandardWrapperValve.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/catalina/core/StandardWrapperValve.java 2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/catalina/core/StandardWrapperValve.java 2013-10-15 17:30:15 UTC (rev 2279)
@@ -21,6 +21,7 @@
import static org.jboss.web.CatalinaMessages.MESSAGES;
+import java.io.EOFException;
import java.io.IOException;
import java.util.Iterator;
@@ -545,7 +546,7 @@
if (error) {
Throwable throwable = asyncContext.getError();
if (throwable == null) {
- throwable = new Exception();
+ throwable = new EOFException();
}
if (request.getReadListener() != null) {
request.getReadListener().onError(throwable);
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/AbstractInternalOutputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/AbstractInternalOutputBuffer.java 2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/AbstractInternalOutputBuffer.java 2013-10-15 17:30:15 UTC (rev 2279)
@@ -41,76 +41,6 @@
public abstract class AbstractInternalOutputBuffer implements OutputBuffer {
/**
- * Associated Coyote response.
- */
- protected Response response;
-
- /**
- * Headers of the associated request.
- */
- protected MimeHeaders headers;
-
- /**
- * Committed flag.
- */
- protected boolean committed;
-
- /**
- * Finished flag.
- */
- protected boolean finished;
-
- /**
- * Pointer to the current write buffer.
- */
- protected byte[] buf;
-
- /**
- * Position in the buffer.
- */
- protected int pos;
-
- /**
- * Underlying output buffer.
- */
- protected OutputBuffer outputBuffer;
-
- /**
- * Filter library. Note: Filter[0] is always the "chunked" filter.
- */
- protected OutputFilter[] filterLibrary;
-
- /**
- * Active filter (which is actually the top of the pipeline).
- */
- protected OutputFilter[] activeFilters;
-
- /**
- * Index of the last active filter.
- */
- protected int lastActiveFilter;
-
- /**
- * Direct byte buffer used for writing.
- */
- protected ByteBuffer bbuf = null;
-
- /**
- * Leftover bytes which could not be written during a non blocking write.
- */
- protected ByteChunk leftover = null;
-
- /**
- * Non blocking mode.
- */
- protected boolean nonBlocking = false;
-
- /**
- * Write timeout
- */
- protected int writeTimeout = -1;
-
- /**
* Create a new instance of {@code AbstractInternalOutputBuffer}
*
* @param response
@@ -118,553 +48,6 @@
*/
public AbstractInternalOutputBuffer(Response response, int headerBufferSize) {
- this.response = response;
- this.headers = response.getMimeHeaders();
- buf = new byte[headerBufferSize];
- if (headerBufferSize < Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE) {
- bbuf = ByteBuffer.allocateDirect(6 * 1500);
- } else {
- bbuf = ByteBuffer.allocateDirect((headerBufferSize / 1500 + 1) * 1500);
- }
-
- outputBuffer = new OutputBufferImpl();
- filterLibrary = new OutputFilter[0];
- activeFilters = new OutputFilter[0];
- lastActiveFilter = -1;
-
- committed = false;
- finished = false;
-
- leftover = new ByteChunk();
- nonBlocking = false;
-
- // Cause loading of HttpMessages
- HttpMessages.getMessage(200);
}
- /**
- * Initialize the internal output buffer
- */
- protected abstract void init();
-
- /**
- * Set the non blocking flag.
- *
- * @param nonBlocking
- */
- public void setNonBlocking(boolean nonBlocking) {
- this.nonBlocking = nonBlocking;
- }
-
- /**
- * Get the non blocking flag value.
- *
- * @return non blocking
- */
- public boolean getNonBlocking() {
- return nonBlocking;
- }
-
- /**
- * Add an output filter to the filter library.
- *
- * @param filter
- */
- public void addFilter(OutputFilter filter) {
-
- OutputFilter[] newFilterLibrary = new OutputFilter[filterLibrary.length + 1];
- for (int i = 0; i < filterLibrary.length; i++) {
- newFilterLibrary[i] = filterLibrary[i];
- }
- newFilterLibrary[filterLibrary.length] = filter;
- filterLibrary = newFilterLibrary;
- activeFilters = new OutputFilter[filterLibrary.length];
- }
-
- /**
- * Get filters.
- *
- * @return the list of filters
- */
- public OutputFilter[] getFilters() {
- return filterLibrary;
- }
-
- /**
- * Clear filters.
- */
- public void clearFilters() {
- filterLibrary = new OutputFilter[0];
- lastActiveFilter = -1;
- }
-
- /**
- * Perform a write operation. The operation may be blocking or non-blocking
- * depending on the value of {@code nonBlocking} flag.
- *
- * @param timeout
- * a timeout for the operation
- * @param unit
- * The time unit of the timeout
- * @return
- */
- protected abstract int write(final long timeout, final TimeUnit unit);
-
- /**
- * Add an output filter to the filter library.
- *
- * @param filter
- */
- public void addActiveFilter(OutputFilter filter) {
-
- if (lastActiveFilter == -1) {
- filter.setBuffer(outputBuffer);
- } else {
- for (int i = 0; i <= lastActiveFilter; i++) {
- if (activeFilters[i] == filter)
- return;
- }
- filter.setBuffer(activeFilters[lastActiveFilter]);
- }
-
- activeFilters[++lastActiveFilter] = filter;
- filter.setResponse(response);
- }
-
- public void removeActiveFilters() {
- // Recycle filters
- for (int i = 0; i <= lastActiveFilter; i++) {
- activeFilters[i].recycle();
- }
- lastActiveFilter = -1;
- }
-
- /**
- * Flush the response.
- *
- * @throws IOException
- * an undelying I/O error occured
- */
- public void flush() throws IOException {
- if (!committed) {
-
- // Send the connector a request for commit. The connector should
- // then validate the headers, send them (using sendHeader) and
- // set the filters accordingly.
- response.action(ActionCode.ACTION_COMMIT, null);
- }
-
- // Flush the current buffer
- flushBuffer();
- }
-
- /**
- *
- */
- protected void clearBuffer() {
- synchronized (this.bbuf) {
- this.bbuf.clear();
- }
- }
-
- /**
- * Recycle this object
- */
- public void recycle() {
- // Recycle Request object
- response.recycle();
- this.clearBuffer();
- pos = 0;
- lastActiveFilter = -1;
- committed = false;
- finished = false;
- }
-
- /**
- * End processing of current HTTP request. Note: All bytes of the current
- * request should have been already consumed. This method only resets all
- * the pointers so that we are ready to parse the next HTTP request.
- */
- public void nextRequest() {
- // Recycle Request object
- response.recycle();
-
- // Recycle filters
- for (int i = 0; i <= lastActiveFilter; i++) {
- activeFilters[i].recycle();
- }
-
- // Reset pointers
- leftover.recycle();
- pos = 0;
- lastActiveFilter = -1;
- committed = false;
- finished = false;
- nonBlocking = false;
- }
-
- /**
- * End request.
- *
- * @throws IOException
- * an undelying I/O error occured
- */
- public void endRequest() throws IOException {
-
- if (!committed) {
- // Send the connector a request for commit. The connector should
- // then validate the headers, send them (using sendHeader) and
- // set the filters accordingly.
- response.action(ActionCode.ACTION_COMMIT, null);
- }
-
- if (finished) {
- return;
- }
-
- if (lastActiveFilter != -1) {
- activeFilters[lastActiveFilter].end();
- }
-
- flushBuffer();
- finished = true;
- }
-
- // ------------------------------------------------ HTTP/1.1 Output Methods
-
- /**
- * Send an acknowledgment.
- *
- * @throws Exception
- */
- public abstract void sendAck() throws Exception;
-
- /**
- * Send the response status line.
- */
- public void sendStatus() {
-
- // Write protocol name
- write(Constants.HTTP_11_BYTES);
- buf[pos++] = Constants.SP;
-
- // Write status code
- int status = response.getStatus();
- switch (status) {
- case 200:
- write(Constants._200_BYTES);
- break;
- case 400:
- write(Constants._400_BYTES);
- break;
- case 404:
- write(Constants._404_BYTES);
- break;
- default:
- write(status);
- }
-
- buf[pos++] = Constants.SP;
-
- // Write message
- String message = null;
- if (org.apache.coyote.Constants.USE_CUSTOM_STATUS_MSG_IN_HEADER) {
- message = response.getMessage();
- }
- if (message == null) {
- write(HttpMessages.getMessage(status));
- } else {
- write(message.replace('\n', ' ').replace('\r', ' '));
- }
-
- // End the response status line
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
- }
-
- /**
- * Send a header.
- *
- * @param name
- * Header name
- * @param value
- * Header value
- */
- public void sendHeader(MessageBytes name, MessageBytes value) {
- if (name.getLength() > 0 && !value.isNull()) {
- write(name);
- buf[pos++] = Constants.COLON;
- buf[pos++] = Constants.SP;
- write(value);
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
- }
- }
-
- /**
- * Send a header.
- *
- * @param name
- * Header name
- * @param value
- * Header value
- */
- public void sendHeader(ByteChunk name, ByteChunk value) {
- write(name);
- buf[pos++] = Constants.COLON;
- buf[pos++] = Constants.SP;
- write(value);
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
- }
-
- /**
- * Send a header.
- *
- * @param name
- * Header name
- * @param value
- * Header value
- */
- public void sendHeader(String name, String value) {
- write(name);
- buf[pos++] = Constants.COLON;
- buf[pos++] = Constants.SP;
- write(value);
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
- }
-
- /**
- * End the header block.
- */
- public void endHeaders() {
- buf[pos++] = Constants.CR;
- buf[pos++] = Constants.LF;
- }
-
- /**
- * Write the contents of a byte chunk.
- *
- * @param chunk
- * byte chunk
- * @return number of bytes written
- * @throws IOException
- * an undelying I/O error occured
- */
- public abstract int doWrite(ByteChunk chunk, Response res) throws IOException;
-
- /**
- * Commit the response.
- *
- * @throws IOException
- * an undelying I/O error occured
- */
- protected void commit() throws IOException {
-
- // The response is now committed
- committed = true;
- response.setCommitted(true);
-
- if (pos > 0) {
- // Sending the response header buffer
- bbuf.put(buf, 0, pos);
- }
- }
-
- /**
- * This method will write the contents of the specyfied message bytes buffer
- * to the output stream, without filtering. This method is meant to be used
- * to write the response header.
- *
- * @param mb
- * data to be written
- */
- protected void write(MessageBytes mb) {
- if (mb == null) {
- return;
- }
-
- switch (mb.getType()) {
- case MessageBytes.T_BYTES:
- write(mb.getByteChunk());
- break;
- case MessageBytes.T_CHARS:
- write(mb.getCharChunk());
- break;
- default:
- write(mb.toString());
- break;
- }
- }
-
- /**
- * This method will write the contents of the specyfied message bytes buffer
- * to the output stream, without filtering. This method is meant to be used
- * to write the response header.
- *
- * @param bc
- * data to be written
- */
- protected void write(ByteChunk bc) {
- // Writing the byte chunk to the output buffer
- int length = bc.getLength();
- System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos, length);
- pos = pos + length;
- }
-
- /**
- * This method will write the contents of the specyfied char buffer to the
- * output stream, without filtering. This method is meant to be used to
- * write the response header.
- *
- * @param cc
- * data to be written
- */
- protected void write(CharChunk cc) {
- int start = cc.getStart();
- int end = cc.getEnd();
- char[] cbuf = cc.getBuffer();
- for (int i = start; i < end; i++) {
- char c = cbuf[i];
- // Note: This is clearly incorrect for many strings,
- // but is the only consistent approach within the current
- // servlet framework. It must suffice until servlet output
- // streams properly encode their output.
- if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
- c = ' ';
- }
- buf[pos++] = (byte) c;
- }
-
- }
-
- /**
- * This method will write the contents of the specyfied byte buffer to the
- * output stream, without filtering. This method is meant to be used to
- * write the response header.
- *
- * @param b
- * data to be written
- */
- public void write(byte[] b) {
- // Writing the byte chunk to the output buffer
- System.arraycopy(b, 0, buf, pos, b.length);
- pos = pos + b.length;
- }
-
- /**
- * This method will write the contents of the specyfied String to the output
- * stream, without filtering. This method is meant to be used to write the
- * response header.
- *
- * @param s
- * data to be written
- */
- protected void write(String s) {
- if (s == null) {
- return;
- }
-
- // From the Tomcat 3.3 HTTP/1.0 connector
- int len = s.length();
- for (int i = 0; i < len; i++) {
- char c = s.charAt(i);
- // Note: This is clearly incorrect for many strings,
- // but is the only consistent approach within the current
- // servlet framework. It must suffice until servlet output
- // streams properly encode their output.
- if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
- c = ' ';
- }
-
- buf[pos++] = (byte) c;
- }
- }
-
- /**
- * This method will print the specified integer to the output stream,
- * without filtering. This method is meant to be used to write the response
- * header.
- *
- * @param i
- * data to be written
- */
- protected void write(int i) {
- write(String.valueOf(i));
- }
-
- /**
- * Callback to write data from the buffer.
- */
- protected abstract void flushBuffer() throws IOException;
-
- /**
- * Flush leftover bytes.
- *
- * @return true if all leftover bytes have been flushed
- * @throws IOException
- */
- public abstract boolean flushLeftover() throws IOException;
-
- // ----------------------------------- OutputBufferImpl Inner Class
-
- /**
- * {@code OutputBufferImpl} This class is an output buffer which will write
- * data to an output stream/channel.
- *
- * Created on Jan 10, 2012 at 12:20:15 PM
- *
- * @author <a href="mailto:nbenothm@redhat.com">Nabil
Benothman</a>
- */
- class OutputBufferImpl implements OutputBuffer {
-
- /**
- * Write chunk.
- */
- public int doWrite(ByteChunk chunk, Response res) throws IOException {
- // If non blocking (event) and there are leftover bytes,
- // put all remaining bytes in the leftover buffer (they are
- // part of the same write operation)
- if (leftover.getLength() > 0) {
- leftover.append(chunk);
- return chunk.getLength();
- }
-
- int len = chunk.getLength();
- int start = chunk.getStart();
- byte[] b = chunk.getBuffer();
-
- while (len > 0) {
- int thisTime = len;
- // if (bbuf.position() == bbuf.capacity()) {
- if (!bbuf.hasRemaining()) {
- flushBuffer();
- if (leftover.getLength() > 0) {
- // If non blocking (event) and there are leftover bytes,
- // put all remaining bytes in the leftover buffer (they
- // are
- // part of the same write operation)
- int oldStart = chunk.getOffset();
- chunk.setOffset(start);
- leftover.append(chunk);
- chunk.setOffset(oldStart);
- // After that, all content has been "written"
- return chunk.getLength();
- }
- }
- // if (thisTime > bbuf.capacity() - bbuf.position()) {
- if (thisTime > bbuf.remaining()) {
- // thisTime = bbuf.capacity() - bbuf.position();
- thisTime = bbuf.remaining();
- }
-
- bbuf.put(b, start, thisTime);
- len = len - thisTime;
- start = start + thisTime;
- }
- return chunk.getLength();
- }
- }
-
}
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Constants.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Constants.java 2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Constants.java 2013-10-15 17:30:15 UTC (rev 2279)
@@ -132,6 +132,13 @@
Integer.valueOf(System.getProperty("org.apache.coyote.http11.DEFAULT_HTTP_HEADER_BUFFER_SIZE",
"8192")).intValue();
+ /**
+ * Async buffer size.
+ */
+ public static final int ASYNC_BUFFER_SIZE =
+
Integer.valueOf(System.getProperty("org.apache.coyote.http11.ASYNC_BUFFER_SIZE",
"32768")).intValue();
+
+
/* Various constant "strings" */
public static final byte[] CRLF_BYTES = ByteChunk.convertToBytes(CRLF);
public static final byte[] COLON_BYTES = ByteChunk.convertToBytes(": ");
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java 2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java 2013-10-15 17:30:15 UTC (rev 2279)
@@ -214,12 +214,6 @@
try {
// If processing a write event, must flush any leftover bytes first
if (status == SocketStatus.OPEN_WRITE) {
- // If the flush does not manage to flush all leftover bytes, the
- // socket should
- // go back to the poller.
- if (!outputBuffer.flushLeftover()) {
- return SocketState.LONG;
- }
// The write notification is now done
writeNotification = false;
// Allow convenient synchronous blocking writes
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java 2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java 2013-10-15 17:30:15 UTC (rev 2279)
@@ -806,11 +806,10 @@
* .AsynchronousSocketChannel, org.apache.tomcat.util.net.ChannelStatus)
*/
@Override
- public SocketState event(NioChannel channel, SocketStatus status) {
+ public synchronized SocketState event(NioChannel channel, SocketStatus status) {
Http11NioProcessor processor = connections.get(channel.getId());
SocketState state = SocketState.CLOSED;
-
if (processor != null) {
processor.startProcessing();
// Call the appropriate event
@@ -868,7 +867,7 @@
state = event(channel, SocketStatus.OPEN_READ);
} else if (proto.endpoint.isRunning()) {
proto.endpoint.addEventChannel(channel, processor.getTimeout(),
- processor.getReadNotifications(),
+ false,
processor.getWriteNotification(),
processor.getResumeNotification(), false);
}
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java 2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java 2013-10-15 17:30:15 UTC (rev 2279)
@@ -80,11 +80,11 @@
*/
private CompletionHandler<Integer, NioChannel> completionHandler;
- /**
- * Latch to wait for non blocking operations to run the completion handler.
- */
+ /**
+ * Latch used for auto blocking.
+ */
private CountDownLatch latch = null;
-
+
/**
* Create a new instance of {@code InternalNioInputBuffer}
*
@@ -111,36 +111,30 @@
this.completionHandler = new CompletionHandler<Integer, NioChannel>() {
@Override
- public void completed(Integer nBytes, NioChannel attachment) {
+ public synchronized void completed(Integer nBytes, NioChannel attachment) {
if (nBytes < 0) {
failed(new ClosedChannelException(), attachment);
return;
}
- try {
- if (nBytes > 0) {
- bbuf.flip();
- bbuf.get(buf, pos, nBytes);
- lastValid = pos + nBytes;
- if (!processor.isProcessing() && processor.getReadNotifications())
{
- endpoint.processChannel(attachment, SocketStatus.OPEN_READ);
- }
+ if (nBytes > 0) {
+ bbuf.flip();
+ bbuf.get(buf, pos, nBytes);
+ lastValid = pos + nBytes;
+ latch.countDown();
+ if (!processor.isProcessing() && processor.getReadNotifications()) {
+ endpoint.processChannel(attachment, SocketStatus.OPEN_READ);
}
- } finally {
- latch.countDown();
}
}
@Override
public void failed(Throwable exc, NioChannel attachment) {
- try {
- endpoint.removeEventChannel(attachment);
- if (!processor.isProcessing()) {
- endpoint.processChannel(attachment, SocketStatus.ERROR);
- }
- } finally {
- latch.countDown();
- }
+ exc.printStackTrace();
+ endpoint.removeEventChannel(attachment);
+ if (!processor.isProcessing()) {
+ endpoint.processChannel(attachment, SocketStatus.ERROR);
+ }
}
};
}
@@ -420,13 +414,25 @@
private int fill0() throws IOException {
int nRead = 0;
- // Prepare the internal input buffer for reading
- this.prepare();
// Reading from client
if (nonBlocking) {
- nonBlockingRead(bbuf, readTimeout, unit);
- nRead = lastValid - pos;
+ synchronized (completionHandler) {
+ // Prepare the internal input buffer for reading
+ this.prepare();
+ nonBlockingRead(bbuf, readTimeout, unit);
+ nRead = lastValid - pos;
+ }
+ if (nRead == 0 && !available) {
+ try {
+ latch.await(readTimeout, unit);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ nRead = lastValid - pos;
+ }
} else {
+ // Prepare the internal input buffer for reading
+ this.prepare();
nRead = blockingRead(bbuf, readTimeout, unit);
if (nRead > 0) {
bbuf.flip();
@@ -446,7 +452,6 @@
throw new SocketTimeoutException(MESSAGES.failedRead());
}
}
-
return nRead;
}
@@ -485,7 +490,6 @@
try {
latch = new CountDownLatch(1);
ch.read(bb, ch, this.completionHandler);
- latch.await();
} catch (Throwable t) {
if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingRead(t);
@@ -527,20 +531,28 @@
*/
public int doRead(ByteChunk chunk, Request req) throws IOException {
- if (pos >= lastValid) {
- int nRead = fill0();
- if (nRead < 0) {
- return -1;
- } else if (nRead == 0) {
- return 0;
- }
- }
+ if (pos >= lastValid) {
+ int nRead = fill0();
+ if (nRead < 0) {
+ return -1;
+ } else if (nRead == 0) {
+ return 0;
+ }
+ }
- int length = lastValid - pos;
- chunk.setBytes(buf, pos, length);
- pos = lastValid;
-
- return (length);
+ if (nonBlocking) {
+ synchronized (completionHandler) {
+ int length = lastValid - pos;
+ chunk.setBytes(buf, pos, length);
+ pos = lastValid;
+ return (length);
+ }
+ } else {
+ int length = lastValid - pos;
+ chunk.setBytes(buf, pos, length);
+ pos = lastValid;
+ return (length);
+ }
}
}
}
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 2013-10-15 17:30:15 UTC (rev 2279)
@@ -1,6 +1,6 @@
/*
* JBoss, Home of Professional Open Source.
- * Copyright 2012 Red Hat, Inc., and individual contributors
+ * Copyright 2013 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,15 +21,19 @@
import static org.jboss.web.CoyoteMessages.MESSAGES;
import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
+import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
-import java.nio.channels.WritePendingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.coyote.ActionCode;
+import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.CharChunk;
+import org.apache.tomcat.util.buf.MessageBytes;
+import org.apache.tomcat.util.http.HttpMessages;
+import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.SocketStatus;
@@ -42,8 +46,78 @@
*
* @author <a href="mailto:nbenothm@redhat.com">Nabil
Benothman</a>
*/
-public class InternalNioOutputBuffer extends AbstractInternalOutputBuffer {
+public class InternalNioOutputBuffer implements OutputBuffer {
+ /**
+ * Associated Coyote response.
+ */
+ protected Response response;
+
+ /**
+ * Headers of the associated request.
+ */
+ protected MimeHeaders headers;
+
+ /**
+ * Committed flag.
+ */
+ protected boolean committed;
+
+ /**
+ * Finished flag.
+ */
+ protected boolean finished;
+
+ /**
+ * Pointer to the current write buffer.
+ */
+ protected byte[] buf;
+
+ /**
+ * Position in the buffer.
+ */
+ protected int pos;
+
+ /**
+ * Underlying output buffer.
+ */
+ protected OutputBuffer outputBuffer;
+
+ /**
+ * Filter library. Note: Filter[0] is always the "chunked" filter.
+ */
+ protected OutputFilter[] filterLibrary;
+
+ /**
+ * Active filter (which is actually the top of the pipeline).
+ */
+ protected OutputFilter[] activeFilters;
+
+ /**
+ * Index of the last active filter.
+ */
+ protected int lastActiveFilter;
+
+ /**
+ * Direct byte buffer used for writing.
+ */
+ protected ByteBuffer bbuf = null;
+
+ /**
+ * Leftover bytes which could not be written during a non blocking write.
+ */
+ protected ByteChunk leftover = null;
+
+ /**
+ * Non blocking mode.
+ */
+ protected boolean nonBlocking = false;
+
+ /**
+ * Write timeout
+ */
+ protected int writeTimeout = -1;
+
/**
* Underlying channel.
*/
@@ -64,12 +138,12 @@
*/
private CompletionHandler<Integer, NioChannel> completionHandler;
- /**
- * Latch to wait for non blocking operations to run the completion handler.
- */
- private CountDownLatch latch = null;
-
/**
+ * Latch used for auto blocking.
+ */
+ private CountDownLatch latch = new CountDownLatch(0);
+
+ /**
* Create a new instance of {@code InternalNioOutputBuffer}
*
* @param response
@@ -77,11 +151,35 @@
* @param endpoint
*/
public InternalNioOutputBuffer(Http11NioProcessor processor, Response response, int
headerBufferSize, NioEndpoint endpoint) {
- super(response, headerBufferSize);
- this.endpoint = endpoint;
- this.processor = processor;
- // Initialize the input buffer
- this.init();
+
+ this.response = response;
+ this.headers = response.getMimeHeaders();
+ buf = new byte[headerBufferSize];
+ if (headerBufferSize < Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE) {
+ bbuf = ByteBuffer.allocateDirect(6 * 1500);
+ } else {
+ bbuf = ByteBuffer.allocateDirect((headerBufferSize / 1500 + 1) * 1500);
+ }
+
+ outputBuffer = new OutputBufferImpl();
+ filterLibrary = new OutputFilter[0];
+ activeFilters = new OutputFilter[0];
+ lastActiveFilter = -1;
+
+ committed = false;
+ finished = false;
+
+ leftover = new ByteChunk();
+ nonBlocking = false;
+
+ this.endpoint = endpoint;
+ this.processor = processor;
+ // Initialize the input buffer
+ this.init();
+
+ // Cause loading of HttpMessages
+ HttpMessages.getMessage(200);
+
}
/*
@@ -97,34 +195,34 @@
this.completionHandler = new CompletionHandler<Integer, NioChannel>() {
@Override
- public void completed(Integer nBytes, NioChannel attachment) {
- try {
- if (nBytes < 0) {
- failed(new ClosedChannelException(), attachment);
- return;
- }
-
- if (bbuf.hasRemaining()) {
- try {
- attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS,
attachment, this);
- } catch (WritePendingException e) {
- response.setLastWrite(0);
- }
+ public synchronized void completed(Integer nBytes, NioChannel attachment) {
+ if (nBytes < 0) {
+ failed(new IOException(MESSAGES.failedWrite()), attachment);
+ return;
+ }
+ if (!bbuf.hasRemaining()) {
+ bbuf.clear();
+ if (leftover.getLength() > 0) {
+ int n = Math.min(leftover.getLength(), bbuf.remaining());
+ bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
+ leftover.setOffset(leftover.getOffset() + n);
} else {
- // Clear the buffer when all bytes are written
- clearBuffer();
response.setLastWrite(nBytes);
+ leftover.recycle();
+ latch.countDown();
if (!processor.isProcessing() &&
processor.getWriteNotification()) {
endpoint.processChannel(attachment,
SocketStatus.OPEN_WRITE);
}
+ return;
}
- } finally {
- latch.countDown();
}
+ // Write the remaining bytes
+ attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, attachment,
this);
}
@Override
public void failed(Throwable exc, NioChannel attachment) {
+ exc.printStackTrace();
endpoint.removeEventChannel(attachment);
if (!processor.isProcessing()) {
endpoint.processChannel(attachment, SocketStatus.ERROR);
@@ -152,15 +250,6 @@
}
/**
- * Recycle the output buffer. This should be called when closing the
- * connection.
- */
- public void recycle() {
- super.recycle();
- channel = null;
- }
-
- /**
* Close the channel
*
* @param channel
@@ -209,10 +298,15 @@
*/
private void nonBlockingWrite(final long timeout, final TimeUnit unit) {
try {
+ // Calculate the number of bytes that fit in the buffer
+ int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
+ // put bytes in the buffer
+ bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
+ // Update the offset
+ leftover.setOffset(leftover.getOffset() + n);
+ // Perform the write operation
latch = new CountDownLatch(1);
- // Perform the write operation
- this.channel.write(this.bbuf, timeout, unit, this.channel, this.completionHandler);
- latch.await();
+ this.channel.write(this.bbuf, timeout, unit, this.channel,
this.completionHandler);
} catch (Throwable t) {
if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(t);
@@ -220,28 +314,25 @@
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.coyote.http11.AbstractInternalOutputBuffer#write(java.nio.
- * ByteBuffer, long, java.util.concurrent.TimeUnit)
- */
- @Override
+ /**
+ * Perform a write operation. The operation may be blocking or non-blocking
+ * depending on the value of {@code nonBlocking} flag.
+ *
+ * @param timeout
+ * a timeout for the operation
+ * @param unit
+ * The time unit of the timeout
+ * @return
+ */
protected int write(final long timeout, final TimeUnit unit) {
- if (nonBlocking) {
- nonBlockingWrite(timeout, unit);
- return 0;
- }
-
return blockingWrite(timeout, unit);
}
- /**
- * Send an acknowledgment.
- *
- * @throws Exception
- */
+ /**
+ * Send an acknowledgment.
+ *
+ * @throws Exception
+ */
public void sendAck() throws Exception {
if (!committed) {
@@ -253,13 +344,15 @@
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.coyote.http11.AbstractInternalOutputBuffer#doWrite(org.apache
- * .tomcat.util.buf.ByteChunk, org.apache.coyote.Response)
- */
+ /**
+ * Write the contents of a byte chunk.
+ *
+ * @param chunk
+ * byte chunk
+ * @return number of bytes written
+ * @throws IOException
+ * an undelying I/O error occured
+ */
public int doWrite(ByteChunk chunk, Response res) throws IOException {
if (!committed) {
@@ -269,12 +362,6 @@
response.action(ActionCode.ACTION_COMMIT, null);
}
- // If non blocking (event) and there are leftover bytes,
- // and lastWrite was 0 -> error
- if (leftover.getLength() > 0 && !(Http11NioProcessor.containerThread.get()
== Boolean.TRUE)) {
- throw new IOException(MESSAGES.invalidBacklog());
- }
-
if (lastActiveFilter == -1) {
return outputBuffer.doWrite(chunk, res);
} else {
@@ -282,66 +369,24 @@
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.coyote.http11.AbstractInternalOutputBuffer#flushBuffer()
- */
+ /**
+ * Callback to write data from the buffer.
+ */
protected void flushBuffer() throws IOException {
int res = 0;
- // If there are still leftover bytes here, this means the user did a
- // direct flush:
- // - If the call is asynchronous, throw an exception
- // - If the call is synchronous, make regular blocking writes to flush
- // the data
- if (leftover.getLength() > 0) {
- if (Http11NioProcessor.containerThread.get() == Boolean.TRUE) {
- // Send leftover bytes
- while (leftover.getLength() > 0) {
- // Calculate the maximum number of bytes that can fit in the
- // buffer
- int n = Math.min(bbuf.capacity() - bbuf.position(), leftover.getLength());
- int off = leftover.getOffset();
- // Put bytes into the buffer
- bbuf.put(leftover.getBuffer(), off, n).flip();
- // Update the offset of the leftover ByteChunck
- leftover.setOffset(off + n);
- while (bbuf.hasRemaining()) {
- res = blockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
- if (res < 0) {
- break;
- }
- }
- bbuf.clear();
- if (res < 0) {
- throw new IOException(MESSAGES.failedWrite());
- }
- }
- leftover.recycle();
- } else {
- throw new IOException(MESSAGES.invalidBacklog());
- }
- }
-
- if (bbuf.position() > 0) {
+ if (!nonBlocking && bbuf.position() > 0) {
bbuf.flip();
- if (nonBlocking) {
- // Perform non blocking writes until all data is written, or the
- // result of the write is 0
- nonBlockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
- } else {
- while (bbuf.hasRemaining()) {
- res = blockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
- if (res <= 0) {
- break;
- }
- }
- response.setLastWrite(res);
- // bbuf.clear();
- clearBuffer();
+ while (bbuf.hasRemaining()) {
+ res = blockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
+ if (res <= 0) {
+ break;
+ }
}
+ response.setLastWrite(res);
+ // bbuf.clear();
+ clearBuffer();
if (res < 0) {
throw new IOException(MESSAGES.failedWrite());
@@ -349,57 +394,478 @@
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.coyote.http11.AbstractInternalOutputBuffer#flushLeftover()
- */
- @Override
- public boolean flushLeftover() throws IOException {
- if (leftover.getLength() == 0) {
- return true;
- }
- // Calculate the number of bytes that fit in the buffer
- int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
- // put bytes in the buffer
- bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
- // Update the offset
- leftover.setOffset(leftover.getOffset() + n);
- final NioChannel ch = channel;
+ /**
+ * Set the non blocking flag.
+ *
+ * @param nonBlocking
+ */
+ public void setNonBlocking(boolean nonBlocking) {
+ this.nonBlocking = nonBlocking;
+ }
- ch.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, null,
- new CompletionHandler<Integer, Void>() {
+ /**
+ * Get the non blocking flag value.
+ *
+ * @return non blocking
+ */
+ public boolean getNonBlocking() {
+ return nonBlocking;
+ }
- @Override
- public void completed(Integer result, Void attachment) {
- if (result < 0) {
- failed(new IOException(MESSAGES.failedWrite()), attachment);
- return;
- }
- response.setLastWrite(result);
- if (!bbuf.hasRemaining()) {
- bbuf.clear();
- if (leftover.getLength() > 0) {
- int n = Math.min(leftover.getLength(), bbuf.remaining());
- bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
- leftover.setOffset(leftover.getOffset() + n);
- } else {
- leftover.recycle();
- return;
- }
- }
- // Write the remaining bytes
- ch.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, null, this);
- }
+ /**
+ * Add an output filter to the filter library.
+ *
+ * @param filter
+ */
+ public void addFilter(OutputFilter filter) {
- @Override
- public void failed(Throwable exc, Void attachment) {
- close(ch);
- }
- });
+ OutputFilter[] newFilterLibrary = new OutputFilter[filterLibrary.length + 1];
+ for (int i = 0; i < filterLibrary.length; i++) {
+ newFilterLibrary[i] = filterLibrary[i];
+ }
+ newFilterLibrary[filterLibrary.length] = filter;
+ filterLibrary = newFilterLibrary;
+ activeFilters = new OutputFilter[filterLibrary.length];
+ }
- return true;
- }
+ /**
+ * Get filters.
+ *
+ * @return the list of filters
+ */
+ public OutputFilter[] getFilters() {
+ return filterLibrary;
+ }
+ /**
+ * Clear filters.
+ */
+ public void clearFilters() {
+ filterLibrary = new OutputFilter[0];
+ lastActiveFilter = -1;
+ }
+
+ /**
+ * Add an output filter to the active filters.
+ *
+ * @param filter
+ */
+ public void addActiveFilter(OutputFilter filter) {
+
+ if (lastActiveFilter == -1) {
+ filter.setBuffer(outputBuffer);
+ } else {
+ for (int i = 0; i <= lastActiveFilter; i++) {
+ if (activeFilters[i] == filter)
+ return;
+ }
+ filter.setBuffer(activeFilters[lastActiveFilter]);
+ }
+
+ activeFilters[++lastActiveFilter] = filter;
+ filter.setResponse(response);
+ }
+
+ public void removeActiveFilters() {
+ // Recycle filters
+ for (int i = 0; i <= lastActiveFilter; i++) {
+ activeFilters[i].recycle();
+ }
+ lastActiveFilter = -1;
+ }
+
+ /**
+ * Flush the response.
+ *
+ * @throws IOException
+ * an underlying I/O error occurred
+ */
+ public void flush() throws IOException {
+ if (!committed) {
+
+ // Send the connector a request for commit. The connector should
+ // then validate the headers, send them (using sendHeader) and
+ // set the filters accordingly.
+ response.action(ActionCode.ACTION_COMMIT, null);
+ }
+
+ // Flush the current buffer
+ flushBuffer();
+ }
+
+ /**
+ * Clear the {@code bbuf} buffer.
+ */
+ protected void clearBuffer() {
+ this.bbuf.clear();
+ }
+
+ /**
+ * Recycle this object
+ */
+ public void recycle() {
+ channel = null;
+ // Recycle Response object
+ response.recycle();
+ this.clearBuffer();
+ pos = 0;
+ lastActiveFilter = -1;
+ committed = false;
+ finished = false;
+ }
+
+ /**
+ * End processing of current HTTP request. Note: All bytes of the current
+ * request should have been already consumed. This method only resets all
+ * the pointers so that we are ready to parse the next HTTP request.
+ */
+ public void nextRequest() {
+ // Recycle Response object
+ response.recycle();
+
+ // Recycle filters
+ for (int i = 0; i <= lastActiveFilter; i++) {
+ activeFilters[i].recycle();
+ }
+
+ // Reset pointers
+ leftover.recycle();
+ pos = 0;
+ lastActiveFilter = -1;
+ committed = false;
+ finished = false;
+ nonBlocking = false;
+ }
+
+ /**
+ * End request.
+ *
+ * @throws IOException
+ * an underlying I/O error occurred
+ */
+ public void endRequest() throws IOException {
+
+ if (!committed) {
+ // Send the connector a request for commit. The connector should
+ // then validate the headers, send them (using sendHeader) and
+ // set the filters accordingly.
+ response.action(ActionCode.ACTION_COMMIT, null);
+ }
+
+ if (finished) {
+ return;
+ }
+
+ if (lastActiveFilter != -1) {
+ activeFilters[lastActiveFilter].end();
+ }
+
+ flushBuffer();
+ finished = true;
+ }
+
+ // ------------------------------------------------ HTTP/1.1 Output Methods
+
+ /**
+ * Send the response status line.
+ */
+ public void sendStatus() {
+
+ // Write protocol name
+ write(Constants.HTTP_11_BYTES);
+ buf[pos++] = Constants.SP;
+
+ // Write status code
+ int status = response.getStatus();
+ switch (status) {
+ case 200:
+ write(Constants._200_BYTES);
+ break;
+ case 400:
+ write(Constants._400_BYTES);
+ break;
+ case 404:
+ write(Constants._404_BYTES);
+ break;
+ default:
+ write(status);
+ }
+
+ buf[pos++] = Constants.SP;
+
+ // Write message
+ String message = null;
+ if (org.apache.coyote.Constants.USE_CUSTOM_STATUS_MSG_IN_HEADER) {
+ message = response.getMessage();
+ }
+ if (message == null) {
+ write(HttpMessages.getMessage(status));
+ } else {
+ write(message.replace('\n', ' ').replace('\r', ' '));
+ }
+
+ // End the response status line
+ buf[pos++] = Constants.CR;
+ buf[pos++] = Constants.LF;
+ }
+
+ /**
+ * Send a header.
+ *
+ * @param name
+ * Header name
+ * @param value
+ * Header value
+ */
+ public void sendHeader(MessageBytes name, MessageBytes value) {
+ if (name.getLength() > 0 && !value.isNull()) {
+ write(name);
+ buf[pos++] = Constants.COLON;
+ buf[pos++] = Constants.SP;
+ write(value);
+ buf[pos++] = Constants.CR;
+ buf[pos++] = Constants.LF;
+ }
+ }
+
+ /**
+ * Send a header.
+ *
+ * @param name
+ * Header name
+ * @param value
+ * Header value
+ */
+ public void sendHeader(ByteChunk name, ByteChunk value) {
+ write(name);
+ buf[pos++] = Constants.COLON;
+ buf[pos++] = Constants.SP;
+ write(value);
+ buf[pos++] = Constants.CR;
+ buf[pos++] = Constants.LF;
+ }
+
+ /**
+ * Send a header.
+ *
+ * @param name
+ * Header name
+ * @param value
+ * Header value
+ */
+ public void sendHeader(String name, String value) {
+ write(name);
+ buf[pos++] = Constants.COLON;
+ buf[pos++] = Constants.SP;
+ write(value);
+ buf[pos++] = Constants.CR;
+ buf[pos++] = Constants.LF;
+ }
+
+ /**
+ * End the header block.
+ */
+ public void endHeaders() {
+ buf[pos++] = Constants.CR;
+ buf[pos++] = Constants.LF;
+ }
+
+ /**
+ * Commit the response.
+ *
+ * @throws IOException
+ * an underlying I/O error occurred
+ */
+ protected void commit() throws IOException {
+
+ // The response is now committed
+ committed = true;
+ response.setCommitted(true);
+
+ if (pos > 0) {
+ // Sending the response header buffer
+ bbuf.put(buf, 0, pos);
+ }
+ }
+
+ /**
+ * This method will write the contents of the specified message bytes buffer
+ * to the output stream, without filtering. This method is meant to be used
+ * to write the response header.
+ *
+ * @param mb
+ * data to be written
+ */
+ protected void write(MessageBytes mb) {
+ if (mb == null) {
+ return;
+ }
+
+ switch (mb.getType()) {
+ case MessageBytes.T_BYTES:
+ write(mb.getByteChunk());
+ break;
+ case MessageBytes.T_CHARS:
+ write(mb.getCharChunk());
+ break;
+ default:
+ write(mb.toString());
+ break;
+ }
+ }
+
+ /**
+ * This method will write the contents of the specified message bytes buffer
+ * to the output stream, without filtering. This method is meant to be used
+ * to write the response header.
+ *
+ * @param bc
+ * data to be written
+ */
+ protected void write(ByteChunk bc) {
+ // Writing the byte chunk to the output buffer
+ int length = bc.getLength();
+ System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos, length);
+ pos = pos + length;
+ }
+
+ /**
+ * This method will write the contents of the specified char buffer to the
+ * output stream, without filtering. This method is meant to be used to
+ * write the response header.
+ *
+ * @param cc
+ * data to be written
+ */
+ protected void write(CharChunk cc) {
+ int start = cc.getStart();
+ int end = cc.getEnd();
+ char[] cbuf = cc.getBuffer();
+ for (int i = start; i < end; i++) {
+ char c = cbuf[i];
+ // Note: This is clearly incorrect for many strings,
+ // but is the only consistent approach within the current
+ // servlet framework. It must suffice until servlet output
+ // streams properly encode their output.
+ if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
+ c = ' ';
+ }
+ buf[pos++] = (byte) c;
+ }
+
+ }
+
+ /**
+ * This method will write the contents of the specified byte buffer to the
+ * output stream, without filtering. This method is meant to be used to
+ * write the response header.
+ *
+ * @param b
+ * data to be written
+ */
+ public void write(byte[] b) {
+ // Writing the byte chunk to the output buffer
+ System.arraycopy(b, 0, buf, pos, b.length);
+ pos = pos + b.length;
+ }
+
+ /**
+ * This method will write the contents of the specified String to the output
+ * stream, without filtering. This method is meant to be used to write the
+ * response header.
+ *
+ * @param s
+ * data to be written
+ */
+ protected void write(String s) {
+ if (s == null) {
+ return;
+ }
+
+ // From the Tomcat 3.3 HTTP/1.0 connector
+ int len = s.length();
+ for (int i = 0; i < len; i++) {
+ char c = s.charAt(i);
+ // Note: This is clearly incorrect for many strings,
+ // but is the only consistent approach within the current
+ // servlet framework. It must suffice until servlet output
+ // streams properly encode their output.
+ if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
+ c = ' ';
+ }
+
+ buf[pos++] = (byte) c;
+ }
+ }
+
+ /**
+ * This method will print the specified integer to the output stream,
+ * without filtering. This method is meant to be used to write the response
+ * header.
+ *
+ * @param i
+ * data to be written
+ */
+ protected void write(int i) {
+ write(String.valueOf(i));
+ }
+
+ // ----------------------------------- OutputBufferImpl Inner Class
+
+ /**
+ * {@code OutputBufferImpl} This class is an output buffer which will write
+ * data to an output stream/channel.
+ *
+ * Created on Jan 10, 2012 at 12:20:15 PM
+ *
+ * @author <a href="mailto:nbenothm@redhat.com">Nabil Benothman</a>
+ */
+ class OutputBufferImpl implements OutputBuffer {
+
+ /**
+ * Write chunk.
+ */
+ public int doWrite(ByteChunk chunk, Response res) throws IOException {
+ if (nonBlocking) {
+ // Autoblocking if the buffer is already full
+ if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE && response.getFlushLeftovers()
+ && Http11AbstractProcessor.containerThread.get() == Boolean.TRUE) {
+ try {
+ latch.await(writeTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ // FIXME? throw new IOException(MESSAGES.invalidBacklog());
+ }
+ synchronized (completionHandler) {
+ leftover.append(chunk);
+ if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE) {
+ response.setLastWrite(0);
+ }
+ if (latch.getCount() == 0) {
+ nonBlockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ } else {
+ int len = chunk.getLength();
+ int start = chunk.getStart();
+ byte[] b = chunk.getBuffer();
+ while (len > 0) {
+ int thisTime = len;
+ if (!bbuf.hasRemaining()) {
+ flushBuffer();
+ }
+ if (thisTime > bbuf.remaining()) {
+ thisTime = bbuf.remaining();
+ }
+
+ bbuf.put(b, start, thisTime);
+ len = len - thisTime;
+ start = start + thisTime;
+ }
+ }
+ return chunk.getLength();
+ }
+ }
+
}