Author: remy.maucherat(a)jboss.com
Date: 2013-12-17 09:08:02 -0500 (Tue, 17 Dec 2013)
New Revision: 2330
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsFrameBase.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeBinary.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeText.java
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java
branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
Log:
Port websockets patches.
Modified: branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsFrameBase.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsFrameBase.java 2013-12-16
17:06:36 UTC (rev 2329)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsFrameBase.java 2013-12-17
14:08:02 UTC (rev 2330)
@@ -265,9 +265,17 @@
if (Util.isControl(opCode)) {
result = processDataControl();
} else if (textMessage) {
- result = processDataText();
+ if (textMsgHandler == null) {
+ result = swallowInput();
+ } else {
+ result = processDataText();
+ }
} else {
- result = processDataBinary();
+ if (binaryMsgHandler == null) {
+ result = swallowInput();
+ } else {
+ result = processDataBinary();
+ }
}
checkRoomPayload();
return result;
@@ -342,33 +350,31 @@
@SuppressWarnings("unchecked")
private void sendMessageText(boolean last) throws WsIOException {
- if (textMsgHandler != null) {
- if (textMsgHandler instanceof WrappedMessageHandler) {
- long maxMessageSize =
- ((WrappedMessageHandler) textMsgHandler).getMaxMessageSize();
- if (maxMessageSize > -1 &&
- messageBufferText.remaining() > maxMessageSize) {
- throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG,
-
MESSAGES.messageTooLarge(Long.valueOf(messageBufferText.remaining()),
- Long.valueOf(maxMessageSize))));
- }
+ if (textMsgHandler instanceof WrappedMessageHandler) {
+ long maxMessageSize =
+ ((WrappedMessageHandler) textMsgHandler).getMaxMessageSize();
+ if (maxMessageSize > -1 &&
+ messageBufferText.remaining() > maxMessageSize) {
+ throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG,
+
MESSAGES.messageTooLarge(Long.valueOf(messageBufferText.remaining()),
+ Long.valueOf(maxMessageSize))));
}
+ }
- try {
- if (textMsgHandler instanceof MessageHandler.Partial<?>) {
- ((MessageHandler.Partial<String>) textMsgHandler).onMessage(
- messageBufferText.toString(), last);
- } else {
- // Caller ensures last == true if this branch is used
- ((MessageHandler.Whole<String>) textMsgHandler).onMessage(
- messageBufferText.toString());
- }
- } catch (Throwable t) {
- ExceptionUtils.handleThrowable(t);
- wsSession.getLocal().onError(wsSession, t);
- } finally {
- messageBufferText.clear();
+ try {
+ if (textMsgHandler instanceof MessageHandler.Partial<?>) {
+ ((MessageHandler.Partial<String>) textMsgHandler).onMessage(
+ messageBufferText.toString(), last);
+ } else {
+ // Caller ensures last == true if this branch is used
+ ((MessageHandler.Whole<String>) textMsgHandler).onMessage(
+ messageBufferText.toString());
}
+ } catch (Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ wsSession.getLocal().onError(wsSession, t);
+ } finally {
+ messageBufferText.clear();
}
}
@@ -522,28 +528,26 @@
@SuppressWarnings("unchecked")
private void sendMessageBinary(ByteBuffer msg, boolean last)
throws WsIOException {
- if (binaryMsgHandler != null) {
- if (binaryMsgHandler instanceof WrappedMessageHandler) {
- long maxMessageSize =
- ((WrappedMessageHandler) binaryMsgHandler).getMaxMessageSize();
- if (maxMessageSize > -1 && msg.remaining() >
maxMessageSize) {
- throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG,
- MESSAGES.messageTooLarge(
- Long.valueOf(msg.remaining()),
- Long.valueOf(maxMessageSize))));
- }
+ if (binaryMsgHandler instanceof WrappedMessageHandler) {
+ long maxMessageSize =
+ ((WrappedMessageHandler) binaryMsgHandler).getMaxMessageSize();
+ if (maxMessageSize > -1 && msg.remaining() > maxMessageSize) {
+ throw new WsIOException(new CloseReason(CloseCodes.TOO_BIG,
+ MESSAGES.messageTooLarge(
+ Long.valueOf(msg.remaining()),
+ Long.valueOf(maxMessageSize))));
}
- try {
- if (binaryMsgHandler instanceof MessageHandler.Partial<?>) {
- ((MessageHandler.Partial<ByteBuffer>)
binaryMsgHandler).onMessage(msg, last);
- } else {
- // Caller ensures last == true if this branch is used
- ((MessageHandler.Whole<ByteBuffer>)
binaryMsgHandler).onMessage(msg);
- }
- } catch(Throwable t) {
- ExceptionUtils.handleThrowable(t);
- wsSession.getLocal().onError(wsSession, t);
+ }
+ try {
+ if (binaryMsgHandler instanceof MessageHandler.Partial<?>) {
+ ((MessageHandler.Partial<ByteBuffer>)
binaryMsgHandler).onMessage(msg, last);
+ } else {
+ // Caller ensures last == true if this branch is used
+ ((MessageHandler.Whole<ByteBuffer>)
binaryMsgHandler).onMessage(msg);
}
+ } catch(Throwable t) {
+ ExceptionUtils.handleThrowable(t);
+ wsSession.getLocal().onError(wsSession, t);
}
}
@@ -603,16 +607,10 @@
if (Util.isControl(opCode)) {
return false;
} else if (textMessage) {
- if (textMsgHandler != null) {
- return textMsgHandler instanceof MessageHandler.Partial<?>;
- }
- return false;
+ return textMsgHandler instanceof MessageHandler.Partial<?>;
} else {
// Must be binary
- if (binaryMsgHandler != null) {
- return binaryMsgHandler instanceof MessageHandler.Partial<?>;
- }
- return false;
+ return binaryMsgHandler instanceof MessageHandler.Partial<?>;
}
}
@@ -645,6 +643,23 @@
}
+ private boolean swallowInput() {
+ long toSkip = Math.min(payloadLength - payloadWritten, writePos - readPos);
+ readPos += toSkip;
+ payloadWritten += toSkip;
+ if (payloadWritten == payloadLength) {
+ if (continuationExpected) {
+ newFrame();
+ } else {
+ newMessage();
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+
protected static long byteArrayToLong(byte[] b, int start, int len)
throws IOException {
if (len > 8) {
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeBinary.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeBinary.java 2013-12-16
17:06:36 UTC (rev 2329)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeBinary.java 2013-12-17
14:08:02 UTC (rev 2330)
@@ -49,6 +49,15 @@
boolean isForInputStream, long maxMessageSize) {
super(pojo, method, session, params, indexPayload, convert,
indexSession, maxMessageSize);
+
+ // Update binary text size handled by session
+ if (maxMessageSize > -1 && maxMessageSize >
session.getMaxBinaryMessageBufferSize()) {
+ if (maxMessageSize > Integer.MAX_VALUE) {
+ throw MESSAGES.messageTooLarge();
+ }
+ session.setMaxBinaryMessageBufferSize((int) maxMessageSize);
+ }
+
try {
if (decoderClazzes != null) {
for (Class<? extends Decoder> decoderClazz : decoderClazzes) {
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeText.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeText.java 2013-12-16
17:06:36 UTC (rev 2329)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeText.java 2013-12-17
14:08:02 UTC (rev 2330)
@@ -51,6 +51,14 @@
super(pojo, method, session, params, indexPayload, convert,
indexSession, maxMessageSize);
+ // Update max text size handled by session
+ if (maxMessageSize > -1 && maxMessageSize >
session.getMaxTextMessageBufferSize()) {
+ if (maxMessageSize > Integer.MAX_VALUE) {
+ throw MESSAGES.messageTooLarge();
+ }
+ session.setMaxTextMessageBufferSize((int) maxMessageSize);
+ }
+
// Check for primitives
Class<?> type = method.getParameterTypes()[indexPayload];
if (Util.isPrimitive(type)) {
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java 2013-12-16
17:06:36 UTC (rev 2329)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/server/WsServerContainer.java 2013-12-17
14:08:02 UTC (rev 2330)
@@ -84,74 +84,89 @@
private volatile boolean addAllowed = true;
private final ConcurrentHashMap<String,Set<WsSession>>
authenticatedSessions =
new ConcurrentHashMap<String, Set<WsSession>>();
- private final ExecutorService executorService;
+ private ExecutorService executorService;
+ private volatile boolean initialized = false;
WsServerContainer(ServletContext servletContext) {
-
this.servletContext = servletContext;
+ }
- // Configure servlet context wide defaults
- String value = servletContext.getInitParameter(
- Constants.BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM);
- if (value != null) {
- setDefaultMaxBinaryMessageBufferSize(Integer.parseInt(value));
- }
+ private void init() {
- value = servletContext.getInitParameter(
- Constants.TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM);
- if (value != null) {
- setDefaultMaxTextMessageBufferSize(Integer.parseInt(value));
+ // Double checked locking. This is safe since Java > 1.5 and initialized
+ // is volatile
+ if (initialized) {
+ return;
}
+ synchronized (this) {
+ if (initialized) {
+ return;
+ }
+ initialized = true;
- value = servletContext.getInitParameter(
- Constants.ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM);
- if (value != null) {
- setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value));
- }
- // Executor config
- int executorCoreSize = 0;
- int executorMaxSize = 10;
- long executorKeepAliveTimeSeconds = 60;
- value = servletContext.getInitParameter(
- Constants.EXECUTOR_CORE_SIZE_INIT_PARAM);
- if (value != null) {
- executorCoreSize = Integer.parseInt(value);
- }
- value = servletContext.getInitParameter(
- Constants.EXECUTOR_MAX_SIZE_INIT_PARAM);
- if (value != null) {
- executorMaxSize = Integer.parseInt(value);
- }
- value = servletContext.getInitParameter(
- Constants.EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM);
- if (value != null) {
- executorKeepAliveTimeSeconds = Long.parseLong(value);
- }
+ // Configure servlet context wide defaults
+ String value = servletContext.getInitParameter(
+ Constants.BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM);
+ if (value != null) {
+ setDefaultMaxBinaryMessageBufferSize(Integer.parseInt(value));
+ }
- FilterRegistration.Dynamic fr = servletContext.addFilter(
- "Tomcat WebSocket (JSR356) Filter", new WsFilter());
- fr.setAsyncSupported(true);
+ value = servletContext.getInitParameter(
+ Constants.TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM);
+ if (value != null) {
+ setDefaultMaxTextMessageBufferSize(Integer.parseInt(value));
+ }
- EnumSet<DispatcherType> types = EnumSet.of(DispatcherType.REQUEST,
- DispatcherType.FORWARD);
+ value = servletContext.getInitParameter(
+ Constants.ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM);
+ if (value != null) {
+ setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value));
+ }
+ // Executor config
+ int executorCoreSize = 0;
+ int executorMaxSize = 10;
+ long executorKeepAliveTimeSeconds = 60;
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_CORE_SIZE_INIT_PARAM);
+ if (value != null) {
+ executorCoreSize = Integer.parseInt(value);
+ }
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_MAX_SIZE_INIT_PARAM);
+ if (value != null) {
+ executorMaxSize = Integer.parseInt(value);
+ }
+ value = servletContext.getInitParameter(
+ Constants.EXECUTOR_KEEPALIVETIME_SECONDS_INIT_PARAM);
+ if (value != null) {
+ executorKeepAliveTimeSeconds = Long.parseLong(value);
+ }
- fr.addMappingForUrlPatterns(types, true, "/*");
+ FilterRegistration.Dynamic fr = servletContext.addFilter(
+ "Tomcat WebSocket (JSR356) Filter", new WsFilter());
+ fr.setAsyncSupported(true);
- // Use a per web application executor for any threads the the WebSocket
- // server code needs to create. Group all of the threads under a single
- // ThreadGroup.
- StringBuffer threadGroupName = new StringBuffer("WebSocketServer-");
- if ("".equals(servletContext.getContextPath())) {
- threadGroupName.append("ROOT");
- } else {
- threadGroupName.append(servletContext.getContextPath());
+ EnumSet<DispatcherType> types = EnumSet.of(DispatcherType.REQUEST,
+ DispatcherType.FORWARD);
+
+ fr.addMappingForUrlPatterns(types, true, "/*");
+
+ // Use a per web application executor for any threads the the WebSocket
+ // server code needs to create. Group all of the threads under a single
+ // ThreadGroup.
+ StringBuffer threadGroupName = new
StringBuffer("WebSocketServer-");
+ if ("".equals(servletContext.getContextPath())) {
+ threadGroupName.append("ROOT");
+ } else {
+ threadGroupName.append(servletContext.getContextPath());
+ }
+ ThreadGroup threadGroup = new ThreadGroup(threadGroupName.toString());
+ WsThreadFactory wsThreadFactory = new WsThreadFactory(threadGroup);
+
+ executorService = new ThreadPoolExecutor(executorCoreSize,
+ executorMaxSize, executorKeepAliveTimeSeconds, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), wsThreadFactory);
}
- ThreadGroup threadGroup = new ThreadGroup(threadGroupName.toString());
- WsThreadFactory wsThreadFactory = new WsThreadFactory(threadGroup);
-
- executorService = new ThreadPoolExecutor(executorCoreSize,
- executorMaxSize, executorKeepAliveTimeSeconds, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), wsThreadFactory);
}
@@ -167,6 +182,8 @@
public void addEndpoint(ServerEndpointConfig sec)
throws DeploymentException {
+ init();
+
if (enforceNoAddAfterHandshake && !addAllowed) {
throw new DeploymentException(MESSAGES.addNotAllowed());
}
@@ -214,6 +231,8 @@
@Override
public void addEndpoint(Class<?> pojo) throws DeploymentException {
+ init();
+
ServerEndpoint annotation = pojo.getAnnotation(ServerEndpoint.class);
if (annotation == null) {
throw new DeploymentException(MESSAGES.cannotDeployPojo(pojo.getName()));
@@ -422,6 +441,9 @@
void shutdownExecutor() {
+ if (executorService == null) {
+ return;
+ }
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
Modified: branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
===================================================================
--- branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2013-12-16 17:06:36
UTC (rev 2329)
+++ branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2013-12-17 14:08:02
UTC (rev 2330)
@@ -302,4 +302,7 @@
@Message(id = 8588, value = "The remainder of the message will not be sent
because the WebSocket session has been closed")
String messageRemainderSessionClosed();
+ @Message(id = 8589, value = "The maximum supported message size for this
implementation is Integer.MAX_VALUE")
+ IllegalArgumentException messageTooLarge();
+
}