Author: remy.maucherat(a)jboss.com
Date: 2014-11-17 06:43:26 -0500 (Mon, 17 Nov 2014)
New Revision: 2541
Modified:
branches/7.5.x/src/main/java/org/apache/catalina/connector/OutputBuffer.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
Log:
Change websocket output to skip buffering and add some sync (input is synced upstream, but
not output, which is weird).
Modified: branches/7.5.x/src/main/java/org/apache/catalina/connector/OutputBuffer.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/catalina/connector/OutputBuffer.java 2014-11-14
11:10:55 UTC (rev 2540)
+++
branches/7.5.x/src/main/java/org/apache/catalina/connector/OutputBuffer.java 2014-11-17
11:43:26 UTC (rev 2541)
@@ -336,6 +336,9 @@
*/
public void flush()
throws IOException {
+ if (response.getRequest().getUpgradeHandler() != null) {
+ return;
+ }
doFlush(true);
}
@@ -446,15 +449,31 @@
if (closed)
return;
- bb.append(b, off, len);
- bytesWritten += len;
+ if (response.getRequest().getUpgradeHandler() != null) {
+ // If we really have something to write
+ if (len > 0) {
+ // real write to the adapter
+ ByteChunk output = new ByteChunk();
+ output.setBytes(b, off, len);
+ try {
+ coyoteResponse.doWrite(output);
+ } catch (IOException e) {
+ // An IOException on a write is almost always due to
+ // the remote client aborting the request. Wrap this
+ // so that it can be handled better by the error dispatcher.
+ throw new ClientAbortException(e);
+ }
+ }
+ } else {
+ bb.append(b, off, len);
+ bytesWritten += len;
- // if called from within flush(), then immediately flush
- // remaining bytes
- if (doFlush) {
- bb.flushBuffer();
+ // if called from within flush(), then immediately flush
+ // remaining bytes
+ if (doFlush) {
+ bb.flushBuffer();
+ }
}
-
}
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java 2014-11-14
11:10:55 UTC (rev 2540)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java 2014-11-17
11:43:26 UTC (rev 2541)
@@ -47,6 +47,7 @@
private final ExecutorService executorService;
private volatile SendHandler handler = null;
private volatile ByteBuffer[] buffers = null;
+ private final Object connectionWriteLock = new Object();
private volatile long timeoutExpiry = -1;
private volatile boolean close;
@@ -77,53 +78,54 @@
public void onWritePossible(boolean useDispatch) {
- if (buffers == null) {
- // Servlet 3.1 will call the write listener once even if nothing
- // was written
- return;
- }
- boolean complete = true;
- try {
- // If this is false there will be a call back when it is true
- while (sos.isReady()) {
- complete = true;
- for (ByteBuffer buffer : buffers) {
- // FIXME: might not be needed
- synchronized (buffer) {
- if (buffer.hasRemaining()) {
- complete = false;
- sos.write(buffer.array(), buffer.arrayOffset(),
- buffer.limit());
- buffer.position(buffer.limit());
- break;
+ synchronized (connectionWriteLock) {
+ if (buffers == null) {
+ // Servlet 3.1 will call the write listener once even if nothing
+ // was written
+ return;
+ }
+ boolean complete = true;
+ try {
+ // If this is false there will be a call back when it is true
+ while (sos.isReady()) {
+ complete = true;
+ for (ByteBuffer buffer : buffers) {
+ synchronized (buffer) {
+ if (buffer.hasRemaining()) {
+ complete = false;
+ sos.write(buffer.array(), buffer.arrayOffset(),
+ buffer.limit());
+ buffer.position(buffer.limit());
+ break;
+ }
}
}
- }
- if (complete) {
- wsWriteTimeout.unregister(this);
- clearHandler(null, useDispatch);
- // Explicit flush for compatibility with buffered streams
- sos.flush();
- if (close) {
- close();
+ if (complete) {
+ wsWriteTimeout.unregister(this);
+ clearHandler(null, useDispatch);
+ // Explicit flush for compatibility with buffered streams
+ sos.flush();
+ if (close) {
+ close();
+ }
+ break;
}
- break;
}
+
+ } catch (IOException ioe) {
+ wsWriteTimeout.unregister(this);
+ clearHandler(ioe, useDispatch);
+ close();
}
+ if (!complete) {
+ // Async write is in progress
- } catch (IOException ioe) {
- wsWriteTimeout.unregister(this);
- clearHandler(ioe, useDispatch);
- close();
- }
- if (!complete) {
- // Async write is in progress
-
- long timeout = getSendTimeout();
- if (timeout > 0) {
- // Register with timeout thread
- timeoutExpiry = timeout + System.currentTimeMillis();
- wsWriteTimeout.register(this);
+ long timeout = getSendTimeout();
+ if (timeout > 0) {
+ // Register with timeout thread
+ timeoutExpiry = timeout + System.currentTimeMillis();
+ wsWriteTimeout.register(this);
+ }
}
}
}