[jbossweb-commits] JBossWeb SVN: r2277 - branches/7.4.x/src/main/java/org/apache/coyote/http11.

jbossweb-commits at lists.jboss.org jbossweb-commits at lists.jboss.org
Fri Oct 11 12:18:21 EDT 2013


Author: remy.maucherat at jboss.com
Date: 2013-10-11 12:18:21 -0400 (Fri, 11 Oct 2013)
New Revision: 2277

Modified:
   branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Log:
Fix some input and output corruption when using non blocking, hopefully without causing regressions.

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java	2013-10-10 16:12:38 UTC (rev 2276)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java	2013-10-11 16:18:21 UTC (rev 2277)
@@ -96,7 +96,7 @@
 
 		this.endpoint = endpoint;
 		request = new Request();
-		inputBuffer = new InternalNioInputBuffer(request, headerBufferSize, endpoint);
+		inputBuffer = new InternalNioInputBuffer(this, request, headerBufferSize, endpoint);
 		request.setInputBuffer(inputBuffer);
 		if (endpoint.getUseSendfile()) {
 			request.setSendfile(true);
@@ -104,7 +104,7 @@
 
 		response = new Response();
 		response.setHook(this);
-		outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize, endpoint);
+		outputBuffer = new InternalNioOutputBuffer(this, response, headerBufferSize, endpoint);
 		response.setOutputBuffer(outputBuffer);
 		request.setResponse(response);
 		sslEnabled = endpoint.getSSLEnabled();
@@ -138,6 +138,13 @@
 		return inputBuffer.available();
 	}
 
+    /**
+     * @return true if the processor is currently processing, false if it is idle
+     */
+    public boolean isProcessing() {
+        return processing;
+    }
+
 	/**
 	 * Add input or output filter.
 	 * 

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java	2013-10-10 16:12:38 UTC (rev 2276)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java	2013-10-11 16:18:21 UTC (rev 2277)
@@ -863,12 +863,15 @@
 							}
 						}
 					} else {
-						if (proto.endpoint.isRunning()) {
-							proto.endpoint.addEventChannel(channel, processor.getTimeout(),
-									processor.getReadNotifications(),
-									processor.getWriteNotification(),
-									processor.getResumeNotification(), false);
-						}
+					    if (processor.isAvailable() && processor.getReadNotifications()) {
+					        // Call a read event right away
+					        state = event(channel, SocketStatus.OPEN_READ);
+					    } else if (proto.endpoint.isRunning()) {
+					        proto.endpoint.addEventChannel(channel, processor.getTimeout(),
+					                processor.getReadNotifications(),
+					                processor.getWriteNotification(),
+					                processor.getResumeNotification(), false);
+					    }
 					}
 					processor.endProcessing();
 				}

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java	2013-10-10 16:12:38 UTC (rev 2276)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java	2013-10-11 16:18:21 UTC (rev 2277)
@@ -26,6 +26,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.coyote.InputBuffer;
@@ -69,21 +70,32 @@
 	 */
 	protected NioEndpoint endpoint = null;
 
+    /**
+     * NIO processor.
+     */
+    protected Http11NioProcessor processor;
+
 	/**
 	 * The completion handler used for asynchronous read operations
 	 */
 	private CompletionHandler<Integer, NioChannel> completionHandler;
 
 	/**
+	 * Latch to wait for non blocking operations to run the completion handler.
+	 */
+    private CountDownLatch latch = null;
+
+    /**
 	 * Create a new instance of {@code InternalNioInputBuffer}
 	 * 
 	 * @param request
 	 * @param headerBufferSize
 	 * @param endpoint
 	 */
