[jbossweb-commits] JBossWeb SVN: r2299 - in branches/7.4.x/src/main/java/org: jboss/web and 1 other directory.

jbossweb-commits at lists.jboss.org jbossweb-commits at lists.jboss.org
Wed Oct 30 12:09:23 EDT 2013


Author: remy.maucherat at jboss.com
Date: 2013-10-30 12:09:23 -0400 (Wed, 30 Oct 2013)
New Revision: 2299

Modified:
   branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/Constants.java
   branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java
   branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
   branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
   branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java
   branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
   branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
Log:
- Port patch.
- Redo some threading, a number of operations need to be dispatched to other threads.

Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/Constants.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/Constants.java	2013-10-29 18:33:11 UTC (rev 2298)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/Constants.java	2013-10-30 16:09:23 UTC (rev 2299)
@@ -31,6 +31,14 @@
     public static final String ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM =
             "org.apache.tomcat.websocket.noAddAfterHandshake";
 
+    // Executor configuration
+    public static final String EXECUTOR_CORE_SIZE_INIT_PARAM =
+            "org.apache.tomcat.websocket.executorCoreSize";
+    public static final String EXECUTOR_MAX_SIZE_INIT_PARAM =
+            "org.apache.tomcat.websocket.executorMaxSize";
+    public static final String EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM =
+            "org.apache.tomcat.websocket.executorKeepAliveTimeSeconds";
+
     public static final String SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE =
             "javax.websocket.server.ServerContainer";
 

Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java	2013-10-29 18:33:11 UTC (rev 2298)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsContextListener.java	2013-10-30 16:09:23 UTC (rev 2299)
@@ -45,6 +45,7 @@
         ServletContext sc = sce.getServletContext();
         Object obj = sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
         if (obj instanceof WsServerContainer) {
+            ((WsServerContainer) obj).shutdownExecutor();
             ((WsServerContainer) obj).destroy();
         }
     }

Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java	2013-10-29 18:33:11 UTC (rev 2298)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java	2013-10-30 16:09:23 UTC (rev 2299)
@@ -227,7 +227,9 @@
 
         @Override
         public void onWritePossible() {
-            wsRemoteEndpointServer.onWritePossible();
+            // Triggered by the poller so this isn't the same thread that
+            // triggered the write so no need for a dispatch
+            wsRemoteEndpointServer.onWritePossible(false);
         }
 
 

Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java	2013-10-29 18:33:11 UTC (rev 2298)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java	2013-10-30 16:09:23 UTC (rev 2299)
@@ -20,6 +20,9 @@
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
 
 import javax.websocket.SendHandler;
 import javax.websocket.SendResult;
@@ -35,8 +38,12 @@
  */
 public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
 
+    private static final Queue<OnResultRunnable> onResultRunnables =
+            new ConcurrentLinkedQueue<OnResultRunnable>();
+
     private final AbstractServletOutputStream sos;
     private final WsWriteTimeout wsWriteTimeout;
+    private final ExecutorService executorService;
     private volatile SendHandler handler = null;
     private volatile ByteBuffer[] buffers = null;
 
@@ -48,6 +55,7 @@
             WsServerContainer serverContainer) {
         this.sos = sos;
         this.wsWriteTimeout = serverContainer.getTimeout();
+        this.executorService = serverContainer.getExecutorService();
     }
 
 
@@ -61,7 +69,9 @@
     protected void doWrite(SendHandler handler, ByteBuffer... buffers) {
         this.handler = handler;
         this.buffers = buffers;
-        onWritePossible();
+        // This is definitely the same thread that triggered the write so a
+        // dispatch will be required.
+        onWritePossible(true);
     }
 
 
@@ -73,7 +83,7 @@
     }
 
 
