[jbossweb-commits] JBossWeb SVN: r2277 - branches/7.4.x/src/main/java/org/apache/coyote/http11.
jbossweb-commits at lists.jboss.org
jbossweb-commits at lists.jboss.org
Fri Oct 11 12:18:21 EDT 2013
Author: remy.maucherat at jboss.com
Date: 2013-10-11 12:18:21 -0400 (Fri, 11 Oct 2013)
New Revision: 2277
Modified:
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Log:
Fix some input and output corruption when using non blocking, hopefully without causing regressions.
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java 2013-10-10 16:12:38 UTC (rev 2276)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java 2013-10-11 16:18:21 UTC (rev 2277)
@@ -96,7 +96,7 @@
this.endpoint = endpoint;
request = new Request();
- inputBuffer = new InternalNioInputBuffer(request, headerBufferSize, endpoint);
+ inputBuffer = new InternalNioInputBuffer(this, request, headerBufferSize, endpoint);
request.setInputBuffer(inputBuffer);
if (endpoint.getUseSendfile()) {
request.setSendfile(true);
@@ -104,7 +104,7 @@
response = new Response();
response.setHook(this);
- outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize, endpoint);
+ outputBuffer = new InternalNioOutputBuffer(this, response, headerBufferSize, endpoint);
response.setOutputBuffer(outputBuffer);
request.setResponse(response);
sslEnabled = endpoint.getSSLEnabled();
@@ -138,6 +138,13 @@
return inputBuffer.available();
}
+ /**
+ * @return true if the processor is currently processing, false if it is idle
+ */
+ public boolean isProcessing() {
+ return processing;
+ }
+
/**
* Add input or output filter.
*
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java 2013-10-10 16:12:38 UTC (rev 2276)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProtocol.java 2013-10-11 16:18:21 UTC (rev 2277)
@@ -863,12 +863,15 @@
}
}
} else {
- if (proto.endpoint.isRunning()) {
- proto.endpoint.addEventChannel(channel, processor.getTimeout(),
- processor.getReadNotifications(),
- processor.getWriteNotification(),
- processor.getResumeNotification(), false);
- }
+ if (processor.isAvailable() && processor.getReadNotifications()) {
+ // Call a read event right away
+ state = event(channel, SocketStatus.OPEN_READ);
+ } else if (proto.endpoint.isRunning()) {
+ proto.endpoint.addEventChannel(channel, processor.getTimeout(),
+ processor.getReadNotifications(),
+ processor.getWriteNotification(),
+ processor.getResumeNotification(), false);
+ }
}
processor.endProcessing();
}
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java 2013-10-10 16:12:38 UTC (rev 2276)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java 2013-10-11 16:18:21 UTC (rev 2277)
@@ -26,6 +26,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.coyote.InputBuffer;
@@ -69,21 +70,32 @@
*/
protected NioEndpoint endpoint = null;
+ /**
+ * NIO processor.
+ */
+ protected Http11NioProcessor processor;
+
/**
* The completion handler used for asynchronous read operations
*/
private CompletionHandler<Integer, NioChannel> completionHandler;
/**
+ * Latch to wait for non blocking operations to run the completion handler.
+ */
+ private CountDownLatch latch = null;
+
+ /**
* Create a new instance of {@code InternalNioInputBuffer}
*
* @param request
* @param headerBufferSize
* @param endpoint
*/
- public InternalNioInputBuffer(Request request, int headerBufferSize, NioEndpoint endpoint) {
+ public InternalNioInputBuffer(Http11NioProcessor processor, Request request, int headerBufferSize, NioEndpoint endpoint) {
super(request, headerBufferSize);
this.endpoint = endpoint;
+ this.processor = processor;
this.init();
}
@@ -100,23 +112,35 @@
@Override
public void completed(Integer nBytes, NioChannel attachment) {
- if (nBytes < 0) {
- failed(new ClosedChannelException(), attachment);
- return;
- }
+ if (nBytes < 0) {
+ failed(new ClosedChannelException(), attachment);
+ return;
+ }
- if (nBytes > 0) {
- bbuf.flip();
- bbuf.get(buf, pos, nBytes);
- lastValid = pos + nBytes;
- endpoint.processChannel(attachment, SocketStatus.OPEN_READ);
- }
+ try {
+ if (nBytes > 0) {
+ bbuf.flip();
+ bbuf.get(buf, pos, nBytes);
+ lastValid = pos + nBytes;
+ if (!processor.isProcessing() && processor.getReadNotifications()) {
+ endpoint.processChannel(attachment, SocketStatus.OPEN_READ);
+ }
+ }
+ } finally {
+ latch.countDown();
+ }
}
@Override
public void failed(Throwable exc, NioChannel attachment) {
- endpoint.removeEventChannel(attachment);
- endpoint.processChannel(attachment, SocketStatus.ERROR);
+ try {
+ endpoint.removeEventChannel(attachment);
+ if (!processor.isProcessing()) {
+ endpoint.processChannel(attachment, SocketStatus.ERROR);
+ }
+ } finally {
+ latch.countDown();
+ }
}
};
}
@@ -391,37 +415,42 @@
* @see org.apache.coyote.http11.AbstractInternalInputBuffer#fill()
*/
protected boolean fill() throws IOException {
- int nRead = 0;
- // Prepare the internal input buffer for reading
- this.prepare();
- // Reading from client
- if (nonBlocking) {
- nonBlockingRead(bbuf, readTimeout, unit);
- } else {
- nRead = blockingRead(bbuf, readTimeout, unit);
- if (nRead > 0) {
- bbuf.flip();
- if (nRead > (buf.length - end)) {
- // An alternative is to bbuf.setLimit(buf.length - end) before the read,
- // which may be less efficient
- buf = new byte[buf.length];
- end = 0;
- pos = end;
- lastValid = pos;
- }
- bbuf.get(buf, pos, nRead);
- lastValid = pos + nRead;
- } else if (nRead == NioChannel.OP_STATUS_CLOSED) {
- throw new IOException(MESSAGES.failedRead());
- } else if (nRead == NioChannel.OP_STATUS_READ_TIMEOUT) {
- throw new SocketTimeoutException(MESSAGES.failedRead());
- }
- }
-
- return (nRead >= 0);
+ return (fill0() >= 0);
}
- /**
+ private int fill0() throws IOException {
+ int nRead = 0;
+ // Prepare the internal input buffer for reading
+ this.prepare();
+ // Reading from client
+ if (nonBlocking) {
+ nonBlockingRead(bbuf, readTimeout, unit);
+ nRead = lastValid - pos;
+ } else {
+ nRead = blockingRead(bbuf, readTimeout, unit);
+ if (nRead > 0) {
+ bbuf.flip();
+ if (nRead > (buf.length - end)) {
+ // An alternative is to bbuf.setLimit(buf.length - end) before the read,
+ // which may be less efficient
+ buf = new byte[buf.length];
+ end = 0;
+ pos = end;
+ lastValid = pos;
+ }
+ bbuf.get(buf, pos, nRead);
+ lastValid = pos + nRead;
+ } else if (nRead == NioChannel.OP_STATUS_CLOSED) {
+ throw new EOFException(MESSAGES.failedRead());
+ } else if (nRead == NioChannel.OP_STATUS_READ_TIMEOUT) {
+ throw new SocketTimeoutException(MESSAGES.failedRead());
+ }
+ }
+
+ return nRead;
+ }
+
+ /**
* Prepare the input buffer for reading
*/
private void prepare() {
@@ -454,7 +483,9 @@
private void nonBlockingRead(final ByteBuffer bb, long timeout, TimeUnit unit) {
final NioChannel ch = this.channel;
try {
+ latch = new CountDownLatch(1);
ch.read(bb, ch, this.completionHandler);
+ latch.await();
} catch (Throwable t) {
if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingRead(t);
@@ -463,14 +494,6 @@
}
/**
- *
- */
- protected void readAsync() throws IOException {
- this.prepare();
- this.nonBlockingRead(bbuf, readTimeout, unit);
- }
-
- /**
* Read a sequence of bytes in blocking mode from the current channel
*
* @param bb
@@ -505,8 +528,11 @@
public int doRead(ByteChunk chunk, Request req) throws IOException {
if (pos >= lastValid) {
- if (!fill()) {
+ int nRead = fill0();
+ if (nRead < 0) {
return -1;
+ } else if (nRead == 0) {
+ return 0;
}
}
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 2013-10-10 16:12:38 UTC (rev 2276)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 2013-10-11 16:18:21 UTC (rev 2277)
@@ -24,6 +24,7 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
import java.nio.channels.WritePendingException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.coyote.ActionCode;
@@ -31,6 +32,7 @@
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
import org.jboss.web.CoyoteLogger;
/**
@@ -52,11 +54,21 @@
*/
protected NioEndpoint endpoint;
+ /**
+ * NIO processor.
+ */
+ protected Http11NioProcessor processor;
+
/**
* The completion handler used for asynchronous write operations
*/
private CompletionHandler<Integer, NioChannel> completionHandler;
+ /**
+ * Latch to wait for non blocking operations to run the completion handler.
+ */
+ private CountDownLatch latch = null;
+
/**
* Create a new instance of {@code InternalNioOutputBuffer}
*
@@ -64,9 +76,10 @@
* @param headerBufferSize
* @param endpoint
*/
- public InternalNioOutputBuffer(Response response, int headerBufferSize, NioEndpoint endpoint) {
+ public InternalNioOutputBuffer(Http11NioProcessor processor, Response response, int headerBufferSize, NioEndpoint endpoint) {
super(response, headerBufferSize);
this.endpoint = endpoint;
+ this.processor = processor;
// Initialize the output buffer
this.init();
}
@@ -85,29 +98,37 @@
@Override
public void completed(Integer nBytes, NioChannel attachment) {
- if (nBytes < 0) {
- failed(new ClosedChannelException(), attachment);
- return;
- } else {
- response.setLastWrite(nBytes);
- }
+ try {
+ if (nBytes < 0) {
+ failed(new ClosedChannelException(), attachment);
+ return;
+ }
- if (bbuf.hasRemaining()) {
- try {
- attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, attachment, this);
- } catch (WritePendingException e) {
- response.setLastWrite(0);
- }
- } else {
- // Clear the buffer when all bytes are written
- clearBuffer();
- }
+ if (bbuf.hasRemaining()) {
+ try {
+ attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, attachment, this);
+ } catch (WritePendingException e) {
+ response.setLastWrite(0);
+ }
+ } else {
+ // Clear the buffer when all bytes are written
+ clearBuffer();
+ response.setLastWrite(nBytes);
+ if (!processor.isProcessing() && processor.getWriteNotification()) {
+ endpoint.processChannel(attachment, SocketStatus.OPEN_WRITE);
+ }
+ }
+ } finally {
+ latch.countDown();
+ }
}
@Override
public void failed(Throwable exc, NioChannel attachment) {
endpoint.removeEventChannel(attachment);
- // endpoint.processChannel(attachment, SocketStatus.ERROR);
+ if (!processor.isProcessing()) {
+ endpoint.processChannel(attachment, SocketStatus.ERROR);
+ }
}
};
}
@@ -188,8 +209,10 @@
*/
private void nonBlockingWrite(final long timeout, final TimeUnit unit) {
try {
+ latch = new CountDownLatch(1);
// Perform the write operation
this.channel.write(this.bbuf, timeout, unit, this.channel, this.completionHandler);
+ latch.await();
} catch (Throwable t) {
if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(t);
@@ -334,6 +357,9 @@
*/
@Override
public boolean flushLeftover() throws IOException {
+ if (leftover.getLength() == 0) {
+ return true;
+ }
// Calculate the number of bytes that fit in the buffer
int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
// put bytes in the buffer
More information about the jbossweb-commits
mailing list