Author: remy.maucherat(a)jboss.com
Date: 2013-09-27 10:01:51 -0400 (Fri, 27 Sep 2013)
New Revision: 2265
Modified:
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11AprProcessor.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/FutureToSendHandler.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsSession.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
Log:
- Add flushing at the end of a message write (Tomcat uses an unbuffered stream and direct
socket writes)
- Try using an autoblocking flush for the blocking get() on the future when it is called from a container thread
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11AprProcessor.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11AprProcessor.java 2013-09-24
17:35:22 UTC (rev 2264)
+++
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11AprProcessor.java 2013-09-27
14:01:51 UTC (rev 2265)
@@ -96,6 +96,8 @@
initializeFilters();
+ Http11AbstractProcessor.containerThread.set(Boolean.FALSE);
+
// Cause loading of HexUtils
int foo = HexUtils.DEC[0];
@@ -109,12 +111,6 @@
/**
- * Thread local marker.
- */
- public static ThreadLocal<Boolean> containerThread = new
ThreadLocal<Boolean>();
-
-
- /**
* Associated adapter.
*/
protected Adapter adapter = null;
@@ -762,7 +758,7 @@
// Set error flag right away
error = true;
}
- containerThread.set(Boolean.TRUE);
+ Http11AbstractProcessor.containerThread.set(Boolean.TRUE);
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
error = !adapter.event(request, response, status);
} catch (InterruptedIOException e) {
Modified:
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 2013-09-24
17:35:22 UTC (rev 2264)
+++
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 2013-09-27
14:01:51 UTC (rev 2265)
@@ -538,7 +538,7 @@
// If non blocking (event) and there are leftover bytes,
// and lastWrite was 0 -> error
- if (leftover.getLength() > 0 &&
!(Http11AprProcessor.containerThread.get() == Boolean.TRUE)) {
+ if (leftover.getLength() > 0 &&
!(Http11AbstractProcessor.containerThread.get() == Boolean.TRUE)) {
throw new IOException(MESSAGES.invalidBacklog());
}
@@ -789,7 +789,7 @@
// - If the call is asynchronous, throw an exception
// - If the call is synchronous, make regular blocking writes to flush the data
if (leftover.getLength() > 0) {
- if (Http11AprProcessor.containerThread.get() == Boolean.TRUE) {
+ if (Http11AbstractProcessor.containerThread.get() == Boolean.TRUE) {
Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
// Send leftover bytes
res = Socket.send(socket, leftover.getBuffer(), leftover.getOffset(),
leftover.getEnd());
@@ -820,7 +820,7 @@
}
}
if (pos < end) {
- if (response.getFlushLeftovers() &&
(Http11AprProcessor.containerThread.get() == Boolean.TRUE)) {
+ if (response.getFlushLeftovers() &&
(Http11AbstractProcessor.containerThread.get() == Boolean.TRUE)) {
// Switch to blocking mode and write the data
Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
res = Socket.sendbb(socket, 0, end);
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/FutureToSendHandler.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/FutureToSendHandler.java 2013-09-24
17:35:22 UTC (rev 2264)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/FutureToSendHandler.java 2013-09-27
14:01:51 UTC (rev 2265)
@@ -16,6 +16,7 @@
*/
package org.apache.tomcat.websocket;
+import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -25,6 +26,8 @@
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
+import org.apache.coyote.http11.Http11AbstractProcessor;
+
/**
* Converts a Future to a SendHandler.
*/
@@ -71,15 +74,26 @@
@Override
public Void get() throws InterruptedException,
ExecutionException {
- try {
- wsSession.registerFuture(this);
- latch.await();
- } finally {
- wsSession.unregisterFuture(this);
+ // If inside a container thread, must use an autoblocking flush as the write
+ // event will never come to the Servlet layer until the container thread returns
+ if (Http11AbstractProcessor.containerThread.get() == Boolean.TRUE) {
+ // FIXME: this uses the IO timeout rather than no timeout as per the API contract
+ try {
+ wsSession.forceFlush();
+ } catch (IOException e) {
+ throw new ExecutionException(e);
+ }
+ } else {
+ try {
+ wsSession.registerFuture(this);
+ latch.await();
+ } finally {
+ wsSession.unregisterFuture(this);
+ }
+ if (result.getException() != null) {
+ throw new ExecutionException(result.getException());
+ }
}
- if (result.getException() != null) {
- throw new ExecutionException(result.getException());
- }
return null;
}
@@ -87,20 +101,31 @@
public Void get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
TimeoutException {
- boolean retval = false;
- try {
- wsSession.registerFuture(this);
- retval = latch.await(timeout, unit);
- } finally {
- wsSession.unregisterFuture(this);
+ // If inside a container thread, must use an autoblocking flush as the write
+ // event will never come to the Servlet layer until the container thread returns
+ if (Http11AbstractProcessor.containerThread.get() == Boolean.TRUE) {
+ // FIXME: this uses the IO timeout rather than the timeout specified by the user
+ try {
+ wsSession.forceFlush();
+ } catch (IOException e) {
+ throw new ExecutionException(e);
+ }
+ } else {
+ boolean retval = false;
+ try {
+ wsSession.registerFuture(this);
+ retval = latch.await(timeout, unit);
+ } finally {
+ wsSession.unregisterFuture(this);
+ }
+ if (retval == false) {
+ throw new TimeoutException();
+ }
+ if (result.getException() != null) {
+ throw new ExecutionException(result.getException());
+ }
}
- if (retval == false) {
- throw new TimeoutException();
- }
- if (result.getException() != null) {
- throw new ExecutionException(result.getException());
- }
return null;
}
}
Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsSession.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsSession.java 2013-09-24
17:35:22 UTC (rev 2264)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsSession.java 2013-09-27
14:01:51 UTC (rev 2265)
@@ -436,6 +436,12 @@
}
}
+ /**
+ * Force an autoblocking flush.
+ */
+ public void forceFlush() throws IOException {
+ wsRemoteEndpoint.flushBatch();
+ }
private void fireEndpointOnClose(CloseReason closeReason) {
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java 2013-09-24
17:35:22 UTC (rev 2264)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java 2013-09-27
14:01:51 UTC (rev 2265)
@@ -82,9 +82,8 @@
}
if (complete) {
wsWriteTimeout.unregister(this);
- // ADD: Explicit flush
+ // Explicit flush for compatibility with buffered streams
sos.flush();
- // END ADD
if (close) {
close();
}