-    public void onWritePossible() {
+    public void onWritePossible(boolean useDispatch) {
         boolean complete = true;
         try {
             // If this is false there will be a call back when it is true
@@ -91,7 +101,7 @@
                 if (complete) {
                     timeoutExpiry = -1;
                     wsWriteTimeout.unregister(this);
-                    clearHandler(null);
+                    clearHandler(null, useDispatch);
                     // Explicit flush for compatibility with buffered streams
                     sos.flush();
                     if (close) {
@@ -103,7 +113,7 @@
 
         } catch (IOException ioe) {
             wsWriteTimeout.unregister(this);
-            clearHandler(ioe);
+            clearHandler(ioe, useDispatch);
             close();
         }
         if (!complete) {
@@ -122,7 +132,11 @@
     @Override
     protected void doClose() {
         if (handler != null) {
-            clearHandler(new EOFException());
+            // close() can be triggered by a wide range of scenarios. It is far
+            // simpler just to always use a dispatch that it is to try and track
+            // whether or not this method was called by the same thread that
+            // triggered the write
+            clearHandler(new EOFException(), true);
         }
         try {
             sos.close();
@@ -138,15 +152,31 @@
     }
 
 
-    protected void onTimeout() {
+    /*
+     * Currently this is only called from the background thread so we could just
+     * call clearHandler() with useDispatch == false but the method parameter
+     * was added in case other callers started to use this method to make sure
+     * that those callers think through what the correct value of useDispatch is
+     * for them.
+     */
+    protected void onTimeout(boolean useDispatch) {
         if (handler != null) {
-            clearHandler(new SocketTimeoutException());
+            clearHandler(new SocketTimeoutException(), useDispatch);
         }
         close();
     }
 
 
-    private void clearHandler(Throwable t) {
+    /**
+     *
+     * @param t             The throwable associated with any error that
+     *                      occurred
+     * @param useDispatch   Should {@link SendHandler#onResult(SendResult)} be
+     *                      called from a new thread, keeping in mind the
+     *                      requirements of
+     *                      {@link javax.websocket.RemoteEndpoint.Async}
+     */
+    private void clearHandler(Throwable t, boolean useDispatch) {
         // Setting the result marks this (partial) message as
         // complete which means the next one may be sent which
         // could update the value of the handler. Therefore, keep a
@@ -155,11 +185,64 @@
         SendHandler sh = handler;
         handler = null;
         if (sh != null) {
+            if (useDispatch) {
+                OnResultRunnable r = onResultRunnables.poll();
+                if (r == null) {
+                    r = new OnResultRunnable(onResultRunnables);
+                }
+                r.init(sh, t);
+                if (executorService == null || executorService.isShutdown()) {
+                    // Can't use the executor so call the runnable directly.
+                    // This may not be strictly specification compliant in all
+                    // cases but during shutdown only close messages are going
+                    // to be sent so there should not be the issue of nested
+                    // calls leading to stack overflow as described in bug
+                    // 55715. The issues with nested calls was the reason for
+                    // the separate thread requirement in the specification.
+                    r.run();
+                } else {
+                    executorService.execute(r);
+                }
+            } else {
+                if (t == null) {
+                    sh.onResult(new SendResult());
+                } else {
+                    sh.onResult(new SendResult(t));
+                }
+            }
+        }
+    }
+
+
+    private static class OnResultRunnable implements Runnable {
+
+        private final Queue<OnResultRunnable> queue;
+
+        private volatile SendHandler sh;
+        private volatile Throwable t;
+
+        private OnResultRunnable(Queue<OnResultRunnable> queue) {
+            this.queue = queue;
+        }
+
+        private void init(SendHandler sh, Throwable t) {
+            this.sh = sh;
+            this.t = t;
+        }
+
+        @Override
+        public void run() {
             if (t == null) {
                 sh.onResult(new SendResult());
             } else {
                 sh.onResult(new SendResult(t));
             }
+            t = null;
+            sh = null;
+            // Return the Runnable to the queue when it has been finished with
+            // Note if this method takes an age to finish there shouldn't be any
+            // thread safety issues as the fields are cleared above.
+            queue.add(this);
         }
     }
 }

Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java	2013-10-29 18:33:11 UTC (rev 2298)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java	2013-10-30 16:09:23 UTC (rev 2299)
@@ -28,6 +28,12 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.servlet.DispatcherType;
 import javax.servlet.FilterRegistration;
@@ -64,9 +70,7 @@
         implements ServerContainer {
 
     private static final CloseReason AUTHENTICATED_HTTP_SESSION_CLOSED =
-            new CloseReason(CloseCodes.VIOLATED_POLICY,
-                    "This connection was established under an authenticated " +
-                    "HTTP session that has ended.");
+            new CloseReason(CloseCodes.VIOLATED_POLICY, MESSAGES.expiredHttpSession());
 
     private final WsWriteTimeout wsWriteTimeout = new WsWriteTimeout();
 
@@ -80,6 +84,7 @@
     private volatile boolean addAllowed = true;
     private final ConcurrentHashMap<String,Set<WsSession>> authenticatedSessions =
             new ConcurrentHashMap<String, Set<WsSession>>();
+    private final ExecutorService executorService;
 
     WsServerContainer(ServletContext servletContext) {
 
@@ -103,6 +108,25 @@
         if (value != null) {
             setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value));
         }
+        // Executor config
+        int executorCoreSize = 0;
+        int executorMaxSize = 10;
+        long executorKeepAliveTimeSeconds = 60;
+        value = servletContext.getInitParameter(
+                Constants.EXECUTOR_CORE_SIZE_INIT_PARAM);
+        if (value != null) {
+            executorCoreSize = Integer.parseInt(value);
+        }
+        value = servletContext.getInitParameter(
+                Constants.EXECUTOR_MAX_SIZE_INIT_PARAM);
+        if (value != null) {
+            executorMaxSize = Integer.parseInt(value);
+        }
+        value = servletContext.getInitParameter(
+                Constants.EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM);
+        if (value != null) {
+            executorKeepAliveTimeSeconds = Long.parseLong(value);
+        }
 
         FilterRegistration.Dynamic fr = servletContext.addFilter(
                 WsFilter.class.getName(), new WsFilter());
@@ -112,6 +136,22 @@
                 DispatcherType.FORWARD);
 
         fr.addMappingForUrlPatterns(types, true, "/*");
+
+        // Use a per web application executor for any threads the the WebSocket
+        // server code needs to create. Group all of the threads under a single
+        // ThreadGroup.
+        StringBuffer threadGroupName = new StringBuffer("WebSocketServer-");
+        if ("".equals(servletContext.getContextPath())) {
+            threadGroupName.append("ROOT");
+        } else {
+            threadGroupName.append(servletContext.getContextPath());
+        }
+        ThreadGroup threadGroup = new ThreadGroup(threadGroupName.toString());
+        WsThreadFactory wsThreadFactory = new WsThreadFactory(threadGroup);
+
+        executorService = new ThreadPoolExecutor(executorCoreSize,
+                executorMaxSize, executorKeepAliveTimeSeconds, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), wsThreadFactory);
     }
 
 
@@ -372,6 +412,21 @@
         }
     }
 
+
+    ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+
+    void shutdownExecutor() {
+        executorService.shutdown();
+        try {
+            executorService.awaitTermination(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // Ignore the interruption and carry on
+        }
+    }
+
     private static void validateEncoders(Class<? extends Encoder>[] encoders)
             throws DeploymentException {
 
@@ -436,4 +491,21 @@
                     tpm2.getUriTemplate().getNormalizedPath());
         }
     }
+
+    private static class WsThreadFactory implements ThreadFactory {
+
+        private final ThreadGroup tg;
+        private final AtomicLong count = new AtomicLong(0);
+
+        private WsThreadFactory(ThreadGroup tg) {
+            this.tg = tg;
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(tg, r);
+            t.setName(tg.getName() + "-" + count.incrementAndGet());
+            return t;
+        }
+    }
 }

Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java	2013-10-29 18:33:11 UTC (rev 2298)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsWriteTimeout.java	2013-10-30 16:09:23 UTC (rev 2299)
@@ -52,7 +52,9 @@
             while (iter.hasNext()) {
                 WsRemoteEndpointImplServer endpoint = iter.next();
                 if (endpoint.getTimeoutExpiry() < now) {
-                    endpoint.onTimeout();
+                    // Background thread, not the thread that triggered the
+                    // write so no need to use a dispatch
+                    endpoint.onTimeout(false);
                 } else {
                     // Endpoints are ordered by timeout expiry so if this point
                     // is reached there is no need to check the remaining

Modified: branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
===================================================================
--- branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java	2013-10-29 18:33:11 UTC (rev 2298)
+++ branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java	2013-10-30 16:09:23 UTC (rev 2299)
@@ -296,4 +296,7 @@
     @Message(id = 8586, value = "Upgrade failed")
     String upgradeFailed();
 
+    @Message(id = 8587, value = "This connection was established under an authenticated HTTP session that has ended")
+    String expiredHttpSession();
+
 }



More information about the jbossweb-commits mailing list