[jbossweb-commits] JBossWeb SVN: r2279 - in branches/7.4.x/src/main/java/org/apache: coyote/http11 and 1 other directory.

jbossweb-commits at lists.jboss.org jbossweb-commits at lists.jboss.org
Tue Oct 15 13:30:15 EDT 2013


Author: remy.maucherat at jboss.com
Date: 2013-10-15 13:30:15 -0400 (Tue, 15 Oct 2013)
New Revision: 2279

Modified:
   branches/7.4.x/src/main/java/org/apache/catalina/core/StandardWrapperValve.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/AbstractInternalOutputBuffer.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/Constants.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Log:
- Refactor the non-blocking mode of the NIO2 connector.
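- Some debug code remains (e.g. the exc.printStackTrace() calls added to the failed() callbacks of the input and output buffer completion handlers).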

Modified: branches/7.4.x/src/main/java/org/apache/catalina/core/StandardWrapperValve.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/catalina/core/StandardWrapperValve.java	2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/catalina/core/StandardWrapperValve.java	2013-10-15 17:30:15 UTC (rev 2279)
@@ -21,6 +21,7 @@
 
 import static org.jboss.web.CatalinaMessages.MESSAGES;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -545,7 +546,7 @@
                     if (error) {
                         Throwable throwable = asyncContext.getError();
                         if (throwable == null) {
-                            throwable = new Exception();
+                            throwable = new EOFException();
                         }
                         if (request.getReadListener() != null) { 
                             request.getReadListener().onError(throwable);

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/AbstractInternalOutputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/AbstractInternalOutputBuffer.java	2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/AbstractInternalOutputBuffer.java	2013-10-15 17:30:15 UTC (rev 2279)
@@ -41,76 +41,6 @@
 public abstract class AbstractInternalOutputBuffer implements OutputBuffer {
 
 	/**
-	 * Associated Coyote response.
-	 */
-	protected Response response;
-
-	/**
-	 * Headers of the associated request.
-	 */
-	protected MimeHeaders headers;
-
-	/**
-	 * Committed flag.
-	 */
-	protected boolean committed;
-
-	/**
-	 * Finished flag.
-	 */
-	protected boolean finished;
-
-	/**
-	 * Pointer to the current write buffer.
-	 */
-	protected byte[] buf;
-
-	/**
-	 * Position in the buffer.
-	 */
-	protected int pos;
-
-	/**
-	 * Underlying output buffer.
-	 */
-	protected OutputBuffer outputBuffer;
-
-	/**
-	 * Filter library. Note: Filter[0] is always the "chunked" filter.
-	 */
-	protected OutputFilter[] filterLibrary;
-
-	/**
-	 * Active filter (which is actually the top of the pipeline).
-	 */
-	protected OutputFilter[] activeFilters;
-
-	/**
-	 * Index of the last active filter.
-	 */
-	protected int lastActiveFilter;
-
-	/**
-	 * Direct byte buffer used for writing.
-	 */
-	protected ByteBuffer bbuf = null;
-
-	/**
-	 * Leftover bytes which could not be written during a non blocking write.
-	 */
-	protected ByteChunk leftover = null;
-
-	/**
-	 * Non blocking mode.
-	 */
-	protected boolean nonBlocking = false;
-
-	/**
-	 * Write timeout
-	 */
-	protected int writeTimeout = -1;
-
-	/**
 	 * Create a new instance of {@code AbstractInternalOutputBuffer}
 	 * 
 	 * @param response
@@ -118,553 +48,6 @@
 	 */
 	public AbstractInternalOutputBuffer(Response response, int headerBufferSize) {
 
-		this.response = response;
-		this.headers = response.getMimeHeaders();
-		buf = new byte[headerBufferSize];
-		if (headerBufferSize < Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE) {
-			bbuf = ByteBuffer.allocateDirect(6 * 1500);
-		} else {
-			bbuf = ByteBuffer.allocateDirect((headerBufferSize / 1500 + 1) * 1500);
-		}
-
-		outputBuffer = new OutputBufferImpl();
-		filterLibrary = new OutputFilter[0];
-		activeFilters = new OutputFilter[0];
-		lastActiveFilter = -1;
-
-		committed = false;
-		finished = false;
-
-		leftover = new ByteChunk();
-		nonBlocking = false;
-
-		// Cause loading of HttpMessages
-		HttpMessages.getMessage(200);
 	}
 
-	/**
-	 * Initialize the internal output buffer
-	 */
-	protected abstract void init();
-
-	/**
-	 * Set the non blocking flag.
-	 * 
-	 * @param nonBlocking
-	 */
-	public void setNonBlocking(boolean nonBlocking) {
-		this.nonBlocking = nonBlocking;
-	}
-
-	/**
-	 * Get the non blocking flag value.
-	 * 
-	 * @return non blocking
-	 */
-	public boolean getNonBlocking() {
-		return nonBlocking;
-	}
-
-	/**
-	 * Add an output filter to the filter library.
-	 * 
-	 * @param filter
-	 */
-	public void addFilter(OutputFilter filter) {
-
-		OutputFilter[] newFilterLibrary = new OutputFilter[filterLibrary.length + 1];
-		for (int i = 0; i < filterLibrary.length; i++) {
-			newFilterLibrary[i] = filterLibrary[i];
-		}
-		newFilterLibrary[filterLibrary.length] = filter;
-		filterLibrary = newFilterLibrary;
-		activeFilters = new OutputFilter[filterLibrary.length];
-	}
-
-	/**
-	 * Get filters.
-	 * 
-	 * @return the list of filters
-	 */
-	public OutputFilter[] getFilters() {
-		return filterLibrary;
-	}
-
-	/**
-	 * Clear filters.
-	 */
-	public void clearFilters() {
-		filterLibrary = new OutputFilter[0];
-		lastActiveFilter = -1;
-	}
-
-	/**
-	 * Perform a write operation. The operation may be blocking or non-blocking
-	 * depending on the value of {@code nonBlocking} flag.
-	 * 
-	 * @param timeout
-	 *            a timeout for the operation
-	 * @param unit
-	 *            The time unit of the timeout
-	 * @return
-	 */
-	protected abstract int write(final long timeout, final TimeUnit unit);
-
-	/**
-	 * Add an output filter to the filter library.
-	 * 
-	 * @param filter
-	 */
-	public void addActiveFilter(OutputFilter filter) {
-
-		if (lastActiveFilter == -1) {
-			filter.setBuffer(outputBuffer);
-		} else {
-			for (int i = 0; i <= lastActiveFilter; i++) {
-				if (activeFilters[i] == filter)
-					return;
-			}
-			filter.setBuffer(activeFilters[lastActiveFilter]);
-		}
-
-		activeFilters[++lastActiveFilter] = filter;
-		filter.setResponse(response);
-	}
-
-    public void removeActiveFilters() {
-        // Recycle filters
-        for (int i = 0; i <= lastActiveFilter; i++) {
-            activeFilters[i].recycle();
-        }
-        lastActiveFilter = -1;
-    }
-
-    /**
-	 * Flush the response.
-	 * 
-	 * @throws IOException
-	 *             an undelying I/O error occured
-	 */
-	public void flush() throws IOException {
-		if (!committed) {
-
-			// Send the connector a request for commit. The connector should
-			// then validate the headers, send them (using sendHeader) and
-			// set the filters accordingly.
-			response.action(ActionCode.ACTION_COMMIT, null);
-		}
-
-		// Flush the current buffer
-		flushBuffer();
-	}
-
-	/**
-	 * 
-	 */
-	protected void clearBuffer() {
-		synchronized (this.bbuf) {
-			this.bbuf.clear();
-		}
-	}
-
-	/**
-	 * Recycle this object
-	 */
-	public void recycle() {
-		// Recycle Request object
-		response.recycle();
-		this.clearBuffer();
-		pos = 0;
-		lastActiveFilter = -1;
-		committed = false;
-		finished = false;
-	}
-
-	/**
-	 * End processing of current HTTP request. Note: All bytes of the current
-	 * request should have been already consumed. This method only resets all
-	 * the pointers so that we are ready to parse the next HTTP request.
-	 */
-	public void nextRequest() {
-		// Recycle Request object
-		response.recycle();
-
-		// Recycle filters
-		for (int i = 0; i <= lastActiveFilter; i++) {
-			activeFilters[i].recycle();
-		}
-
-		// Reset pointers
-		leftover.recycle();
-		pos = 0;
-		lastActiveFilter = -1;
-		committed = false;
-		finished = false;
-		nonBlocking = false;
-	}
-
-	/**
-	 * End request.
-	 * 
-	 * @throws IOException
-	 *             an undelying I/O error occured
-	 */
-	public void endRequest() throws IOException {
-
-		if (!committed) {
-			// Send the connector a request for commit. The connector should
-			// then validate the headers, send them (using sendHeader) and
-			// set the filters accordingly.
-			response.action(ActionCode.ACTION_COMMIT, null);
-		}
-
-		if (finished) {
-			return;
-		}
-
-		if (lastActiveFilter != -1) {
-			activeFilters[lastActiveFilter].end();
-		}
-
-		flushBuffer();
-		finished = true;
-	}
-
-	// ------------------------------------------------ HTTP/1.1 Output Methods
-
-	/**
-	 * Send an acknowledgment.
-	 * 
-	 * @throws Exception
-	 */
-	public abstract void sendAck() throws Exception;
-
-	/**
-	 * Send the response status line.
-	 */
-	public void sendStatus() {
-
-		// Write protocol name
-		write(Constants.HTTP_11_BYTES);
-		buf[pos++] = Constants.SP;
-
-		// Write status code
-		int status = response.getStatus();
-		switch (status) {
-		case 200:
-			write(Constants._200_BYTES);
-			break;
-		case 400:
-			write(Constants._400_BYTES);
-			break;
-		case 404:
-			write(Constants._404_BYTES);
-			break;
-		default:
-			write(status);
-		}
-
-		buf[pos++] = Constants.SP;
-
-		// Write message
-		String message = null;
-		if (org.apache.coyote.Constants.USE_CUSTOM_STATUS_MSG_IN_HEADER) {
-			message = response.getMessage();
-		}
-		if (message == null) {
-			write(HttpMessages.getMessage(status));
-		} else {
-			write(message.replace('\n', ' ').replace('\r', ' '));
-		}
-
-		// End the response status line
-		buf[pos++] = Constants.CR;
-		buf[pos++] = Constants.LF;
-	}
-
-	/**
-	 * Send a header.
-	 * 
-	 * @param name
-	 *            Header name
-	 * @param value
-	 *            Header value
-	 */
-	public void sendHeader(MessageBytes name, MessageBytes value) {
-		if (name.getLength() > 0 && !value.isNull()) {
-			write(name);
-			buf[pos++] = Constants.COLON;
-			buf[pos++] = Constants.SP;
-			write(value);
-			buf[pos++] = Constants.CR;
-			buf[pos++] = Constants.LF;
-		}
-	}
-
-	/**
-	 * Send a header.
-	 * 
-	 * @param name
-	 *            Header name
-	 * @param value
-	 *            Header value
-	 */
-	public void sendHeader(ByteChunk name, ByteChunk value) {
-		write(name);
-		buf[pos++] = Constants.COLON;
-		buf[pos++] = Constants.SP;
-		write(value);
-		buf[pos++] = Constants.CR;
-		buf[pos++] = Constants.LF;
-	}
-
-	/**
-	 * Send a header.
-	 * 
-	 * @param name
-	 *            Header name
-	 * @param value
-	 *            Header value
-	 */
-	public void sendHeader(String name, String value) {
-		write(name);
-		buf[pos++] = Constants.COLON;
-		buf[pos++] = Constants.SP;
-		write(value);
-		buf[pos++] = Constants.CR;
-		buf[pos++] = Constants.LF;
-	}
-
-	/**
-	 * End the header block.
-	 */
-	public void endHeaders() {
-		buf[pos++] = Constants.CR;
-		buf[pos++] = Constants.LF;
-	}
-
-	/**
-	 * Write the contents of a byte chunk.
-	 * 
-	 * @param chunk
-	 *            byte chunk
-	 * @return number of bytes written
-	 * @throws IOException
-	 *             an undelying I/O error occured
-	 */
-	public abstract int doWrite(ByteChunk chunk, Response res) throws IOException;
-
-	/**
-	 * Commit the response.
-	 * 
-	 * @throws IOException
-	 *             an undelying I/O error occured
-	 */
-	protected void commit() throws IOException {
-
-		// The response is now committed
-		committed = true;
-		response.setCommitted(true);
-
-		if (pos > 0) {
-			// Sending the response header buffer
-			bbuf.put(buf, 0, pos);
-		}
-	}
-
-	/**
-	 * This method will write the contents of the specyfied message bytes buffer
-	 * to the output stream, without filtering. This method is meant to be used
-	 * to write the response header.
-	 * 
-	 * @param mb
-	 *            data to be written
-	 */
-	protected void write(MessageBytes mb) {
-		if (mb == null) {
-			return;
-		}
-
-		switch (mb.getType()) {
-		case MessageBytes.T_BYTES:
-			write(mb.getByteChunk());
-			break;
-		case MessageBytes.T_CHARS:
-			write(mb.getCharChunk());
-			break;
-		default:
-			write(mb.toString());
-			break;
-		}
-	}
-
-	/**
-	 * This method will write the contents of the specyfied message bytes buffer
-	 * to the output stream, without filtering. This method is meant to be used
-	 * to write the response header.
-	 * 
-	 * @param bc
-	 *            data to be written
-	 */
-	protected void write(ByteChunk bc) {
-		// Writing the byte chunk to the output buffer
-		int length = bc.getLength();
-		System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos, length);
-		pos = pos + length;
-	}
-
-	/**
-	 * This method will write the contents of the specyfied char buffer to the
-	 * output stream, without filtering. This method is meant to be used to
-	 * write the response header.
-	 * 
-	 * @param cc
-	 *            data to be written
-	 */
-	protected void write(CharChunk cc) {
-		int start = cc.getStart();
-		int end = cc.getEnd();
-		char[] cbuf = cc.getBuffer();
-		for (int i = start; i < end; i++) {
-			char c = cbuf[i];
-			// Note: This is clearly incorrect for many strings,
-			// but is the only consistent approach within the current
-			// servlet framework. It must suffice until servlet output
-			// streams properly encode their output.
-			if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
-				c = ' ';
-			}
-			buf[pos++] = (byte) c;
-		}
-
-	}
-
-	/**
-	 * This method will write the contents of the specyfied byte buffer to the
-	 * output stream, without filtering. This method is meant to be used to
-	 * write the response header.
-	 * 
-	 * @param b
-	 *            data to be written
-	 */
-	public void write(byte[] b) {
-		// Writing the byte chunk to the output buffer
-		System.arraycopy(b, 0, buf, pos, b.length);
-		pos = pos + b.length;
-	}
-
-	/**
-	 * This method will write the contents of the specyfied String to the output
-	 * stream, without filtering. This method is meant to be used to write the
-	 * response header.
-	 * 
-	 * @param s
-	 *            data to be written
-	 */
-	protected void write(String s) {
-		if (s == null) {
-			return;
-		}
-
-		// From the Tomcat 3.3 HTTP/1.0 connector
-		int len = s.length();
-		for (int i = 0; i < len; i++) {
-			char c = s.charAt(i);
-			// Note: This is clearly incorrect for many strings,
-			// but is the only consistent approach within the current
-			// servlet framework. It must suffice until servlet output
-			// streams properly encode their output.
-            if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
-				c = ' ';
-			}
-
-			buf[pos++] = (byte) c;
-		}
-	}
-
-	/**
-	 * This method will print the specified integer to the output stream,
-	 * without filtering. This method is meant to be used to write the response
-	 * header.
-	 * 
-	 * @param i
-	 *            data to be written
-	 */
-	protected void write(int i) {
-		write(String.valueOf(i));
-	}
-
-	/**
-	 * Callback to write data from the buffer.
-	 */
-	protected abstract void flushBuffer() throws IOException;
-
-	/**
-	 * Flush leftover bytes.
-	 * 
-	 * @return true if all leftover bytes have been flushed
-	 * @throws IOException
-	 */
-	public abstract boolean flushLeftover() throws IOException;
-
-	// ----------------------------------- OutputBufferImpl Inner Class
-
-	/**
-	 * {@code OutputBufferImpl} This class is an output buffer which will write
-	 * data to an output stream/channel.
-	 * 
-	 * Created on Jan 10, 2012 at 12:20:15 PM
-	 * 
-	 * @author <a href="mailto:nbenothm at redhat.com">Nabil Benothman</a>
-	 */
-	class OutputBufferImpl implements OutputBuffer {
-
-		/**
-		 * Write chunk.
-		 */
-		public int doWrite(ByteChunk chunk, Response res) throws IOException {
-			// If non blocking (event) and there are leftover bytes,
-			// put all remaining bytes in the leftover buffer (they are
-			// part of the same write operation)
-			if (leftover.getLength() > 0) {
-				leftover.append(chunk);
-				return chunk.getLength();
-			}
-
-			int len = chunk.getLength();
-			int start = chunk.getStart();
-			byte[] b = chunk.getBuffer();
-
-			while (len > 0) {
-				int thisTime = len;
-				// if (bbuf.position() == bbuf.capacity()) {
-				if (!bbuf.hasRemaining()) {
-					flushBuffer();
-					if (leftover.getLength() > 0) {
-						// If non blocking (event) and there are leftover bytes,
-						// put all remaining bytes in the leftover buffer (they
-						// are
-						// part of the same write operation)
-						int oldStart = chunk.getOffset();
-						chunk.setOffset(start);
-						leftover.append(chunk);
-						chunk.setOffset(oldStart);
-						// After that, all content has been "written"
-						return chunk.getLength();
-					}
-				}
-				// if (thisTime > bbuf.capacity() - bbuf.position()) {
-				if (thisTime > bbuf.remaining()) {
-					// thisTime = bbuf.capacity() - bbuf.position();
-					thisTime = bbuf.remaining();
-				}
-
-				bbuf.put(b, start, thisTime);
-				len = len - thisTime;
-				start = start + thisTime;
-			}
-			return chunk.getLength();
-		}
-	}
-
 }

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Constants.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Constants.java	2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Constants.java	2013-10-15 17:30:15 UTC (rev 2279)
@@ -132,6 +132,13 @@
             Integer.valueOf(System.getProperty("org.apache.coyote.http11.DEFAULT_HTTP_HEADER_BUFFER_SIZE", "8192")).intValue();
 
 
+    /**
+     * Async buffer size.
+     */
+    public static final int ASYNC_BUFFER_SIZE = 
+            Integer.valueOf(System.getProperty("org.apache.coyote.http11.ASYNC_BUFFER_SIZE", "32768")).intValue();
+
+
     /* Various constant "strings" */
     public static final byte[] CRLF_BYTES = ByteChunk.convertToBytes(CRLF);
     public static final byte[] COLON_BYTES = ByteChunk.convertToBytes(": ");

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java	2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java	2013-10-15 17:30:15 UTC (rev 2279)
@@ -214,12 +214,6 @@
 		try {
 			// If processing a write event, must flush any leftover bytes first
 			if (status == SocketStatus.OPEN_WRITE) {
-				// If the flush does not manage to flush all leftover bytes, the
-				// socket should
-				// go back to the poller.
-				if (!outputBuffer.flushLeftover()) {
-					return SocketState.LONG;
-				}
 				// The write notification is now done
 				writeNotification = false;
 				// Allow convenient synchronous blocking writes

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java	2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java	2013-10-15 17:30:15 UTC (rev 2279)
@@ -806,11 +806,10 @@
 		 * .AsynchronousSocketChannel, org.apache.tomcat.util.net.ChannelStatus)
 		 */
 		@Override
-		public SocketState event(NioChannel channel, SocketStatus status) {
+		public synchronized SocketState event(NioChannel channel, SocketStatus status) {
 
 			Http11NioProcessor processor = connections.get(channel.getId());
 			SocketState state = SocketState.CLOSED;
-
 			if (processor != null) {
 				processor.startProcessing();
 				// Call the appropriate event
@@ -868,7 +867,7 @@
 					        state = event(channel, SocketStatus.OPEN_READ);
 					    } else if (proto.endpoint.isRunning()) {
 					        proto.endpoint.addEventChannel(channel, processor.getTimeout(),
-					                processor.getReadNotifications(),
+					                false,
 					                processor.getWriteNotification(),
 					                processor.getResumeNotification(), false);
 					    }

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java	2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java	2013-10-15 17:30:15 UTC (rev 2279)
@@ -80,11 +80,11 @@
 	 */
 	private CompletionHandler<Integer, NioChannel> completionHandler;
 
-	/**
-	 * Latch to wait for non blocking operations to run the completion handler.
-	 */
+    /**
+     * Latch used for auto blocking.
+     */
     private CountDownLatch latch = null;
-
+    
     /**
 	 * Create a new instance of {@code InternalNioInputBuffer}
 	 * 
@@ -111,36 +111,30 @@
 		this.completionHandler = new CompletionHandler<Integer, NioChannel>() {
 
 			@Override
-			public void completed(Integer nBytes, NioChannel attachment) {
+			public synchronized void completed(Integer nBytes, NioChannel attachment) {
 			    if (nBytes < 0) {
 			        failed(new ClosedChannelException(), attachment);
 			        return;
 			    }
 
-			    try {
-			        if (nBytes > 0) {
-			            bbuf.flip();
-			            bbuf.get(buf, pos, nBytes);
-			            lastValid = pos + nBytes;
-			            if (!processor.isProcessing() && processor.getReadNotifications()) {
-			                endpoint.processChannel(attachment, SocketStatus.OPEN_READ);
-			            }
+			    if (nBytes > 0) {
+			        bbuf.flip();
+			        bbuf.get(buf, pos, nBytes);
+			        lastValid = pos + nBytes;
+			        latch.countDown();
+			        if (!processor.isProcessing() && processor.getReadNotifications()) {
+			            endpoint.processChannel(attachment, SocketStatus.OPEN_READ);
 			        }
-			    } finally {
-			        latch.countDown();
 			    }
 			}
 
 			@Override
 			public void failed(Throwable exc, NioChannel attachment) {
-                try {
-                    endpoint.removeEventChannel(attachment);
-                    if (!processor.isProcessing()) {
-                        endpoint.processChannel(attachment, SocketStatus.ERROR);
-                    }
-                } finally {
-                    latch.countDown();
-                }
+			    exc.printStackTrace();
+			    endpoint.removeEventChannel(attachment);
+			    if (!processor.isProcessing()) {
+			        endpoint.processChannel(attachment, SocketStatus.ERROR);
+			    }
 			}
 		};
 	}
@@ -420,13 +414,25 @@
 
     private int fill0() throws IOException {
         int nRead = 0;
-        // Prepare the internal input buffer for reading
-        this.prepare();
         // Reading from client
         if (nonBlocking) {
-            nonBlockingRead(bbuf, readTimeout, unit);
-            nRead = lastValid - pos;
+            synchronized (completionHandler) {
+                // Prepare the internal input buffer for reading
+                this.prepare();
+                nonBlockingRead(bbuf, readTimeout, unit);
+                nRead = lastValid - pos;
+            }
+            if (nRead == 0 && !available) {
+                try {
+                    latch.await(readTimeout, unit);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+                nRead = lastValid - pos;
+            }
         } else {
+            // Prepare the internal input buffer for reading
+            this.prepare();
             nRead = blockingRead(bbuf, readTimeout, unit);
             if (nRead > 0) {
                 bbuf.flip();
@@ -446,7 +452,6 @@
                 throw new SocketTimeoutException(MESSAGES.failedRead());
             }
         }
-
         return nRead;
     }
 
@@ -485,7 +490,6 @@
 		try {
 		    latch = new CountDownLatch(1);
 			ch.read(bb, ch, this.completionHandler);
-			latch.await();
 		} catch (Throwable t) {
 			if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
 			    CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingRead(t);
@@ -527,20 +531,28 @@
 		 */
 		public int doRead(ByteChunk chunk, Request req) throws IOException {
 
-			if (pos >= lastValid) {
-			    int nRead = fill0();
-				if (nRead < 0) {
-					return -1;
-				} else if (nRead == 0) {
-				    return 0;
-				}
-			}
+            if (pos >= lastValid) {
+                int nRead = fill0();
+                if (nRead < 0) {
+                    return -1;
+                } else if (nRead == 0) {
+                    return 0;
+                }
+            }
 
-			int length = lastValid - pos;
-			chunk.setBytes(buf, pos, length);
-			pos = lastValid;
-
-			return (length);
+            if (nonBlocking) {
+                synchronized (completionHandler) {
+                    int length = lastValid - pos;
+                    chunk.setBytes(buf, pos, length);
+                    pos = lastValid;
+                    return (length);
+                }
+		    } else {
+		        int length = lastValid - pos;
+		        chunk.setBytes(buf, pos, length);
+		        pos = lastValid;
+		        return (length);
+		    }
 		}
 	}
 }

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java	2013-10-14 13:50:24 UTC (rev 2278)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java	2013-10-15 17:30:15 UTC (rev 2279)
@@ -1,6 +1,6 @@
 /*
  * JBoss, Home of Professional Open Source.
- * Copyright 2012 Red Hat, Inc., and individual contributors
+ * Copyright 2013 Red Hat, Inc., and individual contributors
  * as indicated by the @author tags.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,15 +21,19 @@
 import static org.jboss.web.CoyoteMessages.MESSAGES;
 
 import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
-import java.nio.channels.WritePendingException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.coyote.ActionCode;
+import org.apache.coyote.OutputBuffer;
 import org.apache.coyote.Response;
 import org.apache.tomcat.util.buf.ByteChunk;
+import org.apache.tomcat.util.buf.CharChunk;
+import org.apache.tomcat.util.buf.MessageBytes;
+import org.apache.tomcat.util.http.HttpMessages;
+import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioEndpoint;
 import org.apache.tomcat.util.net.SocketStatus;
@@ -42,8 +46,78 @@
  * 
  * @author <a href="mailto:nbenothm at redhat.com">Nabil Benothman</a>
  */
-public class InternalNioOutputBuffer extends AbstractInternalOutputBuffer {
+public class InternalNioOutputBuffer implements OutputBuffer {
 
+    /**
+     * Associated Coyote response.
+     */
+    protected Response response;
+
+    /**
+     * Headers of the associated request.
+     */
+    protected MimeHeaders headers;
+
+    /**
+     * Committed flag.
+     */
+    protected boolean committed;
+
+    /**
+     * Finished flag.
+     */
+    protected boolean finished;
+
+    /**
+     * Pointer to the current write buffer.
+     */
+    protected byte[] buf;
+
+    /**
+     * Position in the buffer.
+     */
+    protected int pos;
+
+    /**
+     * Underlying output buffer.
+     */
+    protected OutputBuffer outputBuffer;
+
+    /**
+     * Filter library. Note: Filter[0] is always the "chunked" filter.
+     */
+    protected OutputFilter[] filterLibrary;
+
+    /**
+     * Active filter (which is actually the top of the pipeline).
+     */
+    protected OutputFilter[] activeFilters;
+
+    /**
+     * Index of the last active filter.
+     */
+    protected int lastActiveFilter;
+
+    /**
+     * Direct byte buffer used for writing.
+     */
+    protected ByteBuffer bbuf = null;
+
+    /**
+     * Leftover bytes which could not be written during a non blocking write.
+     */
+    protected ByteChunk leftover = null;
+
+    /**
+     * Non blocking mode.
+     */
+    protected boolean nonBlocking = false;
+
+    /**
+     * Write timeout
+     */
+    protected int writeTimeout = -1;
+
 	/**
 	 * Underlying channel.
 	 */
@@ -64,12 +138,12 @@
 	 */
 	private CompletionHandler<Integer, NioChannel> completionHandler;
 
-    /**
-     * Latch to wait for non blocking operations to run the completion handler.
-     */
-    private CountDownLatch latch = null;
-
 	/**
+	 * Latch used for auto blocking.
+	 */
+	private CountDownLatch latch = new CountDownLatch(0);
+	
+	/**
 	 * Create a new instance of {@code InternalNioOutputBuffer}
 	 * 
 	 * @param response
@@ -77,11 +151,35 @@
 	 * @param endpoint
 	 */
 	public InternalNioOutputBuffer(Http11NioProcessor processor, Response response, int headerBufferSize, NioEndpoint endpoint) {
-		super(response, headerBufferSize);
-		this.endpoint = endpoint;
-		this.processor = processor;
-		// Initialize the input buffer
-		this.init();
+
+        this.response = response;
+        this.headers = response.getMimeHeaders();
+        buf = new byte[headerBufferSize];
+        if (headerBufferSize < Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE) {
+            bbuf = ByteBuffer.allocateDirect(6 * 1500);
+        } else {
+            bbuf = ByteBuffer.allocateDirect((headerBufferSize / 1500 + 1) * 1500);
+        }
+
+        outputBuffer = new OutputBufferImpl();
+        filterLibrary = new OutputFilter[0];
+        activeFilters = new OutputFilter[0];
+        lastActiveFilter = -1;
+
+        committed = false;
+        finished = false;
+
+        leftover = new ByteChunk();
+        nonBlocking = false;
+
+        this.endpoint = endpoint;
+        this.processor = processor;
+        // Initialize the input buffer
+        this.init();
+
+        // Cause loading of HttpMessages
+        HttpMessages.getMessage(200);
+
 	}
 
 	/*
@@ -97,34 +195,34 @@
 		this.completionHandler = new CompletionHandler<Integer, NioChannel>() {
 
 			@Override
-			public void completed(Integer nBytes, NioChannel attachment) {
-                try {
-                    if (nBytes < 0) {
-                        failed(new ClosedChannelException(), attachment);
-                        return;
-                    }
-
-                    if (bbuf.hasRemaining()) {
-                        try {
-                            attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, attachment, this);
-                        } catch (WritePendingException e) {
-                            response.setLastWrite(0);
-                        }
+			public synchronized void completed(Integer nBytes, NioChannel attachment) {
+                if (nBytes < 0) {
+                    failed(new IOException(MESSAGES.failedWrite()), attachment);
+                    return;
+                }
+                if (!bbuf.hasRemaining()) {
+                    bbuf.clear();
+                    if (leftover.getLength() > 0) {
+                        int n = Math.min(leftover.getLength(), bbuf.remaining());
+                        bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
+                        leftover.setOffset(leftover.getOffset() + n);
                     } else {
-                        // Clear the buffer when all bytes are written
-                        clearBuffer();
                         response.setLastWrite(nBytes);
+                        leftover.recycle();
+                        latch.countDown();
                         if (!processor.isProcessing() && processor.getWriteNotification()) {
                             endpoint.processChannel(attachment, SocketStatus.OPEN_WRITE);
                         }
+                        return;
                     }
-                } finally {
-                    latch.countDown();
                 }
+                // Write the remaining bytes
+                attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, attachment, this);
 			}
 
 			@Override
 			public void failed(Throwable exc, NioChannel attachment) {
+			    exc.printStackTrace();
 				endpoint.removeEventChannel(attachment);
                 if (!processor.isProcessing()) {
                     endpoint.processChannel(attachment, SocketStatus.ERROR);
@@ -152,15 +250,6 @@
 	}
 
 	/**
-	 * Recycle the output buffer. This should be called when closing the
-	 * connection.
-	 */
-	public void recycle() {
-		super.recycle();
-		channel = null;
-	}
-
-	/**
 	 * Close the channel
 	 * 
 	 * @param channel
@@ -209,10 +298,15 @@
 	 */
 	private void nonBlockingWrite(final long timeout, final TimeUnit unit) {
 		try {
+		    // Calculate the number of bytes that fit in the buffer
+		    int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
+		    // put bytes in the buffer
+		    bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
+		    // Update the offset
+		    leftover.setOffset(leftover.getOffset() + n);
+		    // Perform the write operation
 		    latch = new CountDownLatch(1);
-			// Perform the write operation
-			this.channel.write(this.bbuf, timeout, unit, this.channel, this.completionHandler);
-			latch.await();
+		    this.channel.write(this.bbuf, timeout, unit, this.channel, this.completionHandler);
 		} catch (Throwable t) {
 			if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
 			    CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(t);
@@ -220,28 +314,25 @@
 		}
 	}
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.coyote.http11.AbstractInternalOutputBuffer#write(java.nio.
-	 * ByteBuffer, long, java.util.concurrent.TimeUnit)
-	 */
-	@Override
+    /**
+     * Perform a write operation. The operation is always blocking: it delegates
+     * to {@code blockingWrite} regardless of the {@code nonBlocking} flag.
+     * 
+     * @param timeout
+     *            a timeout for the operation
+     * @param unit
+     *            The time unit of the timeout
+     * @return the value returned by {@code blockingWrite}
+     */
 	protected int write(final long timeout, final TimeUnit unit) {
-		if (nonBlocking) {
-			nonBlockingWrite(timeout, unit);
-			return 0;
-		}
-
 		return blockingWrite(timeout, unit);
 	}
 
-	/**
-	 * Send an acknowledgment.
-	 * 
-	 * @throws Exception
-	 */
+    /**
+     * Send an acknowledgment.
+     * 
+     * @throws Exception
+     */
 	public void sendAck() throws Exception {
 
 		if (!committed) {
@@ -253,13 +344,15 @@
 		}
 	}
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.coyote.http11.AbstractInternalOutputBuffer#doWrite(org.apache
-	 * .tomcat.util.buf.ByteChunk, org.apache.coyote.Response)
-	 */
+    /**
+     * Write the contents of a byte chunk.
+     * 
+     * @param chunk
+     *            byte chunk
+     * @return number of bytes written
+     * @throws IOException
+     *             an underlying I/O error occurred
+     */
 	public int doWrite(ByteChunk chunk, Response res) throws IOException {
 
 		if (!committed) {
@@ -269,12 +362,6 @@
 			response.action(ActionCode.ACTION_COMMIT, null);
 		}
 
-		// If non blocking (event) and there are leftover bytes,
-		// and lastWrite was 0 -> error
-		if (leftover.getLength() > 0 && !(Http11NioProcessor.containerThread.get() == Boolean.TRUE)) {
-			throw new IOException(MESSAGES.invalidBacklog());
-		}
-
 		if (lastActiveFilter == -1) {
 			return outputBuffer.doWrite(chunk, res);
 		} else {
@@ -282,66 +369,24 @@
 		}
 	}
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see org.apache.coyote.http11.AbstractInternalOutputBuffer#flushBuffer()
-	 */
+    /**
+     * Callback to write data from the buffer.
+     */
 	protected void flushBuffer() throws IOException {
 		int res = 0;
 
-		// If there are still leftover bytes here, this means the user did a
-		// direct flush:
-		// - If the call is asynchronous, throw an exception
-		// - If the call is synchronous, make regular blocking writes to flush
-		// the data
-		if (leftover.getLength() > 0) {
-			if (Http11NioProcessor.containerThread.get() == Boolean.TRUE) {
-				// Send leftover bytes
-				while (leftover.getLength() > 0) {
-					// Calculate the maximum number of bytes that can fit in the
-					// buffer
-					int n = Math.min(bbuf.capacity() - bbuf.position(), leftover.getLength());
-					int off = leftover.getOffset();
-					// Put bytes into the buffer
-					bbuf.put(leftover.getBuffer(), off, n).flip();
-					// Update the offset of the leftover ByteChunck
-					leftover.setOffset(off + n);
-					while (bbuf.hasRemaining()) {
-						res = blockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
-						if (res < 0) {
-							break;
-						}
-					}
-					bbuf.clear();
-					if (res < 0) {
-						throw new IOException(MESSAGES.failedWrite());
-					}
-				}
-				leftover.recycle();
-			} else {
-				throw new IOException(MESSAGES.invalidBacklog());
-			}
-		}
-
-		if (bbuf.position() > 0) {
+		if (!nonBlocking && bbuf.position() > 0) {
 			bbuf.flip();
 
-			if (nonBlocking) {
-				// Perform non blocking writes until all data is written, or the
-				// result of the write is 0
-				nonBlockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
-			} else {
-				while (bbuf.hasRemaining()) {
-					res = blockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
-					if (res <= 0) {
-						break;
-					}
-				}
-				response.setLastWrite(res);
-				// bbuf.clear();
-				clearBuffer();
+			while (bbuf.hasRemaining()) {
+			    res = blockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
+			    if (res <= 0) {
+			        break;
+			    }
 			}
+			response.setLastWrite(res);
+			// bbuf.clear();
+			clearBuffer();
 
 			if (res < 0) {
 				throw new IOException(MESSAGES.failedWrite());
@@ -349,57 +394,478 @@
 		}
 	}
 
-	/*
-	 * (non-Javadoc)
-	 * 
-	 * @see
-	 * org.apache.coyote.http11.AbstractInternalOutputBuffer#flushLeftover()
-	 */
-	@Override
-	public boolean flushLeftover() throws IOException {
-	    if (leftover.getLength() == 0) {
-	        return true;
-	    }
-		// Calculate the number of bytes that fit in the buffer
-		int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
-		// put bytes in the buffer
-		bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
-		// Update the offset
-		leftover.setOffset(leftover.getOffset() + n);
-		final NioChannel ch = channel;
+   /**
+     * Set the non blocking flag.
+     * 
+     * @param nonBlocking
+     */
+    public void setNonBlocking(boolean nonBlocking) {
+        this.nonBlocking = nonBlocking;
+    }
 
-		ch.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, null,
-				new CompletionHandler<Integer, Void>() {
+    /**
+     * Get the non blocking flag value.
+     * 
+     * @return non blocking
+     */
+    public boolean getNonBlocking() {
+        return nonBlocking;
+    }
 
-					@Override
-					public void completed(Integer result, Void attachment) {
-						if (result < 0) {
-							failed(new IOException(MESSAGES.failedWrite()), attachment);
-							return;
-						}
-						response.setLastWrite(result);
-						if (!bbuf.hasRemaining()) {
-							bbuf.clear();
-							if (leftover.getLength() > 0) {
-								int n = Math.min(leftover.getLength(), bbuf.remaining());
-								bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
-								leftover.setOffset(leftover.getOffset() + n);
-							} else {
-								leftover.recycle();
-								return;
-							}
-						}
-						// Write the remaining bytes
-						ch.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, null, this);
-					}
+    /**
+     * Add an output filter to the filter library.
+     * 
+     * @param filter
+     */
+    public void addFilter(OutputFilter filter) {
 
-					@Override
-					public void failed(Throwable exc, Void attachment) {
-						close(ch);
-					}
-				});
+        OutputFilter[] newFilterLibrary = new OutputFilter[filterLibrary.length + 1];
+        for (int i = 0; i < filterLibrary.length; i++) {
+            newFilterLibrary[i] = filterLibrary[i];
+        }
+        newFilterLibrary[filterLibrary.length] = filter;
+        filterLibrary = newFilterLibrary;
+        activeFilters = new OutputFilter[filterLibrary.length];
+    }
 
-		return true;
-	}
+    /**
+     * Get filters.
+     * 
+     * @return the list of filters
+     */
+    public OutputFilter[] getFilters() {
+        return filterLibrary;
+    }
 
+    /**
+     * Clear filters.
+     */
+    public void clearFilters() {
+        filterLibrary = new OutputFilter[0];
+        lastActiveFilter = -1;
+    }
+
+    /**
+     * Add an output filter to the list of active filters.
+     * 
+     * @param filter
+     */
+    public void addActiveFilter(OutputFilter filter) {
+
+        if (lastActiveFilter == -1) {
+            filter.setBuffer(outputBuffer);
+        } else {
+            for (int i = 0; i <= lastActiveFilter; i++) {
+                if (activeFilters[i] == filter)
+                    return;
+            }
+            filter.setBuffer(activeFilters[lastActiveFilter]);
+        }
+
+        activeFilters[++lastActiveFilter] = filter;
+        filter.setResponse(response);
+    }
+
+    public void removeActiveFilters() {
+        // Recycle filters
+        for (int i = 0; i <= lastActiveFilter; i++) {
+            activeFilters[i].recycle();
+        }
+        lastActiveFilter = -1;
+    }
+
+    /**
+     * Flush the response.
+     * 
+     * @throws IOException
+     *             an underlying I/O error occurred
+     */
+    public void flush() throws IOException {
+        if (!committed) {
+
+            // Send the connector a request for commit. The connector should
+            // then validate the headers, send them (using sendHeader) and
+            // set the filters accordingly.
+            response.action(ActionCode.ACTION_COMMIT, null);
+        }
+
+        // Flush the current buffer
+        flushBuffer();
+    }
+
+    /**
+     * Clear the byte buffer.
+     */
+    protected void clearBuffer() {
+        this.bbuf.clear();
+    }
+
+    /**
+     * Recycle this object
+     */
+    public void recycle() {
+        channel = null;
+        // Recycle Response object
+        response.recycle();
+        this.clearBuffer();
+        pos = 0;
+        lastActiveFilter = -1;
+        committed = false;
+        finished = false;
+    }
+
+    /**
+     * End processing of current HTTP request. Note: All bytes of the current
+     * request should have been already consumed. This method only resets all
+     * the pointers so that we are ready to parse the next HTTP request.
+     */
+    public void nextRequest() {
+        // Recycle Response object
+        response.recycle();
+
+        // Recycle filters
+        for (int i = 0; i <= lastActiveFilter; i++) {
+            activeFilters[i].recycle();
+        }
+
+        // Reset pointers
+        leftover.recycle();
+        pos = 0;
+        lastActiveFilter = -1;
+        committed = false;
+        finished = false;
+        nonBlocking = false;
+    }
+
+    /**
+     * End request.
+     * 
+     * @throws IOException
+     *             an underlying I/O error occurred
+     */
+    public void endRequest() throws IOException {
+
+        if (!committed) {
+            // Send the connector a request for commit. The connector should
+            // then validate the headers, send them (using sendHeader) and
+            // set the filters accordingly.
+            response.action(ActionCode.ACTION_COMMIT, null);
+        }
+
+        if (finished) {
+            return;
+        }
+
+        if (lastActiveFilter != -1) {
+            activeFilters[lastActiveFilter].end();
+        }
+
+        flushBuffer();
+        finished = true;
+    }
+
+    // ------------------------------------------------ HTTP/1.1 Output Methods
+
+    /**
+     * Send the response status line.
+     */
+    public void sendStatus() {
+
+        // Write protocol name
+        write(Constants.HTTP_11_BYTES);
+        buf[pos++] = Constants.SP;
+
+        // Write status code
+        int status = response.getStatus();
+        switch (status) {
+        case 200:
+            write(Constants._200_BYTES);
+            break;
+        case 400:
+            write(Constants._400_BYTES);
+            break;
+        case 404:
+            write(Constants._404_BYTES);
+            break;
+        default:
+            write(status);
+        }
+
+        buf[pos++] = Constants.SP;
+
+        // Write message
+        String message = null;
+        if (org.apache.coyote.Constants.USE_CUSTOM_STATUS_MSG_IN_HEADER) {
+            message = response.getMessage();
+        }
+        if (message == null) {
+            write(HttpMessages.getMessage(status));
+        } else {
+            write(message.replace('\n', ' ').replace('\r', ' '));
+        }
+
+        // End the response status line
+        buf[pos++] = Constants.CR;
+        buf[pos++] = Constants.LF;
+    }
+
+    /**
+     * Send a header.
+     * 
+     * @param name
+     *            Header name
+     * @param value
+     *            Header value
+     */
+    public void sendHeader(MessageBytes name, MessageBytes value) {
+        if (name.getLength() > 0 && !value.isNull()) {
+            write(name);
+            buf[pos++] = Constants.COLON;
+            buf[pos++] = Constants.SP;
+            write(value);
+            buf[pos++] = Constants.CR;
+            buf[pos++] = Constants.LF;
+        }
+    }
+
+    /**
+     * Send a header.
+     * 
+     * @param name
+     *            Header name
+     * @param value
+     *            Header value
+     */
+    public void sendHeader(ByteChunk name, ByteChunk value) {
+        write(name);
+        buf[pos++] = Constants.COLON;
+        buf[pos++] = Constants.SP;
+        write(value);
+        buf[pos++] = Constants.CR;
+        buf[pos++] = Constants.LF;
+    }
+
+    /**
+     * Send a header.
+     * 
+     * @param name
+     *            Header name
+     * @param value
+     *            Header value
+     */
+    public void sendHeader(String name, String value) {
+        write(name);
+        buf[pos++] = Constants.COLON;
+        buf[pos++] = Constants.SP;
+        write(value);
+        buf[pos++] = Constants.CR;
+        buf[pos++] = Constants.LF;
+    }
+
+    /**
+     * End the header block.
+     */
+    public void endHeaders() {
+        buf[pos++] = Constants.CR;
+        buf[pos++] = Constants.LF;
+    }
+
+    /**
+     * Commit the response.
+     * 
+     * @throws IOException
+     *             an underlying I/O error occurred
+     */
+    protected void commit() throws IOException {
+
+        // The response is now committed
+        committed = true;
+        response.setCommitted(true);
+
+        if (pos > 0) {
+            // Sending the response header buffer
+            bbuf.put(buf, 0, pos);
+        }
+    }
+
+    /**
+     * This method will write the contents of the specified message bytes buffer
+     * to the output stream, without filtering. This method is meant to be used
+     * to write the response header.
+     * 
+     * @param mb
+     *            data to be written
+     */
+    protected void write(MessageBytes mb) {
+        if (mb == null) {
+            return;
+        }
+
+        switch (mb.getType()) {
+        case MessageBytes.T_BYTES:
+            write(mb.getByteChunk());
+            break;
+        case MessageBytes.T_CHARS:
+            write(mb.getCharChunk());
+            break;
+        default:
+            write(mb.toString());
+            break;
+        }
+    }
+
+    /**
+     * This method will write the contents of the specified byte chunk
+     * to the output stream, without filtering. This method is meant to be used
+     * to write the response header.
+     * 
+     * @param bc
+     *            data to be written
+     */
+    protected void write(ByteChunk bc) {
+        // Writing the byte chunk to the output buffer
+        int length = bc.getLength();
+        System.arraycopy(bc.getBytes(), bc.getStart(), buf, pos, length);
+        pos = pos + length;
+    }
+
+    /**
+     * This method will write the contents of the specified char buffer to the
+     * output stream, without filtering. This method is meant to be used to
+     * write the response header.
+     * 
+     * @param cc
+     *            data to be written
+     */
+    protected void write(CharChunk cc) {
+        int start = cc.getStart();
+        int end = cc.getEnd();
+        char[] cbuf = cc.getBuffer();
+        for (int i = start; i < end; i++) {
+            char c = cbuf[i];
+            // Note: This is clearly incorrect for many strings,
+            // but is the only consistent approach within the current
+            // servlet framework. It must suffice until servlet output
+            // streams properly encode their output.
+            if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
+                c = ' ';
+            }
+            buf[pos++] = (byte) c;
+        }
+
+    }
+
+    /**
+     * This method will write the contents of the specified byte buffer to the
+     * output stream, without filtering. This method is meant to be used to
+     * write the response header.
+     * 
+     * @param b
+     *            data to be written
+     */
+    public void write(byte[] b) {
+        // Writing the byte chunk to the output buffer
+        System.arraycopy(b, 0, buf, pos, b.length);
+        pos = pos + b.length;
+    }
+
+    /**
+     * This method will write the contents of the specified String to the output
+     * stream, without filtering. This method is meant to be used to write the
+     * response header.
+     * 
+     * @param s
+     *            data to be written
+     */
+    protected void write(String s) {
+        if (s == null) {
+            return;
+        }
+
+        // From the Tomcat 3.3 HTTP/1.0 connector
+        int len = s.length();
+        for (int i = 0; i < len; i++) {
+            char c = s.charAt(i);
+            // Note: This is clearly incorrect for many strings,
+            // but is the only consistent approach within the current
+            // servlet framework. It must suffice until servlet output
+            // streams properly encode their output.
+            if (((c <= 31) && (c != 9)) || c == 127 || c > 255) {
+                c = ' ';
+            }
+
+            buf[pos++] = (byte) c;
+        }
+    }
+
+    /**
+     * This method will print the specified integer to the output stream,
+     * without filtering. This method is meant to be used to write the response
+     * header.
+     * 
+     * @param i
+     *            data to be written
+     */
+    protected void write(int i) {
+        write(String.valueOf(i));
+    }
+
+    // ----------------------------------- OutputBufferImpl Inner Class
+
+    /**
+     * {@code OutputBufferImpl} This class is an output buffer which will write
+     * data to an output stream/channel.
+     * 
+     * Created on Jan 10, 2012 at 12:20:15 PM
+     * 
+     * @author <a href="mailto:nbenothm at redhat.com">Nabil Benothman</a>
+     */
+    class OutputBufferImpl implements OutputBuffer {
+
+        /**
+         * Write chunk.
+         */
+        public int doWrite(ByteChunk chunk, Response res) throws IOException {
+            if (nonBlocking) {
+                // Autoblocking if the buffer is already full
+                if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE && response.getFlushLeftovers()
+                        && Http11AbstractProcessor.containerThread.get() == Boolean.TRUE) {
+                    try {
+                        latch.await(writeTimeout, TimeUnit.MILLISECONDS);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                    // FIXME? throw new IOException(MESSAGES.invalidBacklog());
+                }
+                synchronized (completionHandler) {
+                    leftover.append(chunk);
+                    if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE) {
+                        response.setLastWrite(0);
+                    }
+                    if (latch.getCount() == 0) {
+                        nonBlockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
+                    }
+                }
+            } else {
+                int len = chunk.getLength();
+                int start = chunk.getStart();
+                byte[] b = chunk.getBuffer();
+                while (len > 0) {
+                    int thisTime = len;
+                    if (!bbuf.hasRemaining()) {
+                        flushBuffer();
+                    }
+                    if (thisTime > bbuf.remaining()) {
+                        thisTime = bbuf.remaining();
+                    }
+
+                    bbuf.put(b, start, thisTime);
+                    len = len - thisTime;
+                    start = start + thisTime;
+                }
+            }
+            return chunk.getLength();
+        }
+    }
+
 }



More information about the jbossweb-commits mailing list