-	public InternalNioInputBuffer(Request request, int headerBufferSize, NioEndpoint endpoint) {
+	public InternalNioInputBuffer(Http11NioProcessor processor, Request request, int headerBufferSize, NioEndpoint endpoint) {
 		super(request, headerBufferSize);
 		this.endpoint = endpoint;
+        this.processor = processor;
 		this.init();
 	}
 
@@ -100,23 +112,35 @@
 
 			@Override
 			public void completed(Integer nBytes, NioChannel attachment) {
-				if (nBytes < 0) {
-					failed(new ClosedChannelException(), attachment);
-					return;
-				}
+			    if (nBytes < 0) {
+			        failed(new ClosedChannelException(), attachment);
+			        return;
+			    }
 
-				if (nBytes > 0) {
-					bbuf.flip();
-					bbuf.get(buf, pos, nBytes);
-					lastValid = pos + nBytes;
-					endpoint.processChannel(attachment, SocketStatus.OPEN_READ);
-				}
+			    try {
+			        if (nBytes > 0) {
+			            bbuf.flip();
+			            bbuf.get(buf, pos, nBytes);
+			            lastValid = pos + nBytes;
+			            if (!processor.isProcessing() && processor.getReadNotifications()) {
+			                endpoint.processChannel(attachment, SocketStatus.OPEN_READ);
+			            }
+			        }
+			    } finally {
+			        latch.countDown();
+			    }
 			}
 
 			@Override
 			public void failed(Throwable exc, NioChannel attachment) {
-				endpoint.removeEventChannel(attachment);
-				endpoint.processChannel(attachment, SocketStatus.ERROR);
+                try {
+                    endpoint.removeEventChannel(attachment);
+                    if (!processor.isProcessing()) {
+                        endpoint.processChannel(attachment, SocketStatus.ERROR);
+                    }
+                } finally {
+                    latch.countDown();
+                }
 			}
 		};
 	}
@@ -391,37 +415,42 @@
 	 * @see org.apache.coyote.http11.AbstractInternalInputBuffer#fill()
 	 */
 	protected boolean fill() throws IOException {
-		int nRead = 0;
-		// Prepare the internal input buffer for reading
-		this.prepare();
-		// Reading from client
-		if (nonBlocking) {
-			nonBlockingRead(bbuf, readTimeout, unit);
-		} else {
-			nRead = blockingRead(bbuf, readTimeout, unit);
-			if (nRead > 0) {
-				bbuf.flip();
-				if (nRead > (buf.length - end)) {
-				    // An alternative is to bbuf.setLimit(buf.length - end) before the read,
-				    // which may be less efficient
-	                buf = new byte[buf.length];
-	                end = 0;
-	                pos = end;
-	                lastValid = pos;
-				}
-				bbuf.get(buf, pos, nRead);
-				lastValid = pos + nRead;
-			} else if (nRead == NioChannel.OP_STATUS_CLOSED) {
-				throw new IOException(MESSAGES.failedRead());
-			} else if (nRead == NioChannel.OP_STATUS_READ_TIMEOUT) {
-				throw new SocketTimeoutException(MESSAGES.failedRead());
-			}
-		}
-
-		return (nRead >= 0);
+		return (fill0() >= 0);
 	}
 
