Author: remy.maucherat(a)jboss.com
Date: 2014-05-23 07:29:12 -0400 (Fri, 23 May 2014)
New Revision: 2414
Added:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelWrapperSecure.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoEndpointBase.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java
branches/7.4.x/src/main/java/org/jboss/web/WebsocketsLogger.java
branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
Log:
Sync with Tomcat's websockets update: improve executor handling.
Added:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java
(rev 0)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelGroupUtil.java 2014-05-23
11:29:12 UTC (rev 2414)
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import static org.jboss.web.WebsocketsMessages.MESSAGES;
+
+import java.io.IOException;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
+
+/**
+ * This is a utility class that enables multiple {@link WsWebSocketContainer}
+ * instances to share a single {@link AsynchronousChannelGroup} while ensuring
+ * that the group is destroyed when no longer required.
+ */
+public class AsyncChannelGroupUtil {
+
+ private static AsynchronousChannelGroup group = null;
+ private static int usageCount = 0;
+ private static final Object lock = new Object();
+
+
+ private AsyncChannelGroupUtil() {
+ // Hide the default constructor
+ }
+
+
+ public static AsynchronousChannelGroup register() {
+ synchronized (lock) {
+ if (usageCount == 0) {
+ group = createAsynchronousChannelGroup();
+ }
+ usageCount++;
+ return group;
+ }
+ }
+
+
+ public static void unregister() {
+ synchronized (lock) {
+ usageCount--;
+ if (usageCount == 0) {
+ group.shutdown();
+ group = null;
+ }
+ }
+ }
+
+
+ private static AsynchronousChannelGroup createAsynchronousChannelGroup() {
+ // Need to do this with the right thread context class loader else the
+ // first web app to call this will trigger a leak
+ ClassLoader original = Thread.currentThread().getContextClassLoader();
+
+ try {
+ Thread.currentThread().setContextClassLoader(
+ AsyncIOThreadFactory.class.getClassLoader());
+
+ // These are the same settings as the default
+ // AsynchronousChannelGroup
+ int initialSize = Runtime.getRuntime().availableProcessors();
+ ExecutorService executorService = new ThreadPoolExecutor(
+ 0,
+ Integer.MAX_VALUE,
+ Long.MAX_VALUE, TimeUnit.MILLISECONDS,
+ new SynchronousQueue<Runnable>(),
+ new AsyncIOThreadFactory());
+
+ try {
+ return AsynchronousChannelGroup.withCachedThreadPool(
+ executorService, initialSize);
+ } catch (IOException e) {
+ // No good reason for this to happen.
+ throw MESSAGES.asyncGroupFail();
+ }
+ } finally {
+ Thread.currentThread().setContextClassLoader(original);
+ }
+ }
+
+
+ private static class AsyncIOThreadFactory implements ThreadFactory {
+
+ private AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("WebSocketClient-AsyncIO-" + count.incrementAndGet());
+ t.setContextClassLoader(this.getClass().getClassLoader());
+ t.setDaemon(true);
+ return t;
+ }
+ }
+}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelWrapperSecure.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelWrapperSecure.java 2014-05-22
09:02:55 UTC (rev 2413)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelWrapperSecure.java 2014-05-23
11:29:12 UTC (rev 2414)
@@ -28,9 +28,11 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
@@ -52,7 +54,8 @@
private final ByteBuffer socketReadBuffer;
private final ByteBuffer socketWriteBuffer;
// One thread for read, one for write
- private final ExecutorService executor = Executors.newFixedThreadPool(2);
+ private final ExecutorService executor =
+ Executors.newFixedThreadPool(2, new SecureIOThreadFactory());
private AtomicBoolean writing = new AtomicBoolean(false);
private AtomicBoolean reading = new AtomicBoolean(false);
@@ -139,6 +142,7 @@
} catch (IOException e) {
WebsocketsLogger.ROOT_LOGGER.errorClose();
}
+ executor.shutdownNow();
}
@Override
@@ -538,4 +542,19 @@
return new Integer(result.intValue());
}
}
+
+
+ private static class SecureIOThreadFactory implements ThreadFactory {
+
+ private AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("WebSocketClient-SecureIO-" + count.incrementAndGet());
+ t.setContextClassLoader(this.getClass().getClassLoader());
+ t.setDaemon(true);
+ return t;
+ }
+ }
}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsWebSocketContainer.java 2014-05-22
09:02:55 UTC (rev 2413)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsWebSocketContainer.java 2014-05-23
11:29:12 UTC (rev 2414)
@@ -44,13 +44,9 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -68,7 +64,6 @@
import javax.websocket.WebSocketContainer;
import org.apache.tomcat.util.codec.binary.Base64;
-import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.apache.tomcat.websocket.pojo.PojoEndpointClient;
import org.jboss.web.WebsocketsLogger;
@@ -107,43 +102,10 @@
private static final Random random = new Random();
private static final byte[] crlf = new byte[] {13, 10};
- private static final AsynchronousChannelGroup asynchronousChannelGroup;
- static {
- AsynchronousChannelGroup result = null;
+ private volatile AsynchronousChannelGroup asynchronousChannelGroup = null;
+ private final Object asynchronousChannelGroupLock = new Object();
- // Need to do this with the right thread context class loader else the
- // first web app to call this will trigger a leak
- ClassLoader original = Thread.currentThread().getContextClassLoader();
-
- try {
- Thread.currentThread().setContextClassLoader(
- AsyncIOThreadFactory.class.getClassLoader());
-
- // These are the same settings as the default
- // AsynchronousChannelGroup
- int initialSize = Runtime.getRuntime().availableProcessors();
- ExecutorService executorService = new ThreadPoolExecutor(
- 0,
- Integer.MAX_VALUE,
- Long.MAX_VALUE, TimeUnit.MILLISECONDS,
- new SynchronousQueue<Runnable>(),
- new AsyncIOThreadFactory());
-
- try {
- result = AsynchronousChannelGroup.withCachedThreadPool(
- executorService, initialSize);
- } catch (IOException e) {
- // No good reason for this to happen.
- throw MESSAGES.asyncGroupFail();
- }
- } finally {
- Thread.currentThread().setContextClassLoader(original);
- }
-
- asynchronousChannelGroup = result;
- }
-
private final Map<Class<?>, Set<WsSession>> endpointSessionMap =
new HashMap<Class<?>, Set<WsSession>>();
private final Map<WsSession,WsSession> sessions = new
ConcurrentHashMap<WsSession, WsSession>();
@@ -186,8 +148,12 @@
}
}
- ClientEndpointConfig config = ClientEndpointConfig.Builder.create().
- configurator(configurator).
+ ClientEndpointConfig.Builder builder = ClientEndpointConfig.Builder.create();
+ // Avoid NPE when using RI API JAR - see BZ 56343
+ if (configurator != null) {
+ builder.configurator(configurator);
+ }
+ ClientEndpointConfig config = builder.
decoders(Arrays.asList(annotation.decoders())).
encoders(Arrays.asList(annotation.encoders())).
build();
@@ -274,8 +240,7 @@
AsynchronousSocketChannel socketChannel;
try {
- socketChannel =
- AsynchronousSocketChannel.open(asynchronousChannelGroup);
+ socketChannel =
AsynchronousSocketChannel.open(getAsynchronousChannelGroup());
} catch (IOException ioe) {
throw new DeploymentException(MESSAGES.connectionFailed(), ioe);
}
@@ -355,7 +320,7 @@
WsSession wsSession = new WsSession(endpoint, wsRemoteEndpointClient,
this, null, null, null, null, null, subProtocol,
- Collections.<String, String> emptyMap(), false,
+ Collections.<String, String> emptyMap(), secure,
clientEndpointConfiguration);
endpoint.onOpen(wsSession, clientEndpointConfiguration);
registerSession(endpoint, wsSession);
@@ -797,9 +762,36 @@
WebsocketsLogger.ROOT_LOGGER.sessionCloseFailed(session.getId(), ioe);
}
}
+
+ // Only unregister with AsyncChannelGroupUtil if this instance
+ // registered with it
+ if (asynchronousChannelGroup != null) {
+ synchronized (asynchronousChannelGroupLock) {
+ if (asynchronousChannelGroup != null) {
+ AsyncChannelGroupUtil.unregister();
+ asynchronousChannelGroup = null;
+ }
+ }
+ }
}
+ private AsynchronousChannelGroup getAsynchronousChannelGroup() {
+ // Use AsyncChannelGroupUtil to share a common group amongst all
+ // WebSocket clients
+ AsynchronousChannelGroup result = asynchronousChannelGroup;
+ if (result == null) {
+ synchronized (asynchronousChannelGroupLock) {
+ if (asynchronousChannelGroup == null) {
+ asynchronousChannelGroup = AsyncChannelGroupUtil.register();
+ }
+ result = asynchronousChannelGroup;
+ }
+ }
+ return result;
+ }
+
+
// ----------------------------------------------- BackgroundProcess methods
@Override
@@ -836,21 +828,4 @@
}
- /**
- * Create threads for AsyncIO that have the right context class loader to
- * prevent memory leaks.
- */
- private static class AsyncIOThreadFactory implements ThreadFactory {
-
- private AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("WebSocketClient-AsyncIO-" + count.incrementAndGet());
- t.setContextClassLoader(this.getClass().getClassLoader());
- t.setDaemon(true);
- return t;
- }
- }
}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoEndpointBase.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoEndpointBase.java 2014-05-22
09:02:55 UTC (rev 2413)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoEndpointBase.java 2014-05-23
11:29:12 UTC (rev 2414)
@@ -47,6 +47,14 @@
Object pojo = getPojo();
Map<String,String> pathParameters = getPathParameters();
+ // Add message handlers before calling onOpen since that may trigger a
+ // message which in turn could trigger a response and/or close the
+ // session
+ for (MessageHandler mh : methodMapping.getMessageHandlers(pojo,
+ pathParameters, session, config)) {
+ session.addMessageHandler(mh);
+ }
+
if (methodMapping.getOnOpen() != null) {
try {
methodMapping.getOnOpen().invoke(pojo,
@@ -67,11 +75,6 @@
return;
}
}
-
- for (MessageHandler mh : methodMapping.getMessageHandlers(pojo,
- pathParameters, session, config)) {
- session.addMessageHandler(mh);
- }
}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java 2014-05-22
09:02:55 UTC (rev 2413)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java 2014-05-23
11:29:12 UTC (rev 2414)
@@ -45,7 +45,6 @@
ServletContext sc = sce.getServletContext();
Object obj =
sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
if (obj instanceof WsServerContainer) {
- ((WsServerContainer) obj).shutdownExecutor();
((WsServerContainer) obj).destroy();
}
}
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java 2014-05-22
09:02:55 UTC (rev 2413)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java 2014-05-23
11:29:12 UTC (rev 2414)
@@ -55,6 +55,7 @@
import org.apache.tomcat.websocket.WsWebSocketContainer;
import org.apache.tomcat.websocket.pojo.PojoEndpointServer;
import org.apache.tomcat.websocket.pojo.PojoMethodMapping;
+import org.jboss.web.WebsocketsLogger;
/**
* Provides a per class loader (i.e. per web application) instance of a
@@ -85,6 +86,7 @@
private final ConcurrentHashMap<String,Set<WsSession>>
authenticatedSessions =
new ConcurrentHashMap<String, Set<WsSession>>();
private final ExecutorService executorService;
+ private final ThreadGroup threadGroup;
private volatile boolean endpointsRegistered = false;
WsServerContainer(ServletContext servletContext) {
@@ -110,7 +112,7 @@
}
// Executor config
int executorCoreSize = 0;
- int executorMaxSize = 10;
+ int executorMaxSize = 200;
long executorKeepAliveTimeSeconds = 60;
value = servletContext.getInitParameter(
Constants.EXECUTOR_CORE_SIZE_INIT_PARAM);
@@ -146,7 +148,7 @@
} else {
threadGroupName.append(servletContext.getContextPath());
}
- ThreadGroup threadGroup = new ThreadGroup(threadGroupName.toString());
+ threadGroup = new ThreadGroup(threadGroupName.toString());
WsThreadFactory wsThreadFactory = new WsThreadFactory(threadGroup);
executorService = new ThreadPoolExecutor(executorCoreSize,
@@ -259,6 +261,20 @@
}
+ @Override
+ public void destroy() {
+ shutdownExecutor();
+ super.destroy();
+ try {
+ threadGroup.destroy();
+ } catch (IllegalThreadStateException itse) {
+ // If the executor hasn't fully shutdown it won't be possible to
+ // destroy this thread group as there will still be threads running
+ WebsocketsLogger.ROOT_LOGGER.threadGroupNotDestryed(threadGroup.getName());
+ }
+ }
+
+
boolean areEndpointsRegistered() {
return endpointsRegistered;
}
@@ -428,7 +444,7 @@
}
- void shutdownExecutor() {
+ private void shutdownExecutor() {
if (executorService == null) {
return;
}
Modified: branches/7.4.x/src/main/java/org/jboss/web/WebsocketsLogger.java
===================================================================
--- branches/7.4.x/src/main/java/org/jboss/web/WebsocketsLogger.java 2014-05-22 09:02:55
UTC (rev 2413)
+++ branches/7.4.x/src/main/java/org/jboss/web/WebsocketsLogger.java 2014-05-23 11:29:12
UTC (rev 2414)
@@ -98,4 +98,8 @@
@Message(id = 8813, value = "WebSocket support is not available when running on
Java 6")
void noWebsocketsSupport();
+ @LogMessage(level = WARN)
+ @Message(id = 8814, value = "Thread group %s not destroyed")
+ void threadGroupNotDestryed(String name);
+
}
Modified: branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
===================================================================
--- branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2014-05-22 09:02:55
UTC (rev 2413)
+++ branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2014-05-23 11:29:12
UTC (rev 2414)
@@ -170,7 +170,7 @@
@Message(id = 8544, value = "The WebSocket session has been closed and no method
(apart from close()) may be called on a closed session")
IllegalStateException sessionAlreadyClosed();
- @Message(id = 8545, value = "Unable to create dedicated AsynchronousChannelGroup
for WebSocket clients which is required to prevent memory leaks in complex class loader
environments like J2EE containers")
+ @Message(id = 8545, value = "Unable to create dedicated AsynchronousChannelGroup
for WebSocket clients which is required to prevent memory leaks in complex class loader
environments like JEE containers")
IllegalStateException asyncGroupFail();
@Message(id = 8546, value = "Cannot use POJO class [%s] as it is not annotated
with @ClientEndpoint")