Author: remy.maucherat(a)jboss.com
Date: 2013-10-30 12:09:23 -0400 (Wed, 30 Oct 2013)
New Revision: 2299
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/Constants.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
Log:
- Port patch.
- Redo some threading, a number of operations need to be dispatched to other threads.
Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/Constants.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/Constants.java 2013-10-29
18:33:11 UTC (rev 2298)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/Constants.java 2013-10-30
16:09:23 UTC (rev 2299)
@@ -31,6 +31,14 @@
public static final String ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM =
"org.apache.tomcat.websocket.noAddAfterHandshake";
+ // Executor configuration
+ public static final String EXECUTOR_CORE_SIZE_INIT_PARAM =
+ "org.apache.tomcat.websocket.executorCoreSize";
+ public static final String EXECUTOR_MAX_SIZE_INIT_PARAM =
+ "org.apache.tomcat.websocket.executorMaxSize";
+ public static final String EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM =
+ "org.apache.tomcat.websocket.executorKeepAliveTimeSeconds";
+
public static final String SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE =
"javax.websocket.server.ServerContainer";
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java 2013-10-29
18:33:11 UTC (rev 2298)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java 2013-10-30
16:09:23 UTC (rev 2299)
@@ -45,6 +45,7 @@
ServletContext sc = sce.getServletContext();
Object obj =
sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
if (obj instanceof WsServerContainer) {
+ ((WsServerContainer) obj).shutdownExecutor();
((WsServerContainer) obj).destroy();
}
}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java 2013-10-29
18:33:11 UTC (rev 2298)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java 2013-10-30
16:09:23 UTC (rev 2299)
@@ -227,7 +227,9 @@
@Override
public void onWritePossible() {
- wsRemoteEndpointServer.onWritePossible();
+ // Triggered by the poller so this isn't the same thread that
+ // triggered the write so no need for a dispatch
+ wsRemoteEndpointServer.onWritePossible(false);
}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java 2013-10-29
18:33:11 UTC (rev 2298)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java 2013-10-30
16:09:23 UTC (rev 2299)
@@ -20,6 +20,9 @@
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
@@ -35,8 +38,12 @@
*/
public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
+ private static final Queue<OnResultRunnable> onResultRunnables =
+ new ConcurrentLinkedQueue<OnResultRunnable>();
+
private final AbstractServletOutputStream sos;
private final WsWriteTimeout wsWriteTimeout;
+ private final ExecutorService executorService;
private volatile SendHandler handler = null;
private volatile ByteBuffer[] buffers = null;
@@ -48,6 +55,7 @@
WsServerContainer serverContainer) {
this.sos = sos;
this.wsWriteTimeout = serverContainer.getTimeout();
+ this.executorService = serverContainer.getExecutorService();
}
@@ -61,7 +69,9 @@
protected void doWrite(SendHandler handler, ByteBuffer... buffers) {
this.handler = handler;
this.buffers = buffers;
- onWritePossible();
+ // This is definitely the same thread that triggered the write so a
+ // dispatch will be required.
+ onWritePossible(true);
}
@@ -73,7 +83,7 @@
}
- public void onWritePossible() {
+ public void onWritePossible(boolean useDispatch) {
boolean complete = true;
try {
// If this is false there will be a call back when it is true
@@ -91,7 +101,7 @@
if (complete) {
timeoutExpiry = -1;
wsWriteTimeout.unregister(this);
- clearHandler(null);
+ clearHandler(null, useDispatch);
// Explicit flush for compatibility with buffered streams
sos.flush();
if (close) {
@@ -103,7 +113,7 @@
} catch (IOException ioe) {
wsWriteTimeout.unregister(this);
- clearHandler(ioe);
+ clearHandler(ioe, useDispatch);
close();
}
if (!complete) {
@@ -122,7 +132,11 @@
@Override
protected void doClose() {
if (handler != null) {
- clearHandler(new EOFException());
+ // close() can be triggered by a wide range of scenarios. It is far
+ // simpler just to always use a dispatch that it is to try and track
+ // whether or not this method was called by the same thread that
+ // triggered the write
+ clearHandler(new EOFException(), true);
}
try {
sos.close();
@@ -138,15 +152,31 @@
}
- protected void onTimeout() {
+ /*
+ * Currently this is only called from the background thread so we could just
+ * call clearHandler() with useDispatch == false but the method parameter
+ * was added in case other callers started to use this method to make sure
+ * that those callers think through what the correct value of useDispatch is
+ * for them.
+ */
+ protected void onTimeout(boolean useDispatch) {
if (handler != null) {
- clearHandler(new SocketTimeoutException());
+ clearHandler(new SocketTimeoutException(), useDispatch);
}
close();
}
- private void clearHandler(Throwable t) {
+ /**
+ *
+ * @param t The throwable associated with any error that
+ * occurred
+ * @param useDispatch Should {@link SendHandler#onResult(SendResult)} be
+ * called from a new thread, keeping in mind the
+ * requirements of
+ * {@link javax.websocket.RemoteEndpoint.Async}
+ */
+ private void clearHandler(Throwable t, boolean useDispatch) {
// Setting the result marks this (partial) message as
// complete which means the next one may be sent which
// could update the value of the handler. Therefore, keep a
@@ -155,11 +185,64 @@
SendHandler sh = handler;
handler = null;
if (sh != null) {
+ if (useDispatch) {
+ OnResultRunnable r = onResultRunnables.poll();
+ if (r == null) {
+ r = new OnResultRunnable(onResultRunnables);
+ }
+ r.init(sh, t);
+ if (executorService == null || executorService.isShutdown()) {
+ // Can't use the executor so call the runnable directly.
+ // This may not be strictly specification compliant in all
+ // cases but during shutdown only close messages are going
+ // to be sent so there should not be the issue of nested
+ // calls leading to stack overflow as described in bug
+ // 55715. The issues with nested calls was the reason for
+ // the separate thread requirement in the specification.
+ r.run();
+ } else {
+ executorService.execute(r);
+ }
+ } else {
+ if (t == null) {
+ sh.onResult(new SendResult());
+ } else {
+ sh.onResult(new SendResult(t));
+ }
+ }
+ }
+ }
+
+
+ private static class OnResultRunnable implements Runnable {
+
+ private final Queue<OnResultRunnable> queue;
+
+ private volatile SendHandler sh;
+ private volatile Throwable t;
+
+ private OnResultRunnable(Queue<OnResultRunnable> queue) {
+ this.queue = queue;
+ }
+
+ private void init(SendHandler sh, Throwable t) {
+ this.sh = sh;
+ this.t = t;
+ }
+
+ @Override
+ public void run() {
if (t == null) {
sh.onResult(new SendResult());
} else {
sh.onResult(new SendResult(t));
}
+ t = null;
+ sh = null;
+ // Return the Runnable to the queue when it has been finished with
+ // Note if this method takes an age to finish there shouldn't be any
+ // thread safety issues as the fields are cleared above.
+ queue.add(this);
}
}
}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java 2013-10-29
18:33:11 UTC (rev 2298)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java 2013-10-30
16:09:23 UTC (rev 2299)
@@ -28,6 +28,12 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.DispatcherType;
import javax.servlet.FilterRegistration;
@@ -64,9 +70,7 @@
implements ServerContainer {
private static final CloseReason AUTHENTICATED_HTTP_SESSION_CLOSED =
- new CloseReason(CloseCodes.VIOLATED_POLICY,
- "This connection was established under an authenticated "
+
- "HTTP session that has ended.");
+ new CloseReason(CloseCodes.VIOLATED_POLICY, MESSAGES.expiredHttpSession());
private final WsWriteTimeout wsWriteTimeout = new WsWriteTimeout();
@@ -80,6 +84,7 @@
private volatile boolean addAllowed = true;
private final ConcurrentHashMap<String,Set<WsSession>>
authenticatedSessions =
new ConcurrentHashMap<String, Set<WsSession>>();
+ private final ExecutorService executorService;
WsServerContainer(ServletContext servletContext) {
@@ -103,6 +108,25 @@
if (value != null) {
setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value));
}
+ // Executor config
+ int executorCoreSize = 0;
+ int executorMaxSize = 10;
+ long executorKeepAliveTimeSeconds = 60;
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_CORE_SIZE_INIT_PARAM);
+ if (value != null) {
+ executorCoreSize = Integer.parseInt(value);
+ }
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_MAX_SIZE_INIT_PARAM);
+ if (value != null) {
+ executorMaxSize = Integer.parseInt(value);
+ }
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM);
+ if (value != null) {
+ executorKeepAliveTimeSeconds = Long.parseLong(value);
+ }
FilterRegistration.Dynamic fr = servletContext.addFilter(
WsFilter.class.getName(), new WsFilter());
@@ -112,6 +136,22 @@
DispatcherType.FORWARD);
fr.addMappingForUrlPatterns(types, true, "/*");
+
+ // Use a per web application executor for any threads the the WebSocket
+ // server code needs to create. Group all of the threads under a single
+ // ThreadGroup.
+ StringBuffer threadGroupName = new StringBuffer("WebSocketServer-");
+ if ("".equals(servletContext.getContextPath())) {
+ threadGroupName.append("ROOT");
+ } else {
+ threadGroupName.append(servletContext.getContextPath());
+ }
+ ThreadGroup threadGroup = new ThreadGroup(threadGroupName.toString());
+ WsThreadFactory wsThreadFactory = new WsThreadFactory(threadGroup);
+
+ executorService = new ThreadPoolExecutor(executorCoreSize,
+ executorMaxSize, executorKeepAliveTimeSeconds, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), wsThreadFactory);
}
@@ -372,6 +412,21 @@
}
}
+
+ ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+
+ void shutdownExecutor() {
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ // Ignore the interruption and carry on
+ }
+ }
+
private static void validateEncoders(Class<? extends Encoder>[] encoders)
throws DeploymentException {
@@ -436,4 +491,21 @@
tpm2.getUriTemplate().getNormalizedPath());
}
}
+
+ private static class WsThreadFactory implements ThreadFactory {
+
+ private final ThreadGroup tg;
+ private final AtomicLong count = new AtomicLong(0);
+
+ private WsThreadFactory(ThreadGroup tg) {
+ this.tg = tg;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(tg, r);
+ t.setName(tg.getName() + "-" + count.incrementAndGet());
+ return t;
+ }
+ }
}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java 2013-10-29
18:33:11 UTC (rev 2298)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java 2013-10-30
16:09:23 UTC (rev 2299)
@@ -52,7 +52,9 @@
while (iter.hasNext()) {
WsRemoteEndpointImplServer endpoint = iter.next();
if (endpoint.getTimeoutExpiry() < now) {
- endpoint.onTimeout();
+ // Background thread, not the thread that triggered the
+ // write so no need to use a dispatch
+ endpoint.onTimeout(false);
} else {
// Endpoints are ordered by timeout expiry so if this point
// is reached there is no need to check the remaining
Modified: branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
===================================================================
--- branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2013-10-29 18:33:11
UTC (rev 2298)
+++ branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2013-10-30 16:09:23
UTC (rev 2299)
@@ -296,4 +296,7 @@
@Message(id = 8586, value = "Upgrade failed")
String upgradeFailed();
+ @Message(id = 8587, value = "This connection was established under an
authenticated HTTP session that has ended")
+ String expiredHttpSession();
+
}