-	/**
+    private int fill0() throws IOException {
+        int nRead = 0;
+        // Prepare the internal input buffer for reading
+        this.prepare();
+        // Reading from client
+        if (nonBlocking) {
+            nonBlockingRead(bbuf, readTimeout, unit);
+            nRead = lastValid - pos;
+        } else {
+            nRead = blockingRead(bbuf, readTimeout, unit);
+            if (nRead > 0) {
+                bbuf.flip();
+                if (nRead > (buf.length - end)) {
+                    // An alternative is to bbuf.setLimit(buf.length - end) before the read,
+                    // which may be less efficient
+                    buf = new byte[buf.length];
+                    end = 0;
+                    pos = end;
+                    lastValid = pos;
+                }
+                bbuf.get(buf, pos, nRead);
+                lastValid = pos + nRead;
+            } else if (nRead == NioChannel.OP_STATUS_CLOSED) {
+                throw new EOFException(MESSAGES.failedRead());
+            } else if (nRead == NioChannel.OP_STATUS_READ_TIMEOUT) {
+                throw new SocketTimeoutException(MESSAGES.failedRead());
+            }
+        }
+
+        return nRead;
+    }
+
+    /**
 	 * Prepare the input buffer for reading
 	 */
 	private void prepare() {
@@ -454,7 +483,9 @@
 	private void nonBlockingRead(final ByteBuffer bb, long timeout, TimeUnit unit) {
 		final NioChannel ch = this.channel;
 		try {
+		    latch = new CountDownLatch(1);
 			ch.read(bb, ch, this.completionHandler);
+			latch.await();
 		} catch (Throwable t) {
 			if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
 			    CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingRead(t);
@@ -463,14 +494,6 @@
 	}
 
 	/**
-	 * 
-	 */
-	protected void readAsync() throws IOException {
-		this.prepare();
-		this.nonBlockingRead(bbuf, readTimeout, unit);
-	}
-
-	/**
 	 * Read a sequence of bytes in blocking mode from he current channel
 	 * 
 	 * @param bb
@@ -505,8 +528,11 @@
 		public int doRead(ByteChunk chunk, Request req) throws IOException {
 
 			if (pos >= lastValid) {
-				if (!fill()) {
+			    int nRead = fill0();
+				if (nRead < 0) {
 					return -1;
+				} else if (nRead == 0) {
+				    return 0;
 				}
 			}
 

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java	2013-10-10 16:12:38 UTC (rev 2276)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java	2013-10-11 16:18:21 UTC (rev 2277)
@@ -24,6 +24,7 @@
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.CompletionHandler;
 import java.nio.channels.WritePendingException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.coyote.ActionCode;
@@ -31,6 +32,7 @@
 import org.apache.tomcat.util.buf.ByteChunk;
 import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
 import org.jboss.web.CoyoteLogger;
 
 /**
@@ -52,11 +54,21 @@
 	 */
 	protected NioEndpoint endpoint;
 
+    /**
+     * NIO processor.
+     */
+    protected Http11NioProcessor processor;
+
 	/**
 	 * The completion handler used for asynchronous write operations
 	 */
 	private CompletionHandler<Integer, NioChannel> completionHandler;
 
+    /**
+     * Latch to wait for non blocking operations to run the completion handler.
+     */
+    private CountDownLatch latch = null;
+
 	/**
 	 * Create a new instance of {@code InternalNioOutputBuffer}
 	 * 
@@ -64,9 +76,10 @@
 	 * @param headerBufferSize
 	 * @param endpoint
 	 */
-	public InternalNioOutputBuffer(Response response, int headerBufferSize, NioEndpoint endpoint) {
+	public InternalNioOutputBuffer(Http11NioProcessor processor, Response response, int headerBufferSize, NioEndpoint endpoint) {
 		super(response, headerBufferSize);
 		this.endpoint = endpoint;
+		this.processor = processor;
 		// Initialize the input buffer
 		this.init();
 	}
@@ -85,29 +98,37 @@
 
 			@Override
 			public void completed(Integer nBytes, NioChannel attachment) {
-				if (nBytes < 0) {
-					failed(new ClosedChannelException(), attachment);
-					return;
-				} else {
-                    response.setLastWrite(nBytes);
-				}
+                try {
+                    if (nBytes < 0) {
+                        failed(new ClosedChannelException(), attachment);
+                        return;
+                    }
 
-				if (bbuf.hasRemaining()) {
-				    try {
-				        attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, attachment, this);
-				    } catch (WritePendingException e) {
-				        response.setLastWrite(0);
-				    }
-				} else {
-					// Clear the buffer when all bytes are written
-					clearBuffer();
-				}
+                    if (bbuf.hasRemaining()) {
+                        try {
+                            attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, attachment, this);
+                        } catch (WritePendingException e) {
+                            response.setLastWrite(0);
+                        }
+                    } else {
+                        // Clear the buffer when all bytes are written
+                        clearBuffer();
+                        response.setLastWrite(nBytes);
+                        if (!processor.isProcessing() && processor.getWriteNotification()) {
+                            endpoint.processChannel(attachment, SocketStatus.OPEN_WRITE);
+                        }
+                    }
+                } finally {
+                    latch.countDown();
+                }
 			}
 
 			@Override
 			public void failed(Throwable exc, NioChannel attachment) {
 				endpoint.removeEventChannel(attachment);
-				// endpoint.processChannel(attachment, SocketStatus.ERROR);
+                if (!processor.isProcessing()) {
+                    endpoint.processChannel(attachment, SocketStatus.ERROR);
+                }
 			}
 		};
 	}
@@ -188,8 +209,10 @@
 	 */
 	private void nonBlockingWrite(final long timeout, final TimeUnit unit) {
 		try {
+		    latch = new CountDownLatch(1);
 			// Perform the write operation
 			this.channel.write(this.bbuf, timeout, unit, this.channel, this.completionHandler);
+			latch.await();
 		} catch (Throwable t) {
 			if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
 			    CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(t);
@@ -334,6 +357,9 @@
 	 */
 	@Override
 	public boolean flushLeftover() throws IOException {
+	    if (leftover.getLength() == 0) {
+	        return true;
+	    }
 		// Calculate the number of bytes that fit in the buffer
 		int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
 		// put bytes in the buffer



More information about the jbossweb-commits mailing list