Author: remy.maucherat(a)jboss.com
Date: 2014-11-13 12:57:45 -0500 (Thu, 13 Nov 2014)
New Revision: 2538
Modified:
branches/7.5.x/src/main/java/org/apache/catalina/connector/InputBuffer.java
branches/7.5.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
branches/7.5.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
Log:
- Harmonize read with write, it's simpler and passes all the tests.
Modified: branches/7.5.x/src/main/java/org/apache/catalina/connector/InputBuffer.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/catalina/connector/InputBuffer.java 2014-11-12
15:43:14 UTC (rev 2537)
+++ branches/7.5.x/src/main/java/org/apache/catalina/connector/InputBuffer.java 2014-11-13
17:57:45 UTC (rev 2538)
@@ -610,7 +610,8 @@
throw MESSAGES.cannotSetListenerWithoutUpgradeOrAsync();
}
this.readListener = readListener;
- coyoteRequest.action(ActionCode.ACTION_EVENT_READ_BEGIN, null);
+ coyoteRequest.action(ActionCode.ACTION_EVENT_READ_BEGIN,
+ (request.getUpgradeHandler() != null) ? readListener : null);
}
}
Modified: branches/7.5.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java 2014-11-12
15:43:14 UTC (rev 2537)
+++
branches/7.5.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java 2014-11-13
17:57:45 UTC (rev 2538)
@@ -38,6 +38,7 @@
import org.apache.coyote.http11.filters.SavedRequestInputFilter;
import org.apache.coyote.http11.filters.VoidInputFilter;
import org.apache.coyote.http11.filters.VoidOutputFilter;
+import org.apache.coyote.http11.upgrade.servlet31.ReadListener;
import org.apache.coyote.http11.upgrade.servlet31.WriteListener;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.HexUtils;
@@ -861,6 +862,7 @@
timeoutEvent(param);
} else if (actionCode == ActionCode.ACTION_EVENT_READ_BEGIN) {
inputBuffer.setNonBlocking(true);
+ inputBuffer.setReadListener((ReadListener) param);
readNotifications = true;
} else if (actionCode == ActionCode.ACTION_EVENT_WRITE_BEGIN) {
outputBuffer.setNonBlocking(true);
Modified:
branches/7.5.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java 2014-11-12
15:43:14 UTC (rev 2537)
+++
branches/7.5.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java 2014-11-13
17:57:45 UTC (rev 2538)
@@ -30,6 +30,7 @@
import org.apache.coyote.InputBuffer;
import org.apache.coyote.Request;
+import org.apache.coyote.http11.upgrade.servlet31.ReadListener;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint;
@@ -85,6 +86,11 @@
private Semaphore semaphore = new Semaphore(1);
/**
+ * Associated read listener for upgrade mode.
+ */
+ private ReadListener listener = null;
+
+ /**
* Create a new instance of {@code InternalNioInputBuffer}
*
* @param request
@@ -110,33 +116,59 @@
this.completionHandler = new CompletionHandler<Integer, NioChannel>() {
@Override
- public synchronized void completed(Integer nBytes, NioChannel attachment) {
+ public void completed(Integer nBytes, NioChannel attachment) {
if (nBytes < 0) {
failed(new ClosedChannelException(), attachment);
return;
}
+ boolean notify = false;
- if (nBytes > 0) {
- bbuf.flip();
- if (nBytes > (buf.length - end)) {
- // An alternative is to bbuf.limit(buf.length - end) before the
read,
- // which may be less efficient
- buf = new byte[buf.length];
- end = 0;
- pos = end;
- lastValid = pos;
- }
- bbuf.get(buf, pos, nBytes);
- lastValid = pos + nBytes;
- semaphore.release();
- if (/*!processor.isProcessing() && */processor.getReadNotifications()
- && available) {
- available = false;
- if (!endpoint.processChannel(attachment, SocketStatus.OPEN_READ)) {
- endpoint.closeChannel(attachment);
+ synchronized (completionHandler) {
+ if (nBytes > 0) {
+ bbuf.flip();
+ if (nBytes > (buf.length - end)) {
+ // An alternative is to bbuf.limit(buf.length - end) before the read,
+ // which may be less efficient
+ buf = new byte[buf.length];
+ end = 0;
+ pos = end;
+ lastValid = pos;
}
+ bbuf.get(buf, pos, nBytes);
+ lastValid = pos + nBytes;
+ semaphore.release();
+ if (/*!processor.isProcessing() &&
*/processor.getReadNotifications()
+ && available) {
+ available = false;
+ notify = true;
+ }
}
}
+
+ if (notify) {
+ if (listener == null) {
+ if (!endpoint.processChannel(attachment, SocketStatus.OPEN_READ))
{
+ endpoint.closeChannel(attachment);
+ }
+ } else {
+ Thread thread = Thread.currentThread();
+ ClassLoader originalClassLoader =
thread.getContextClassLoader();
+ try {
+
thread.setContextClassLoader(listener.getClass().getClassLoader());
+ synchronized (channel.getLock()) {
+ listener.onDataAvailable();
+ }
+ } catch (Exception e) {
+ processor.getResponse().setErrorException(e);
+ endpoint.removeEventChannel(attachment);
+ if (!endpoint.processChannel(attachment, SocketStatus.ERROR))
{
+ endpoint.closeChannel(attachment);
+ }
+ } finally {
+ thread.setContextClassLoader(originalClassLoader);
+ }
+ }
+ }
}
@Override
@@ -187,6 +219,13 @@
return nonBlocking;
}
+ /**
+ * Set the associated read listener for upgrade mode.
+ */
+ public void setReadListener(ReadListener listener) {
+ this.listener = listener;
+ }
+
/*
* (non-Javadoc)
*
@@ -195,6 +234,7 @@
public void recycle() {
super.recycle();
bbuf.clear();
+ listener = null;
channel = null;
available = true;
readTimeout = (endpoint.getSoTimeout() > 0 ? endpoint.getSoTimeout()
Show replies by date