Author: remy.maucherat(a)jboss.com
Date: 2014-09-29 11:13:58 -0400 (Mon, 29 Sep 2014)
New Revision: 2511
Added:
branches/7.5.x/src/main/java/org/apache/tomcat/util/security/ConcurrentMessageDigest.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/MessagePart.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/PerMessageDeflate.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Transformation.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/TransformationFactory.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/TransformationResult.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsExtension.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsExtensionParameter.java
Modified:
branches/7.5.x/src/main/java/javax/websocket/Session.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelWrapperSecure.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Constants.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Util.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsFrameBase.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsFrameClient.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsSession.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerBase.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerPartialBase.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeBase.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/DefaultServerEndpointConfigurator.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/UpgradeUtil.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsFrameServer.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
branches/7.5.x/src/main/java/org/jboss/web/CoyoteMessages.java
branches/7.5.x/src/main/java/org/jboss/web/WebsocketsMessages.java
Log:
Rebase on the Tomcat upstream websockets (1.1 + compression support).
Modified: branches/7.5.x/src/main/java/javax/websocket/Session.java
===================================================================
--- branches/7.5.x/src/main/java/javax/websocket/Session.java 2014-09-17 07:50:46 UTC (rev
2510)
+++ branches/7.5.x/src/main/java/javax/websocket/Session.java 2014-09-29 15:13:58 UTC (rev
2511)
@@ -31,8 +31,22 @@
*/
WebSocketContainer getContainer();
- void addMessageHandler(MessageHandler listener)
- throws IllegalStateException;
+ /**
+ * Registers a {@link MessageHandler} for incoming messages. Only one
+ * {@link MessageHandler} may be registered for each message type (text,
+ * binary, pong). The message type will be derived at runtime from the
+ * provided {@link MessageHandler} instance. It is not always possible to do
+ * this so it is better to use
+ * {@link #addMessageHandler(Class, javax.websocket.MessageHandler.Partial)}
+ * or
+ * {@link #addMessageHandler(Class, javax.websocket.MessageHandler.Whole)}.
+ *
+ * @param listener The message handler for a incoming message
+ *
+ * @throws IllegalStateException If a message handler has already been
+ * registered for the associated message type
+ */
+ void addMessageHandler(MessageHandler listener) throws IllegalStateException;
Set<MessageHandler> getMessageHandlers();
@@ -126,4 +140,34 @@
* this session is associated with.
*/
Set<Session> getOpenSessions();
+
+ /**
+ * Registers a {@link MessageHandler} for partial incoming messages. Only
+ * one {@link MessageHandler} may be registered for each message type (text
+ * or binary, pong messages are never presented as partial messages).
+ *
+ * @param clazz The type of message that the given handler is intended
+ * for
+ * @param listener The message handler for a incoming message
+ *
+ * @throws IllegalStateException If a message handler has already been
+ * registered for the associated message type
+ */
+ <T> void addMessageHandler(Class<T> clazz,
MessageHandler.Partial<T> handler)
+ throws IllegalStateException;
+
+ /**
+ * Registers a {@link MessageHandler} for whole incoming messages. Only
+ * one {@link MessageHandler} may be registered for each message type (text,
+ * binary, pong).
+ *
+ * @param clazz The type of message that the given handler is intended
+ * for
+ * @param listener The message handler for a incoming message
+ *
+ * @throws IllegalStateException If a message handler has already been
+ * registered for the associated message type
+ */
+ <T> void addMessageHandler(Class<T> clazz, MessageHandler.Whole<T>
handler)
+ throws IllegalStateException;
}
Added:
branches/7.5.x/src/main/java/org/apache/tomcat/util/security/ConcurrentMessageDigest.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/util/security/ConcurrentMessageDigest.java
(rev 0)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/util/security/ConcurrentMessageDigest.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.security;
+
+import static org.jboss.web.CoyoteMessages.MESSAGES;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A thread safe wrapper around {@link MessageDigest} that does not make use
+ * of ThreadLocal and - broadly - only creates enough MessageDigest objects
+ * to satisfy the concurrency requirements.
+ */
+public class ConcurrentMessageDigest {
+
+ private static final String MD5 = "MD5";
+ private static final String SHA1 = "SHA-1";
+
+ private static final Map<String,Queue<MessageDigest>> queues =
+ new HashMap<String,Queue<MessageDigest>>();
+
+
+ private ConcurrentMessageDigest() {
+ // Hide default constructor for this utility class
+ }
+
+ static {
+ try {
+ // Init commonly used algorithms
+ init(MD5);
+ init(SHA1);
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public static byte[] digestMD5(byte[]... input) {
+ return digest(MD5, input);
+ }
+
+ public static byte[] digestSHA1(byte[]... input) {
+ return digest(SHA1, input);
+ }
+
+ public static byte[] digest(String algorithm, byte[]... input) {
+
+ Queue<MessageDigest> queue = queues.get(algorithm);
+ if (queue == null) {
+ throw MESSAGES.mustInitFirst();
+ }
+
+ MessageDigest md = queue.poll();
+ if (md == null) {
+ try {
+ md = MessageDigest.getInstance(algorithm);
+ } catch (NoSuchAlgorithmException e) {
+ // Ignore. Impossible if init() has been successfully called
+ // first.
+ throw MESSAGES.mustInitFirst();
+ }
+ }
+
+ for (byte[] bytes : input) {
+ md.update(bytes);
+ }
+ byte[] result = md.digest();
+
+ queue.add(md);
+
+ return result;
+ }
+
+
+ /**
+ * Ensures that {@link #digest(String, byte[][])} will support the specified
+ * algorithm. This method <b>must</b> be called and return successfully
+ * before using {@link #digest(String, byte[][])}.
+ *
+ * @param algorithm The message digest algorithm to be supported
+ *
+ * @throws NoSuchAlgorithmException If the algorithm is not supported by the
+ * JVM
+ */
+ public static void init(String algorithm) throws NoSuchAlgorithmException {
+ synchronized (queues) {
+ if (!queues.containsKey(algorithm)) {
+ MessageDigest md = MessageDigest.getInstance(algorithm);
+ Queue<MessageDigest> queue = new
ConcurrentLinkedQueue<MessageDigest>();
+ queue.add(md);
+ queues.put(algorithm, queue);
+ }
+ }
+ }
+}
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelWrapperSecure.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelWrapperSecure.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/AsyncChannelWrapperSecure.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -552,7 +552,8 @@
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("WebSocketClient-SecureIO-" + count.incrementAndGet());
- t.setContextClassLoader(this.getClass().getClassLoader());
+ // No need to set the context class loader. The threads will be
+ // cleaned up when the connection is closed.
t.setDaemon(true);
return t;
}
Modified: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Constants.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Constants.java 2014-09-17
07:50:46 UTC (rev 2510)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Constants.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -16,8 +16,13 @@
*/
package org.apache.tomcat.websocket;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Locale;
+import javax.websocket.Extension;
+
/**
* Internal implementation constants.
*/
@@ -64,6 +69,14 @@
Boolean.getBoolean(
"org.apache.tomcat.websocket.RELAXED_CLOSE_EVENT");
+ public static final List<Extension> INSTALLED_EXTENSIONS;
+
+ static {
+ List<Extension> installed = new ArrayList<Extension>(1);
+ installed.add(new WsExtension("permessage-deflate"));
+ INSTALLED_EXTENSIONS = Collections.unmodifiableList(installed);
+ }
+
private Constants() {
// Hide default constructor
}
Added: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/MessagePart.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/MessagePart.java
(rev 0)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/MessagePart.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import java.nio.ByteBuffer;
+
+import javax.websocket.SendHandler;
+
+class MessagePart {
+ private final boolean fin;
+ private final int rsv;
+ private final byte opCode;
+ private final ByteBuffer payload;
+ private final SendHandler intermediateHandler;
+ private volatile SendHandler endHandler;
+
+ public MessagePart( boolean fin, int rsv, byte opCode, ByteBuffer payload,
+ SendHandler intermediateHandler, SendHandler endHandler) {
+ this.fin = fin;
+ this.rsv = rsv;
+ this.opCode = opCode;
+ this.payload = payload;
+ this.intermediateHandler = intermediateHandler;
+ this.endHandler = endHandler;
+ }
+
+
+ public boolean isFin() {
+ return fin;
+ }
+
+
+ public int getRsv() {
+ return rsv;
+ }
+
+
+ public byte getOpCode() {
+ return opCode;
+ }
+
+
+ public ByteBuffer getPayload() {
+ return payload;
+ }
+
+
+ public SendHandler getIntermediateHandler() {
+ return intermediateHandler;
+ }
+
+
+ public SendHandler getEndHandler() {
+ return endHandler;
+ }
+
+ public void setEndHandler(SendHandler endHandler) {
+ this.endHandler = endHandler;
+ }
+}
+
+
Added: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/PerMessageDeflate.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/PerMessageDeflate.java
(rev 0)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/PerMessageDeflate.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import static org.jboss.web.WebsocketsMessages.MESSAGES;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+import javax.websocket.Extension;
+import javax.websocket.Extension.Parameter;
+import javax.websocket.SendHandler;
+
+public class PerMessageDeflate implements Transformation {
+
+ private static final String SERVER_NO_CONTEXT_TAKEOVER =
"server_no_context_takeover";
+ private static final String CLIENT_NO_CONTEXT_TAKEOVER =
"client_no_context_takeover";
+ private static final String SERVER_MAX_WINDOW_BITS =
"server_max_window_bits";
+ private static final String CLIENT_MAX_WINDOW_BITS =
"client_max_window_bits";
+
+ private static final int RSV_BITMASK = 0x4;
+ private static final byte[] EOM_BYTES = new byte[] {0, 0, -1, -1};
+
+ public static final String NAME = "permessage-deflate";
+
+ private final boolean serverContextTakeover;
+ private final int serverMaxWindowBits;
+ private final boolean clientContextTakeover;
+ private final int clientMaxWindowBits;
+ private final Inflater inflater = new Inflater(true);
+ private final ByteBuffer readBuffer =
ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+ private final Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+ private final byte[] EOM_BUFFER = new byte[EOM_BYTES.length + 1];
+
+ private volatile Transformation next;
+ private volatile boolean skipDecompression = false;
+ private volatile ByteBuffer writeBuffer =
ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+ private volatile boolean firstCompressedFrameWritten = false;
+
+ static PerMessageDeflate negotiate(List<List<Parameter>> preferences) {
+ // Accept the first preference that the server is able to support
+ for (List<Parameter> preference : preferences) {
+ boolean ok = true;
+ boolean serverContextTakeover = true;
+ int serverMaxWindowBits = -1;
+ boolean clientContextTakeover = true;
+ int clientMaxWindowBits = -1;
+
+ for (Parameter param : preference) {
+ if (SERVER_NO_CONTEXT_TAKEOVER.equals(param.getName())) {
+ if (serverContextTakeover) {
+ serverContextTakeover = false;
+ } else {
+ // Duplicate definition
+ throw
MESSAGES.duplicateDeflateParameter(SERVER_NO_CONTEXT_TAKEOVER);
+ }
+ } else if (CLIENT_NO_CONTEXT_TAKEOVER.equals(param.getName())) {
+ if (clientContextTakeover) {
+ clientContextTakeover = false;
+ } else {
+ // Duplicate definition
+ throw
MESSAGES.duplicateDeflateParameter(CLIENT_NO_CONTEXT_TAKEOVER );
+ }
+ } else if (SERVER_MAX_WINDOW_BITS.equals(param.getName())) {
+ if (serverMaxWindowBits == -1) {
+ serverMaxWindowBits = Integer.parseInt(param.getValue());
+ if (serverMaxWindowBits < 8 || serverMaxWindowBits > 15) {
+ throw
MESSAGES.invalidDeflateWindowSize(SERVER_MAX_WINDOW_BITS,
+ Integer.valueOf(serverMaxWindowBits));
+ }
+ // Java SE API (as of Java 8) does not expose the API to
+ // control the Window size. It is effectively hard-coded
+ // to 15
+ if (serverMaxWindowBits != 15) {
+ ok = false;
+ break;
+ }
+ } else {
+ // Duplicate definition
+ throw MESSAGES.duplicateDeflateParameter(SERVER_MAX_WINDOW_BITS
);
+ }
+ } else if (CLIENT_MAX_WINDOW_BITS.equals(param.getName())) {
+ if (clientMaxWindowBits == -1) {
+ if (param.getValue() == null) {
+ // Hint to server that the client supports this
+ // option. Java SE API (as of Java 8) does not
+ // expose the API to control the Window size. It is
+ // effectively hard-coded to 15
+ clientMaxWindowBits = 15;
+ } else {
+ clientMaxWindowBits = Integer.parseInt(param.getValue());
+ if (clientMaxWindowBits < 8 || clientMaxWindowBits >
15) {
+ throw
MESSAGES.invalidDeflateWindowSize(CLIENT_MAX_WINDOW_BITS,
+ Integer.valueOf(clientMaxWindowBits));
+ }
+ }
+ // Not a problem is client specified a window size less
+ // than 15 since the server will always use a larger
+ // window it will still work.
+ } else {
+ // Duplicate definition
+ throw
MESSAGES.duplicateDeflateParameter(CLIENT_MAX_WINDOW_BITS);
+ }
+ } else {
+ // Unknown parameter
+ throw MESSAGES.unkownDeflateParameter(param.getName());
+ }
+ }
+ if (ok) {
+ return new PerMessageDeflate(serverContextTakeover, serverMaxWindowBits,
+ clientContextTakeover, clientMaxWindowBits);
+ }
+ }
+ // Failed to negotiate agreeable terms
+ return null;
+ }
+
+
+ private PerMessageDeflate(boolean serverContextTakeover, int serverMaxWindowBits,
+ boolean clientContextTakeover, int clientMaxWindowBits) {
+ this.serverContextTakeover = serverContextTakeover;
+ this.serverMaxWindowBits = serverMaxWindowBits;
+ this.clientContextTakeover = clientContextTakeover;
+ this.clientMaxWindowBits = clientMaxWindowBits;
+ }
+
+
+ @Override
+ public TransformationResult getMoreData(byte opCode, boolean fin, int rsv, ByteBuffer
dest)
+ throws IOException {
+ // Control frames are never compressed and may appear in the middle of
+ // a WebSocket method. Pass them straight through.
+ if (Util.isControl(opCode)) {
+ return next.getMoreData(opCode, fin, rsv, dest);
+ }
+
+ if (!Util.isContinuation(opCode)) {
+ // First frame in new message
+ skipDecompression = (rsv & RSV_BITMASK) == 0;
+ }
+
+ // Pass uncompressed frames straight through.
+ if (skipDecompression) {
+ return next.getMoreData(opCode, fin, rsv, dest);
+ }
+
+ int written;
+ boolean usedEomBytes = false;
+
+ while (dest.remaining() > 0) {
+ // Space available in destination. Try and fill it.
+ try {
+ written = inflater.inflate(
+ dest.array(), dest.arrayOffset() + dest.position(),
dest.remaining());
+ } catch (DataFormatException e) {
+ throw new IOException(MESSAGES.deflateFailure(), e);
+ }
+ dest.position(dest.position() + written);
+
+ if (inflater.needsInput() && !usedEomBytes ) {
+ if (dest.hasRemaining()) {
+ readBuffer.clear();
+ TransformationResult nextResult =
+ next.getMoreData(opCode, fin, (rsv ^ RSV_BITMASK),
readBuffer);
+ inflater.setInput(
+ readBuffer.array(), readBuffer.arrayOffset(),
readBuffer.position());
+ if (TransformationResult.UNDERFLOW.equals(nextResult)) {
+ return nextResult;
+ } else if (TransformationResult.END_OF_FRAME.equals(nextResult)
&&
+ readBuffer.position() == 0) {
+ if (fin) {
+ inflater.setInput(EOM_BYTES);
+ usedEomBytes = true;
+ } else {
+ return TransformationResult.END_OF_FRAME;
+ }
+ }
+ }
+ } else if (written == 0) {
+ if (fin && !serverContextTakeover) {
+ inflater.reset();
+ }
+ return TransformationResult.END_OF_FRAME;
+ }
+ }
+
+ return TransformationResult.OVERFLOW;
+ }
+
+
+ @Override
+ public boolean validateRsv(int rsv, byte opCode) {
+ if (Util.isControl(opCode)) {
+ if ((rsv & RSV_BITMASK) > 0) {
+ return false;
+ } else {
+ if (next == null) {
+ return true;
+ } else {
+ return next.validateRsv(rsv, opCode);
+ }
+ }
+ } else {
+ int rsvNext = rsv;
+ if ((rsv & RSV_BITMASK) > 0) {
+ rsvNext = rsv ^ RSV_BITMASK;
+ }
+ if (next == null) {
+ return true;
+ } else {
+ return next.validateRsv(rsvNext, opCode);
+ }
+ }
+ }
+
+
+ @Override
+ public Extension getExtensionResponse() {
+ Extension result = new WsExtension(NAME);
+
+ List<Extension.Parameter> params = result.getParameters();
+
+ if (!serverContextTakeover) {
+ params.add(new WsExtensionParameter(SERVER_NO_CONTEXT_TAKEOVER, null));
+ }
+ if (serverMaxWindowBits != -1) {
+ params.add(new WsExtensionParameter(SERVER_MAX_WINDOW_BITS,
+ Integer.toString(serverMaxWindowBits)));
+ }
+ if (!clientContextTakeover) {
+ params.add(new WsExtensionParameter(CLIENT_NO_CONTEXT_TAKEOVER, null));
+ }
+ if (clientMaxWindowBits != -1) {
+ params.add(new WsExtensionParameter(CLIENT_MAX_WINDOW_BITS,
+ Integer.toString(clientMaxWindowBits)));
+ }
+
+ return result;
+ }
+
+
+ @Override
+ public void setNext(Transformation t) {
+ if (next == null) {
+ this.next = t;
+ } else {
+ next.setNext(t);
+ }
+ }
+
+
+ @Override
+ public boolean validateRsvBits(int i) {
+ if ((i & RSV_BITMASK) > 0) {
+ return false;
+ }
+ if (next == null) {
+ return true;
+ } else {
+ return next.validateRsvBits(i | RSV_BITMASK);
+ }
+ }
+
+
+ @Override
+ public List<MessagePart> sendMessagePart(List<MessagePart>
uncompressedParts) {
+ List<MessagePart> allCompressedParts = new ArrayList<MessagePart>();
+
+ for (MessagePart uncompressedPart : uncompressedParts) {
+ byte opCode = uncompressedPart.getOpCode();
+ if (Util.isControl(opCode)) {
+ // Control messages can appear in the middle of other messages
+ // and must not be compressed. Pass it straight through
+ allCompressedParts.add(uncompressedPart);
+ } else {
+ List<MessagePart> compressedParts = new
ArrayList<MessagePart>();
+ ByteBuffer uncompressedPayload = uncompressedPart.getPayload();
+ SendHandler uncompressedIntermediateHandler =
+ uncompressedPart.getIntermediateHandler();
+
+ deflater.setInput(uncompressedPayload.array(),
+ uncompressedPayload.arrayOffset() +
uncompressedPayload.position(),
+ uncompressedPayload.remaining());
+
+ int flush = (uncompressedPart.isFin() ? Deflater.SYNC_FLUSH :
Deflater.NO_FLUSH);
+ boolean deflateRequired = true;
+
+ while(deflateRequired) {
+ ByteBuffer compressedPayload = writeBuffer;
+
+ int written = deflater.deflate(compressedPayload.array(),
+ compressedPayload.arrayOffset() +
compressedPayload.position(),
+ compressedPayload.remaining(), flush);
+ compressedPayload.position(compressedPayload.position() + written);
+
+ if (!uncompressedPart.isFin() &&
compressedPayload.hasRemaining() && deflater.needsInput()) {
+ // This message part has been fully processed by the
+ // deflater. Fire the send handler for this message part
+ // and move on to the next message part.
+ break;
+ }
+
+ // If this point is reached, a new compressed message part
+ // will be created...
+ MessagePart compressedPart;
+
+ // .. and a new writeBuffer will be required.
+ writeBuffer = ByteBuffer.allocate(Constants.DEFAULT_BUFFER_SIZE);
+
+ // Flip the compressed payload ready for writing
+ compressedPayload.flip();
+
+ boolean fin = uncompressedPart.isFin();
+ boolean full = compressedPayload.limit() ==
compressedPayload.capacity();
+ boolean needsInput = deflater.needsInput();
+
+ if (fin && !full && needsInput) {
+ // End of compressed message. Drop EOM bytes and output.
+ compressedPayload.limit(compressedPayload.limit() -
EOM_BYTES.length);
+ compressedPart = new MessagePart(true, getRsv(uncompressedPart),
+ opCode, compressedPayload,
uncompressedIntermediateHandler,
+ uncompressedIntermediateHandler);
+ deflateRequired = false;
+ startNewMessage();
+ } else if (full && !needsInput) {
+ // Write buffer full and input message not fully read.
+ // Output and start new compressed part.
+ compressedPart = new MessagePart(false,
getRsv(uncompressedPart),
+ opCode, compressedPayload,
uncompressedIntermediateHandler,
+ uncompressedIntermediateHandler);
+ } else if (!fin && full && needsInput) {
+ // Write buffer full and input message not fully read.
+ // Output and get more data.
+ compressedPart = new MessagePart(false,
getRsv(uncompressedPart),
+ opCode, compressedPayload,
uncompressedIntermediateHandler,
+ uncompressedIntermediateHandler);
+ deflateRequired = false;
+ } else if (fin && full && needsInput) {
+ // Write buffer full. Input fully read. Deflater may be
+ // in one of four states:
+ // - output complete (just happened to align with end of
+ // buffer
+ // - in middle of EOM bytes
+ // - about to write EOM bytes
+ // - more data to write
+ int eomBufferWritten = deflater.deflate(EOM_BUFFER, 0,
EOM_BUFFER.length, Deflater.SYNC_FLUSH);
+ if (eomBufferWritten < EOM_BUFFER.length) {
+ // EOM has just been completed
+ compressedPayload.limit(compressedPayload.limit() -
EOM_BYTES.length + eomBufferWritten);
+ compressedPart = new MessagePart(true,
+ getRsv(uncompressedPart), opCode, compressedPayload,
+ uncompressedIntermediateHandler,
uncompressedIntermediateHandler);
+ deflateRequired = false;
+ startNewMessage();
+ } else {
+ // More data to write
+ // Copy bytes to new write buffer
+ writeBuffer.put(EOM_BUFFER, 0, eomBufferWritten);
+ compressedPart = new MessagePart(false,
+ getRsv(uncompressedPart), opCode, compressedPayload,
+ uncompressedIntermediateHandler,
uncompressedIntermediateHandler);
+ }
+ } else {
+ throw new IllegalStateException("Should never
happen");
+ }
+
+ // Add the newly created compressed part to the set of parts
+ // to pass on to the next transformation.
+ compressedParts.add(compressedPart);
+ }
+
+ SendHandler uncompressedEndHandler = uncompressedPart.getEndHandler();
+ int size = compressedParts.size();
+ if (size > 0) {
+ compressedParts.get(size - 1).setEndHandler(uncompressedEndHandler);
+ }
+
+ allCompressedParts.addAll(compressedParts);
+ }
+ }
+
+ if (next == null) {
+ return allCompressedParts;
+ } else {
+ return next.sendMessagePart(allCompressedParts);
+ }
+ }
+
+
+ private void startNewMessage() {
+ firstCompressedFrameWritten = false;
+ if (!clientContextTakeover) {
+ deflater.reset();
+ }
+ }
+
+ private int getRsv(MessagePart uncompressedMessagePart) {
+ int result = uncompressedMessagePart.getRsv();
+ if (!firstCompressedFrameWritten) {
+ result += RSV_BITMASK;
+ firstCompressedFrameWritten = true;
+ }
+ return result;
+ }
+}
Added: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Transformation.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Transformation.java
(rev 0)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Transformation.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import javax.websocket.Extension;
+
+/**
+ * The internal representation of the transformation that a WebSocket extension
+ * performs on a message.
+ */
+public interface Transformation {
+
+ /**
+ * Sets the next transformation in the pipeline.
+ */
+ void setNext(Transformation t);
+
+ /**
+ * Validate that the RSV bit(s) required by this transformation are not
+ * being used by another extension. The implementation is expected to set
+ * any bits it requires before passing the set of in-use bits to the next
+ * transformation.
+ *
+ * @param i The RSV bits marked as in use so far as an int in the
+ * range zero to seven with RSV1 as the MSB and RSV3 as the
+ * LSB
+ *
+ * @return <code>true</code> if the combination of RSV bits used by the
+ * transformations in the pipeline do not conflict otherwise
+ * <code>false</code>
+ */
+ boolean validateRsvBits(int i);
+
+ /**
+ * Obtain the extension that describes the information to be returned to the
+ * client.
+ */
+ Extension getExtensionResponse();
+
+ /**
+ * Obtain more input data.
+ *
+ * @param opCode The opcode for the frame currently being processed
+ * @param fin Is this the final frame in this WebSocket message?
+ * @param rsv The reserved bits for the frame currently being
+ * processed
+ * @param dest The buffer in which the data is to be written
+ */
+ TransformationResult getMoreData(byte opCode, boolean fin, int rsv, ByteBuffer dest)
throws IOException;
+
+ /**
+ * Validates the RSV and opcode combination (assumed to have been extracted
+ * from a WebSocket Frame) for this extension. The implementation is
+ * expected to unset any RSV bits it has validated before passing the
+ * remaining RSV bits to the next transformation in the pipeline.
+ *
+ * @param rsv The RSV bits received as an int in the range zero to
+ * seven with RSV1 as the MSB and RSV3 as the LSB
+ * @param opCode The opCode received
+ *
+ * @return <code>true</code> if the RSV is valid otherwise
+ * <code>false</code>
+ */
+ boolean validateRsv(int rsv, byte opCode);
+
+ /**
+ * Takes the provided list of messages, transforms them, passes the
+ * transformed list on to the next transformation (if any) and then returns
+ * the resulting list of message parts after all of the transformations have
+ * been applied.
+ *
+ * @param messageParts The list of messages to be transformed
+ *
+ * @return The list of messages after this any any subsequent
+ * transformations have been applied. The size of the returned list
+ * may be bigger or smaller than the size of the input list
+ */
+ List<MessagePart> sendMessagePart(List<MessagePart> messageParts);
+}
Added:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/TransformationFactory.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/TransformationFactory.java
(rev 0)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/TransformationFactory.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import static org.jboss.web.WebsocketsMessages.MESSAGES;
+
+import java.util.List;
+
+import javax.websocket.Extension;
+
+public class TransformationFactory {
+
+ private static final TransformationFactory factory = new TransformationFactory();
+
+ private TransformationFactory() {
+ // Hide default constructor
+ }
+
+ public static TransformationFactory getInstance() {
+ return factory;
+ }
+
+ public Transformation create(String name, List<List<Extension.Parameter>>
preferences) {
+ if (PerMessageDeflate.NAME.equals(name)) {
+ return PerMessageDeflate.negotiate(preferences);
+ }
+ throw MESSAGES.unsupportedExtension(name);
+ }
+}
Added: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/TransformationResult.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/TransformationResult.java
(rev 0)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/TransformationResult.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+public enum TransformationResult {
+ /**
+ * The end of the available data was reached before the WebSocket frame was
+ * completely read.
+ */
+ UNDERFLOW,
+
+ /**
+ * The provided destination buffer was filled before all of the available
+ * data from the WebSocket frame could be processed.
+ */
+ OVERFLOW,
+
+ /**
+ * The end of the WebSocket frame was reached and all the data from that
+ * frame processed into the provided destination buffer.
+ */
+ END_OF_FRAME
+}
Modified: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Util.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Util.java 2014-09-17 07:50:46
UTC (rev 2510)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/Util.java 2014-09-29 15:13:58
UTC (rev 2511)
@@ -45,6 +45,7 @@
import javax.websocket.DeploymentException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
+import javax.websocket.Extension;
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage;
import javax.websocket.Session;
@@ -76,6 +77,11 @@
}
+ static boolean isContinuation(byte opCode) {
+ return opCode == Constants.OPCODE_CONTINUATION;
+ }
+
+
static CloseCode getCloseCode(int code) {
if (code > 2999 && code < 5000) {
return CloseCodes.NORMAL_CLOSURE;
@@ -165,7 +171,7 @@
}
- public static Class<?> getDecoderType(Class<? extends Decoder> decoder)
{
+ private static Class<?> getDecoderType(Class<? extends Decoder> decoder)
{
return Util.getGenericType(Decoder.class, decoder).getClazz();
}
@@ -346,13 +352,10 @@
}
-
- public static Set<MessageHandlerResult> getMessageHandlers(
+ static Set<MessageHandlerResult> getMessageHandlers(Class<?> target,
MessageHandler listener, EndpointConfig endpointConfig,
Session session) {
- Class<?> target = Util.getMessageType(listener);
-
// Will never be more than 2 types
Set<MessageHandlerResult> results = new
HashSet<MessageHandlerResult>(2);
@@ -441,6 +444,84 @@
}
+ public static void parseExtensionHeader(List<Extension> extensions,
+ String header) {
+ // The relevant ABNF for the Sec-WebSocket-Extensions is as follows:
+ // extension-list = 1#extension
+ // extension = extension-token *( ";" extension-param )
+ // extension-token = registered-token
+ // registered-token = token
+ // extension-param = token [ "=" (token | quoted-string) ]
+ // ; When using the quoted-string syntax variant, the value
+ // ; after quoted-string unescaping MUST conform to the
+ // ; 'token' ABNF.
+ //
+ // The limiting of parameter values to tokens or "quoted tokens" makes
+ // the parsing of the header significantly simpler and allows a number
+ // of short-cuts to be taken.
+
+ // Step one, split the header into individual extensions using ',' as a
+ // separator
+ String unparsedExtensions[] = header.split(",");
+ for (String unparsedExtension : unparsedExtensions) {
+ // Step two, split the extension into the registered name and
+ // parameter/value pairs using ';' as a separator
+ String unparsedParameters[] = unparsedExtension.split(";");
+ WsExtension extension = new WsExtension(unparsedParameters[0].trim());
+
+ for (int i = 1; i < unparsedParameters.length; i++) {
+ int equalsPos = unparsedParameters[i].indexOf('=');
+ String name;
+ String value;
+ if (equalsPos == -1) {
+ name = unparsedParameters[i].trim();
+ value = null;
+ } else {
+ name = unparsedParameters[i].substring(0, equalsPos).trim();
+ value = unparsedParameters[i].substring(equalsPos + 1).trim();
+ int len = value.length();
+ if (len > 1) {
+ if (value.charAt(0) == '\"' &&
value.charAt(len - 1) == '\"') {
+ value = value.substring(1, value.length() - 1);
+ }
+ }
+ }
+ // Make sure value doesn't contain any of the delimiters since
+ // that would indicate something went wrong
+ if (containsDelims(name) || containsDelims(value)) {
+ throw MESSAGES.invalidToken(name, value);
+ }
+ if (value != null &&
+ (value.indexOf(',') > -1 || value.indexOf(';')
> -1 ||
+ value.indexOf('\"') > -1 ||
value.indexOf('=') > -1)) {
+ throw MESSAGES.invalidTokenValue(value);
+ }
+ extension.addParameter(new WsExtensionParameter(name, value));
+ }
+ extensions.add(extension);
+ }
+ }
+
+
+ private static boolean containsDelims(String input) {
+ if (input == null || input.length() == 0) {
+ return false;
+ }
+ for (char c : input.toCharArray()) {
+ switch (c) {
+ case ',':
+ case ';':
+ case '\"':
+ case '=':
+ return true;
+ default:
+ // NO_OP
+ }
+
+ }
+ return false;
+ }
+
private static Method getOnMessageMethod(MessageHandler listener) {
try {
return listener.getClass().getMethod("onMessage", Object.class);
@@ -451,6 +532,7 @@
}
}
+
public static class DecoderMatch {
private final List<Class<? extends Decoder>> textDecoders =
Added: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsExtension.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsExtension.java
(rev 0)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsExtension.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.websocket.Extension;
+
+public class WsExtension implements Extension {
+
+ private final String name;
+ private final List<Parameter> parameters = new ArrayList<Parameter>();
+
+ WsExtension(String name) {
+ this.name = name;
+ }
+
+ void addParameter(Parameter parameter) {
+ parameters.add(parameter);
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public List<Parameter> getParameters() {
+ return parameters;
+ }
+}
Added: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsExtensionParameter.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsExtensionParameter.java
(rev 0)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsExtensionParameter.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.websocket;
+
+import javax.websocket.Extension.Parameter;
+
+public class WsExtensionParameter implements Parameter {
+
+ private final String name;
+ private final String value;
+
+ WsExtensionParameter(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+}
Modified: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsFrameBase.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsFrameBase.java 2014-09-17
07:50:46 UTC (rev 2510)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsFrameBase.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -24,9 +24,11 @@
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
+import java.util.List;
import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;
+import javax.websocket.Extension;
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage;
@@ -43,6 +45,7 @@
// Connection level attributes
protected final WsSession wsSession;
protected final byte[] inputBuffer;
+ private final Transformation transformation;
// Attributes for control messages
// Control messages can appear in the middle of other messages so need
@@ -81,21 +84,31 @@
private int readPos = 0;
protected int writePos = 0;
- public WsFrameBase(WsSession wsSession) {
-
+ public WsFrameBase(WsSession wsSession, Transformation transformation) {
inputBuffer = new byte[Constants.DEFAULT_BUFFER_SIZE];
messageBufferBinary =
ByteBuffer.allocate(wsSession.getMaxBinaryMessageBufferSize());
messageBufferText =
CharBuffer.allocate(wsSession.getMaxTextMessageBufferSize());
this.wsSession = wsSession;
+ Transformation finalTransformation;
+ if (isMasked()) {
+ finalTransformation = new UnmaskTransformation();
+ } else {
+ finalTransformation = new NoopTransformation();
+ }
+ if (transformation == null) {
+ this.transformation = finalTransformation;
+ } else {
+ transformation.setNext(finalTransformation);
+ this.transformation = transformation;
+ }
}
protected void processInputBuffer() throws IOException {
while (true) {
wsSession.updateLastActive();
-
if (state == State.NEW_FRAME) {
if (!processInitialHeader()) {
break;
@@ -132,14 +145,13 @@
int b = inputBuffer[readPos++];
fin = (b & 0x80) > 0;
rsv = (b & 0x70) >>> 4;
- if (rsv != 0) {
- // Note extensions may use rsv bits but currently no extensions are
- // supported
+ opCode = (byte) (b & 0x0F);
+ if (!transformation.validateRsv(rsv, opCode)) {
throw new WsIOException(new CloseReason(
CloseCodes.PROTOCOL_ERROR,
MESSAGES.unsupportedReservedBitsSet(Integer.valueOf(rsv))));
}
- opCode = (byte) (b & 0x0F);
+
if (Util.isControl(opCode)) {
if (!fin) {
throw new WsIOException(new CloseReason(
@@ -155,7 +167,7 @@
}
} else {
if (continuationExpected) {
- if (opCode != Constants.OPCODE_CONTINUATION) {
+ if (!Util.isContinuation(opCode)) {
throw new WsIOException(new CloseReason(
CloseCodes.PROTOCOL_ERROR,
MESSAGES.noContinuationFrame()));
@@ -283,9 +295,13 @@
private boolean processDataControl() throws IOException {
- if (!appendPayloadToMessage(controlBufferBinary)) {
+ TransformationResult tr = transformation.getMoreData(opCode, fin, rsv,
controlBufferBinary);
+ if (TransformationResult.UNDERFLOW.equals(tr)) {
return false;
}
+ // Control messages have fixed message size so
+ // TransformationResult.OVERFLOW is not possible here
+
controlBufferBinary.flip();
if (opCode == Constants.OPCODE_CLOSE) {
open = false;
@@ -379,7 +395,8 @@
private boolean processDataText() throws IOException {
// Copy the available data to the buffer
- while (!appendPayloadToMessage(messageBufferBinary)) {
+ TransformationResult tr = transformation.getMoreData(opCode, fin, rsv,
messageBufferBinary);
+ while (!TransformationResult.END_OF_FRAME.equals(tr)) {
// Frame not complete - we ran out of something
// Convert bytes to UTF-8
messageBufferBinary.flip();
@@ -402,21 +419,24 @@
MESSAGES.textMessageTooLarge()));
}
} else if (cr.isUnderflow()) {
- // Need more input
// Compact what we have to create as much space as possible
messageBufferBinary.compact();
+ // Need more input
// What did we run out of?
- if (readPos == writePos) {
- // Ran out of input data - get some more
- return false;
- } else {
+ if (TransformationResult.OVERFLOW.equals(tr)) {
// Ran out of message buffer - exit inner loop and
// refill
break;
+ } else {
+ // TransformationResult.UNDERFLOW
+ // Ran out of input data - get some more
+ return false;
}
}
}
+ // Read more input data
+ tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);
}
messageBufferBinary.flip();
@@ -474,27 +494,30 @@
private boolean processDataBinary() throws IOException {
// Copy the available data to the buffer
- while (!appendPayloadToMessage(messageBufferBinary)) {
+ TransformationResult tr = transformation.getMoreData(opCode, fin, rsv,
messageBufferBinary);
+ while (!TransformationResult.END_OF_FRAME.equals(tr)) {
// Frame not complete - what did we run out of?
- if (readPos == writePos) {
+ if (TransformationResult.UNDERFLOW.equals(tr)) {
// Ran out of input data - get some more
return false;
- } else {
- // Ran out of message buffer - flush it
- if (!usePartial()) {
- CloseReason cr = new CloseReason(CloseCodes.TOO_BIG,
+ }
+
+ // Ran out of message buffer - flush it
+ if (!usePartial()) {
+ CloseReason cr = new CloseReason(CloseCodes.TOO_BIG,
MESSAGES.bufferTooSmall(Integer.valueOf(messageBufferBinary.capacity()),
- Long.valueOf(payloadLength)));
- throw new WsIOException(cr);
- }
- messageBufferBinary.flip();
- ByteBuffer copy =
- ByteBuffer.allocate(messageBufferBinary.limit());
- copy.put(messageBufferBinary);
- copy.flip();
- sendMessageBinary(copy, false);
- messageBufferBinary.clear();
+ Long.valueOf(payloadLength)));
+ throw new WsIOException(cr);
}
+ messageBufferBinary.flip();
+ ByteBuffer copy =
+ ByteBuffer.allocate(messageBufferBinary.limit());
+ copy.put(messageBufferBinary);
+ copy.flip();
+ sendMessageBinary(copy, false);
+ messageBufferBinary.clear();
+ // Read more data
+ tr = transformation.getMoreData(opCode, fin, rsv, messageBufferBinary);
}
// Frame is fully received
@@ -621,34 +644,6 @@
}
- private boolean appendPayloadToMessage(ByteBuffer dest) {
- if (isMasked()) {
- while (payloadWritten < payloadLength && readPos < writePos
&&
- dest.hasRemaining()) {
- byte b = (byte) ((inputBuffer[readPos] ^ mask[maskIndex]) & 0xFF);
- maskIndex++;
- if (maskIndex == 4) {
- maskIndex = 0;
- }
- readPos++;
- payloadWritten++;
- dest.put(b);
- }
- return (payloadWritten == payloadLength);
- } else {
- long toWrite = Math.min(
- payloadLength - payloadWritten, writePos - readPos);
- toWrite = Math.min(toWrite, dest.remaining());
-
- dest.put(inputBuffer, readPos, (int) toWrite);
- readPos += toWrite;
- payloadWritten += toWrite;
- return (payloadWritten == payloadLength);
-
- }
- }
-
-
private boolean swallowInput() {
long toSkip = Math.min(payloadLength - payloadWritten, writePos - readPos);
readPos += toSkip;
@@ -686,7 +681,125 @@
}
+ protected Transformation getTransformation() {
+ return transformation;
+ }
+
+
private static enum State {
NEW_FRAME, PARTIAL_HEADER, DATA
}
+
+
+ private abstract class TerminalTransformation implements Transformation {
+
+ @Override
+ public boolean validateRsvBits(int i) {
+ // Terminal transformations don't use RSV bits and there is no next
+ // transformation so always return true.
+ return true;
+ }
+
+ @Override
+ public Extension getExtensionResponse() {
+ // Return null since terminal transformations are not extensions
+ return null;
+ }
+
+ @Override
+ public void setNext(Transformation t) {
+ // NO-OP since this is the terminal transformation
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Anything other than a value of zero for rsv is invalid.
+ */
+ @Override
+ public boolean validateRsv(int rsv, byte opCode) {
+ return rsv == 0;
+ }
+ }
+
+
+ /**
+ * For use by the client implementation that needs to obtain payload data
+ * without the need for unmasking.
+ */
+ private final class NoopTransformation extends TerminalTransformation {
+
+ @Override
+ public TransformationResult getMoreData(byte opCode, boolean fin, int rsv,
+ ByteBuffer dest) {
+ // opCode is ignored as the transformation is the same for all
+ // opCodes
+ // rsv is ignored as it known to be zero at this point
+ long toWrite = Math.min(
+ payloadLength - payloadWritten, writePos - readPos);
+ toWrite = Math.min(toWrite, dest.remaining());
+
+ dest.put(inputBuffer, readPos, (int) toWrite);
+ readPos += toWrite;
+ payloadWritten += toWrite;
+
+ if (payloadWritten == payloadLength) {
+ return TransformationResult.END_OF_FRAME;
+ } else if (readPos == writePos) {
+ return TransformationResult.UNDERFLOW;
+ } else {
+ // !dest.hasRemaining()
+ return TransformationResult.OVERFLOW;
+ }
+ }
+
+
+ @Override
+ public List<MessagePart> sendMessagePart(List<MessagePart>
messageParts) {
+ // TODO Masking should move to this method
+ // NO-OP send so simply return the message unchanged.
+ return messageParts;
+ }
+ }
+
+
+ /**
+ * For use by the server implementation that needs to obtain payload data
+ * and unmask it before any further processing.
+ */
+ private final class UnmaskTransformation extends TerminalTransformation {
+
+ @Override
+ public TransformationResult getMoreData(byte opCode, boolean fin, int rsv,
+ ByteBuffer dest) {
+ // opCode is ignored as the transformation is the same for all
+ // opCodes
+ // rsv is ignored as it known to be zero at this point
+ while (payloadWritten < payloadLength && readPos < writePos
&&
+ dest.hasRemaining()) {
+ byte b = (byte) ((inputBuffer[readPos] ^ mask[maskIndex]) & 0xFF);
+ maskIndex++;
+ if (maskIndex == 4) {
+ maskIndex = 0;
+ }
+ readPos++;
+ payloadWritten++;
+ dest.put(b);
+ }
+ if (payloadWritten == payloadLength) {
+ return TransformationResult.END_OF_FRAME;
+ } else if (readPos == writePos) {
+ return TransformationResult.UNDERFLOW;
+ } else {
+ // !dest.hasRemaining()
+ return TransformationResult.OVERFLOW;
+ }
+ }
+
+ @Override
+ public List<MessagePart> sendMessagePart(List<MessagePart>
messageParts) {
+ // NO-OP send so simply return the message unchanged.
+ return messageParts;
+ }
+ }
}
Modified: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsFrameClient.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsFrameClient.java 2014-09-17
07:50:46 UTC (rev 2510)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsFrameClient.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -32,11 +32,15 @@
public WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel,
WsSession wsSession) {
- super(wsSession);
+ // TODO Add support for extensions to the client side code
+ super(wsSession, null);
this.response = response;
this.channel = channel;
this.handler = new WsFrameClientCompletionHandler();
+ }
+
+ void startInputProcessing() {
try {
processSocketRead();
} catch (IOException e) {
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -56,6 +56,10 @@
private final StateMachine stateMachine = new StateMachine();
+ private final IntermediateMessageHandler intermediateMessageHandler =
+ new IntermediateMessageHandler(this);
+
+ private Transformation transformation = null;
private boolean messagePartInProgress = false;
private final Queue<MessagePart> messagePartQueue = new
ArrayDeque<MessagePart>();
private final Object messagePartLock = new Object();
@@ -77,6 +81,12 @@
private WsSession wsSession;
private List<EncoderEntry> encoderEntries = new
ArrayList<EncoderEntry>();
+
+ protected void setTransformation(Transformation transformation) {
+ this.transformation = transformation;
+ }
+
+
public long getSendTimeout() {
return sendTimeout;
}
@@ -232,6 +242,7 @@
} else {
f2sh.get(timeout, TimeUnit.MILLISECONDS);
}
+ // FIXME: maybe not needed
if (payload != null) {
payload.clear();
}
@@ -250,8 +261,23 @@
wsSession.updateLastActive();
- MessagePart mp = new MessagePart(opCode, payload, last, handler, this);
+ List<MessagePart> messageParts = new ArrayList<MessagePart>();
+ messageParts.add(new MessagePart(last, 0, opCode, payload,
+ intermediateMessageHandler,
+ new EndMessageHandler(this, handler)));
+ messageParts = transformation.sendMessagePart(messageParts);
+
+ // Some extensions/transformations may buffer messages so it is possible
+ // that no message parts will be returned. If this is the case the
+ // trigger the suppler SendHandler
+ if (messageParts.size() == 0) {
+ handler.onResult(new SendResult());
+ return;
+ }
+
+ MessagePart mp = messageParts.remove(0);
+
boolean doWrite = false;
synchronized (messagePartLock) {
if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
@@ -276,6 +302,8 @@
messagePartInProgress = true;
doWrite = true;
}
+ // Add any remaining messages to the queue
+ messagePartQueue.addAll(messageParts);
}
if (doWrite) {
// Actual write has to be outside sync block to avoid possible
@@ -314,12 +342,15 @@
wsSession.updateLastActive();
- handler.onResult(result);
+ // Some handlers, such as the IntermediateMessageHandler, do not have a
+ // nested handler so handler may be null.
+ if (handler != null) {
+ handler.onResult(result);
+ }
}
void writeMessagePart(MessagePart mp) {
-
if (closed) {
throw MESSAGES.messageSessionClosed();
}
@@ -327,7 +358,7 @@
if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
nextFragmented = fragmented;
nextText = text;
- doWrite(mp.getHandler(), outputBuffer);
+ doWrite(mp.getEndHandler(), outputBuffer);
return;
}
@@ -350,11 +381,11 @@
throw MESSAGES.messageFragmentTypeChange();
}
nextText = text;
- nextFragmented = !mp.isLast();
+ nextFragmented = !mp.isFin();
first = false;
} else {
// Wasn't fragmented. Might be now
- if (mp.isLast()) {
+ if (mp.isFin()) {
nextFragmented = false;
} else {
nextFragmented = true;
@@ -373,21 +404,20 @@
}
headerBuffer.clear();
- writeHeader(headerBuffer, mp.getOpCode(), mp.getPayload(), first,
- mp.isLast(), isMasked(), mask);
+ writeHeader(headerBuffer, mp.isFin(), mp.getRsv(), mp.getOpCode(),
+ isMasked(), mp.getPayload(), mask, first);
headerBuffer.flip();
if (getBatchingAllowed() || isMasked()) {
// Need to write via output buffer
OutputBufferSendHandler obsh = new OutputBufferSendHandler(
- mp.getHandler(), headerBuffer, mp.getPayload(), mask,
+ mp.getEndHandler(), headerBuffer, mp.getPayload(), mask,
outputBuffer, !getBatchingAllowed(), this);
obsh.write();
} else {
// Can write directly
- doWrite(mp.getHandler(), headerBuffer, mp.getPayload());
+ doWrite(mp.getEndHandler(), headerBuffer, mp.getPayload());
}
-
}
@@ -406,61 +436,50 @@
}
- private static class MessagePart {
- private final byte opCode;
- private final ByteBuffer payload;
- private final boolean last;
+ /**
+ * Wraps the user provided handler so that the end point is notified when
+ * the message is complete.
+ */
+ private static class EndMessageHandler implements SendHandler {
+
+ private final WsRemoteEndpointImplBase endpoint;
private final SendHandler handler;
- public MessagePart(byte opCode, ByteBuffer payload, boolean last,
- SendHandler handler, WsRemoteEndpointImplBase endpoint) {
- this.opCode = opCode;
- this.payload = payload;
- this.last = last;
- this.handler = new EndMessageHandler(endpoint, handler);
+ public EndMessageHandler(WsRemoteEndpointImplBase endpoint,
+ SendHandler handler) {
+ this.endpoint = endpoint;
+ this.handler = handler;
}
- public byte getOpCode() {
- return opCode;
+ @Override
+ public void onResult(SendResult result) {
+ endpoint.endMessage(handler, result);
}
-
-
- public ByteBuffer getPayload() {
- return payload;
- }
-
-
- public boolean isLast() {
- return last;
- }
-
-
- public SendHandler getHandler() {
- return handler;
- }
}
/**
- * Wraps the user provided handler so that the end point is notified when
- * the message is complete.
+ * If a transformation needs to split a {@link MessagePart} into multiple
+ * {@link MessagePart}s, it uses this handler as the end handler for each of
+ * the additional {@link MessagePart}s. This handler notifies this this
+ * class that the {@link MessagePart} has been processed and that the next
+ * {@link MessagePart} in the queue should be started. The final
+ * {@link MessagePart} will use the {@link EndMessageHandler} provided with
+ * the original {@link MessagePart}.
*/
- private static class EndMessageHandler implements SendHandler {
+ private static class IntermediateMessageHandler implements SendHandler {
private final WsRemoteEndpointImplBase endpoint;
- private final SendHandler handler;
- public EndMessageHandler(WsRemoteEndpointImplBase endpoint,
- SendHandler handler) {
+ public IntermediateMessageHandler(WsRemoteEndpointImplBase endpoint) {
this.endpoint = endpoint;
- this.handler = handler;
}
@Override
public void onResult(SendResult result) {
- endpoint.endMessage(handler, result);
+ endpoint.endMessage(null, result);
}
}
@@ -594,20 +613,22 @@
protected abstract boolean isMasked();
protected abstract void doClose();
- private static void writeHeader(ByteBuffer headerBuffer, byte opCode,
- ByteBuffer payload, boolean first, boolean last, boolean masked,
- byte[] mask) {
+ private static void writeHeader(ByteBuffer headerBuffer, boolean fin,
+ int rsv, byte opCode, boolean masked, ByteBuffer payload,
+ byte[] mask, boolean first) {
byte b = 0;
- if (last) {
+ if (fin) {
// Set the fin bit
- b = -128;
+ b -= 128;
}
+ b += (rsv << 4);
+
if (first) {
// This is the first fragment of this message
- b = (byte) (b + opCode);
+ b += opCode;
}
// If not the first fragment, it is a continuation with opCode of zero
@@ -669,6 +690,7 @@
}
public void write() {
+ // FIXME: maybe not needed
synchronized (buffer) {
buffer.clear();
CoderResult cr = encoder.encode(message, buffer, true);
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplClient.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -41,6 +41,7 @@
long timeout = getSendTimeout();
if (timeout < 1) {
timeout = Long.MAX_VALUE;
+
}
SendHandlerToCompletionHandler sh2ch =
new SendHandlerToCompletionHandler(handler);
Modified: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsSession.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsSession.java 2014-09-17
07:50:46 UTC (rev 2510)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsSession.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -39,6 +39,8 @@
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
import javax.websocket.MessageHandler;
+import javax.websocket.MessageHandler.Partial;
+import javax.websocket.MessageHandler.Whole;
import javax.websocket.PongMessage;
import javax.websocket.RemoteEndpoint;
import javax.websocket.SendResult;
@@ -72,6 +74,7 @@
private final Principal userPrincipal;
private final EndpointConfig endpointConfig;
+ private final List<Extension> negotiatedExtensions;
private final String subProtocol;
private final Map<String,String> pathParameters;
private final boolean secure;
@@ -103,6 +106,7 @@
*
* @param localEndpoint
* @param wsRemoteEndpoint
+ * @param negotiatedExtensions
* @throws DeploymentException
*/
public WsSession(Endpoint localEndpoint,
@@ -110,9 +114,8 @@
WsWebSocketContainer wsWebSocketContainer,
URI requestUri, Map<String,List<String>> requestParameterMap,
String queryString, Principal userPrincipal, String httpSessionId,
- String subProtocol, Map<String,String> pathParameters,
- boolean secure, EndpointConfig endpointConfig)
- throws DeploymentException {
+ List<Extension> negotiatedExtensions, String subProtocol,
Map<String,String> pathParameters,
+ boolean secure, EndpointConfig endpointConfig) throws DeploymentException {
this.localEndpoint = localEndpoint;
this.wsRemoteEndpoint = wsRemoteEndpoint;
this.wsRemoteEndpoint.setSession(this);
@@ -137,6 +140,7 @@
this.queryString = queryString;
this.userPrincipal = userPrincipal;
this.httpSessionId = httpSessionId;
+ this.negotiatedExtensions = negotiatedExtensions;
if (subProtocol == null) {
this.subProtocol = "";
} else {
@@ -159,10 +163,29 @@
}
- @SuppressWarnings("unchecked")
@Override
public void addMessageHandler(MessageHandler listener) {
+ Class<?> target = Util.getMessageType(listener);
+ doAddMessageHandler(target, listener);
+ }
+
+ @Override
+ public <T> void addMessageHandler(Class<T> clazz, Partial<T>
handler)
+ throws IllegalStateException {
+ doAddMessageHandler(clazz, handler);
+ }
+
+
+ @Override
+ public <T> void addMessageHandler(Class<T> clazz, Whole<T>
handler)
+ throws IllegalStateException {
+ doAddMessageHandler(clazz, handler);
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private void doAddMessageHandler(Class<?> target, MessageHandler listener) {
checkState();
// Message handlers that require decoders may map to text messages,
@@ -176,7 +199,7 @@
// just as easily.
Set<MessageHandlerResult> mhResults =
- Util.getMessageHandlers(listener, endpointConfig, this);
+ Util.getMessageHandlers(target, listener, endpointConfig, this);
for (MessageHandlerResult mhResult : mhResults) {
switch (mhResult.getType()) {
@@ -295,7 +318,7 @@
@Override
public List<Extension> getNegotiatedExtensions() {
checkState();
- return Collections.emptyList();
+ return negotiatedExtensions;
}
@@ -449,6 +472,7 @@
}
}
+
private void fireEndpointOnClose(CloseReason closeReason) {
// Fire the onClose event
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsWebSocketContainer.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsWebSocketContainer.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsWebSocketContainer.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -326,21 +326,24 @@
}
// Switch to WebSocket
- WsRemoteEndpointImplClient wsRemoteEndpointClient =
- new WsRemoteEndpointImplClient(channel);
+ WsRemoteEndpointImplClient wsRemoteEndpointClient = new
WsRemoteEndpointImplClient(channel);
WsSession wsSession = new WsSession(endpoint, wsRemoteEndpointClient,
- this, null, null, null, null, null, subProtocol,
- Collections.<String, String> emptyMap(), secure,
+ this, null, null, null, null, null,
Collections.<Extension>emptyList(),
+ subProtocol, Collections.<String,String>emptyMap(), secure,
clientEndpointConfiguration);
- endpoint.onOpen(wsSession, clientEndpointConfiguration);
- registerSession(endpoint, wsSession);
- // Object creation will trigger input processing
- @SuppressWarnings("unused")
WsFrameClient wsFrameClient = new WsFrameClient(response, channel,
wsSession);
+ // WsFrame adds the necessary final transformations. Copy the
+ // completed transformation chain to the remote end point.
+ wsRemoteEndpointClient.setTransformation(wsFrameClient.getTransformation());
+ endpoint.onOpen(wsSession, clientEndpointConfiguration);
+ registerSession(endpoint, wsSession);
+
+ wsFrameClient.startInputProcessing();
+
return wsSession;
}
@@ -533,6 +536,7 @@
* @throws DeploymentException
* @throws TimeoutException
*/
+ @SuppressWarnings("null") // line is not null in line.endsWith() call
private HandshakeResponse processResponse(ByteBuffer response,
AsyncChannelWrapper channel, long timeout) throws InterruptedException,
ExecutionException, DeploymentException, EOFException,
@@ -644,7 +648,7 @@
if (sslTrustStorePwdValue == null) {
sslTrustStorePwdValue = SSL_TRUSTSTORE_PWD_DEFAULT;
}
-
+
File keyStoreFile = new File(sslTrustStoreValue);
KeyStore ks = KeyStore.getInstance("JKS");
InputStream is = null;
@@ -656,7 +660,7 @@
try {
is.close();
} catch (IOException ioe) {
- // Ignore
+ // Ignore
}
}
}
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerBase.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerBase.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerBase.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -25,6 +25,7 @@
import javax.websocket.RemoteEndpoint;
import javax.websocket.Session;
+import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.websocket.WrappedMessageHandler;
/**
@@ -49,6 +50,8 @@
int indexSession, long maxMessageSize) {
this.pojo = pojo;
this.method = method;
+ // TODO: The method should already be accessible here but the following
+ // code seems to be necessary in some as yet not fully understood cases.
try {
this.method.setAccessible(true);
} catch (Exception e) {
@@ -107,4 +110,15 @@
public final long getMaxMessageSize() {
return maxMessageSize;
}
+
+
+ protected final void handlePojoMethodException(Throwable t) {
+ t = ExceptionUtils.unwrapInvocationTargetException(t);
+ ExceptionUtils.handleThrowable(t);
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else {
+ throw new RuntimeException(t);
+ }
+ }
}
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerPartialBase.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerPartialBase.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerPartialBase.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -67,7 +67,7 @@
} else {
parameters[indexPayload] = message;
}
- Object result;
+ Object result = null;
ThreadBindingListener tbl = ((WsSession) session).getThreadBindingListener();
ClassLoader old = Thread.currentThread().getContextClassLoader();
try {
@@ -75,9 +75,9 @@
tbl.bind();
result = method.invoke(pojo, parameters);
} catch (IllegalAccessException e) {
- throw new IllegalArgumentException(e);
+ handlePojoMethodException(e);
} catch (InvocationTargetException e) {
- throw new IllegalArgumentException(e);
+ handlePojoMethodException(e);
} finally {
try {
tbl.unbind();
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeBase.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeBase.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/pojo/PojoMessageHandlerWholeBase.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -76,7 +76,7 @@
}
parameters[indexPayload] = payload;
- Object result;
+ Object result = null;
ThreadBindingListener tbl = ((WsSession) session).getThreadBindingListener();
ClassLoader old = Thread.currentThread().getContextClassLoader();
try {
@@ -84,9 +84,9 @@
tbl.bind();
result = method.invoke(pojo, parameters);
} catch (IllegalAccessException e) {
- throw new IllegalArgumentException(e);
+ handlePojoMethodException(e);
} catch (InvocationTargetException e) {
- throw new IllegalArgumentException(e);
+ handlePojoMethodException(e);
} finally {
try {
tbl.unbind();
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/DefaultServerEndpointConfigurator.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/DefaultServerEndpointConfigurator.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/DefaultServerEndpointConfigurator.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -17,7 +17,9 @@
package org.apache.tomcat.websocket.server;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import javax.websocket.Extension;
import javax.websocket.HandshakeResponse;
@@ -56,10 +58,13 @@
@Override
public List<Extension> getNegotiatedExtensions(List<Extension>
installed,
List<Extension> requested) {
-
+ Set<String> installedNames = new HashSet<String>();
+ for (Extension e : installed) {
+ installedNames.add(e.getName());
+ }
List<Extension> result = new ArrayList<Extension>();
for (Extension request : requested) {
- if (installed.contains(request)) {
+ if (installedNames.contains(request.getName())) {
result.add(request);
}
}
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/UpgradeUtil.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/UpgradeUtil.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/UpgradeUtil.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -20,17 +20,13 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
-import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
@@ -45,7 +41,11 @@
import org.apache.catalina.connector.RequestFacade;
import org.apache.tomcat.util.codec.binary.Base64;
+import org.apache.tomcat.util.security.ConcurrentMessageDigest;
import org.apache.tomcat.websocket.Constants;
+import org.apache.tomcat.websocket.Transformation;
+import org.apache.tomcat.websocket.TransformationFactory;
+import org.apache.tomcat.websocket.Util;
import org.apache.tomcat.websocket.WsHandshakeResponse;
import org.apache.tomcat.websocket.pojo.PojoEndpointServer;
@@ -54,8 +54,6 @@
private static final byte[] WS_ACCEPT =
"258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(
StandardCharsets.ISO_8859_1);
- private static final Queue<MessageDigest> sha1Helpers =
- new ConcurrentLinkedQueue<MessageDigest>();
private UpgradeUtil() {
// Utility class. Hide default constructor.
@@ -90,7 +88,6 @@
// validation fails
String key;
String subProtocol = null;
- List<Extension> extensions = Collections.emptyList();
if (!headerContainsToken(req, Constants.CONNECTION_HEADER_NAME,
Constants.CONNECTION_HEADER_VALUE)) {
resp.sendError(HttpServletResponse.SC_BAD_REQUEST);
@@ -123,8 +120,59 @@
sec.getSubprotocols(), subProtocols);
// Extensions
- // Currently no extensions are supported by this implementation
+ // Should normally only be one header but handle the case of multiple
+ // headers
+ List<Extension> extensionsRequested = new ArrayList<Extension>();
+ Enumeration<String> extHeaders =
req.getHeaders("Sec-WebSocket-Extensions");
+ while (extHeaders.hasMoreElements()) {
+ Util.parseExtensionHeader(extensionsRequested, extHeaders.nextElement());
+ }
+ // Negotiation phase 1. By default this simply filters out the
+ // extensions that the server does not support but applications could
+ // use a custom configurator to do more than this.
+ List<Extension> negotiatedExtensionsPhase1 =
sec.getConfigurator().getNegotiatedExtensions(
+ Constants.INSTALLED_EXTENSIONS, extensionsRequested);
+ // Negotiation phase 2. Create the Transformations that will be applied
+ // to this connection. Note than an extension may be dropped at this
+ // point if the client has requested a configuration that the server is
+ // unable to support.
+ List<Transformation> transformations =
createTransformations(negotiatedExtensionsPhase1);
+
+ List<Extension> negotiatedExtensionsPhase2;
+ if (transformations.isEmpty()) {
+ negotiatedExtensionsPhase2 = Collections.emptyList();
+ } else {
+ negotiatedExtensionsPhase2 = new
ArrayList<Extension>(transformations.size());
+ for (Transformation t : transformations) {
+ negotiatedExtensionsPhase2.add(t.getExtensionResponse());
+ }
+ }
+
+ // Build the transformation pipeline
+ Transformation transformation = null;
+ StringBuilder responseHeaderExtensions = new StringBuilder();
+ boolean first = true;
+ for (Transformation t : transformations) {
+ if (first) {
+ first = false;
+ } else {
+ responseHeaderExtensions.append(',');
+ }
+ append(responseHeaderExtensions, t.getExtensionResponse());
+ if (transformation == null) {
+ transformation = t;
+ } else {
+ transformation.setNext(t);
+ }
+ }
+
+ // Now we have the full pipeline, validate the use of the RSV bits.
+ if (transformation != null && !transformation.validateRsvBits(0)) {
+ // TODO i18n
+ throw new ServletException("Incompatible RSV bit usage");
+ }
+
// If we got this far, all is good. Accept the connection.
resp.setHeader(Constants.UPGRADE_HEADER_NAME,
Constants.UPGRADE_HEADER_VALUE);
@@ -136,16 +184,8 @@
// RFC6455 4.2.2 explicitly states "" is not valid here
resp.setHeader("Sec-WebSocket-Protocol", subProtocol);
}
- if (!extensions.isEmpty()) {
- StringBuilder sb = new StringBuilder();
- Iterator<Extension> iter = extensions.iterator();
- // There must be at least one
- sb.append(iter.next());
- while (iter.hasNext()) {
- sb.append(',');
- sb.append(iter.next().getName());
- }
- resp.setHeader("Sec-WebSocket-Extensions", sb.toString());
+ if (!transformations.isEmpty()) {
+ resp.setHeader("Sec-WebSocket-Extensions",
responseHeaderExtensions.toString());
}
WsHandshakeRequest wsRequest = new WsHandshakeRequest(req);
@@ -187,13 +227,65 @@
WsHttpUpgradeHandler wsHandler =
((RequestFacade) inner).upgrade(WsHttpUpgradeHandler.class);
wsHandler.preInit(ep, perSessionServerEndpointConfig, sc, wsRequest,
- subProtocol, pathParams, req.isSecure());
+ negotiatedExtensionsPhase2, subProtocol, transformation, pathParams,
+ req.isSecure());
} else {
throw new ServletException(MESSAGES.upgradeFailed());
}
}
+ private static List<Transformation> createTransformations(
+ List<Extension> negotiatedExtensions) {
+
+ TransformationFactory factory = TransformationFactory.getInstance();
+
+ LinkedHashMap<String,List<List<Extension.Parameter>>>
extensionPreferences =
+ new
LinkedHashMap<String,List<List<Extension.Parameter>>>();
+
+ // Result will likely be smaller than this
+ List<Transformation> result = new
ArrayList<Transformation>(negotiatedExtensions.size());
+
+ for (Extension extension : negotiatedExtensions) {
+ List<List<Extension.Parameter>> preferences =
+ extensionPreferences.get(extension.getName());
+
+ if (preferences == null) {
+ preferences = new ArrayList<List<Extension.Parameter>>();
+ extensionPreferences.put(extension.getName(), preferences);
+ }
+
+ preferences.add(extension.getParameters());
+ }
+
+ for (Map.Entry<String,List<List<Extension.Parameter>>> entry :
+ extensionPreferences.entrySet()) {
+ Transformation transformation = factory.create(entry.getKey(),
entry.getValue());
+ if (transformation != null) {
+ result.add(transformation);
+ }
+ }
+ return result;
+ }
+
+ private static void append(StringBuilder sb, Extension extension) {
+ if (extension == null || extension.getName() == null ||
extension.getName().length() == 0) {
+ return;
+ }
+
+ sb.append(extension.getName());
+
+ for (Extension.Parameter p : extension.getParameters()) {
+ sb.append(';');
+ sb.append(p.getName());
+ if (p.getValue() != null) {
+ sb.append('=');
+ sb.append(p.getValue());
+ }
+ }
+ }
+
+
/*
* This only works for tokens. Quoted strings need more sophisticated
* parsing.
@@ -233,19 +325,9 @@
}
- private static String getWebSocketAccept(String key) throws ServletException {
- MessageDigest sha1Helper = sha1Helpers.poll();
- if (sha1Helper == null) {
- try {
- sha1Helper = MessageDigest.getInstance("SHA1");
- } catch (NoSuchAlgorithmException e) {
- throw new ServletException(e);
- }
- }
- sha1Helper.reset();
- sha1Helper.update(key.getBytes(StandardCharsets.ISO_8859_1));
- String result = Base64.encodeBase64String(sha1Helper.digest(WS_ACCEPT));
- sha1Helpers.add(sha1Helper);
- return result;
+ private static String getWebSocketAccept(String key) {
+ byte[] digest = ConcurrentMessageDigest.digestSHA1(
+ key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);
+ return Base64.encodeBase64String(digest);
}
}
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsFrameServer.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsFrameServer.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsFrameServer.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -20,6 +20,7 @@
import java.io.IOException;
import org.apache.coyote.http11.upgrade.AbstractServletInputStream;
+import org.apache.tomcat.websocket.Transformation;
import org.apache.tomcat.websocket.WsFrameBase;
import org.apache.tomcat.websocket.WsSession;
@@ -29,8 +30,9 @@
private final Object connectionReadLock = new Object();
- public WsFrameServer(AbstractServletInputStream sis, WsSession wsSession) {
- super(wsSession);
+ public WsFrameServer(AbstractServletInputStream sis, WsSession wsSession,
+ Transformation transformation) {
+ super(wsSession, transformation);
this.sis = sis;
}
@@ -62,4 +64,11 @@
// Data is from the client so it should be masked
return true;
}
+
+
+ @Override
+ protected Transformation getTransformation() {
+ // Overridden to make it visible to other classes in this package
+ return super.getTransformation();
+ }
}
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsHttpUpgradeHandler.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -20,6 +20,7 @@
import java.io.EOFException;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpSession;
@@ -28,6 +29,7 @@
import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
+import javax.websocket.Extension;
import org.apache.coyote.http11.upgrade.AbstractServletInputStream;
import org.apache.coyote.http11.upgrade.AbstractServletOutputStream;
@@ -35,6 +37,7 @@
import org.apache.coyote.http11.upgrade.servlet31.ReadListener;
import org.apache.coyote.http11.upgrade.servlet31.WebConnection;
import org.apache.coyote.http11.upgrade.servlet31.WriteListener;
+import org.apache.tomcat.websocket.Transformation;
import org.apache.tomcat.websocket.WsIOException;
import org.apache.tomcat.websocket.WsSession;
import org.jboss.web.WebsocketsLogger;
@@ -50,7 +53,9 @@
private EndpointConfig endpointConfig;
private WsServerContainer webSocketContainer;
private WsHandshakeRequest handshakeRequest;
+ private List<Extension> negotiatedExtensions;
private String subProtocol;
+ private Transformation transformation;
private Map<String,String> pathParameters;
private boolean secure;
private WebConnection connection;
@@ -65,13 +70,16 @@
public void preInit(Endpoint ep, EndpointConfig endpointConfig,
WsServerContainer wsc, WsHandshakeRequest handshakeRequest,
- String subProtocol, Map<String,String> pathParameters,
+ List<Extension> negotiatedExtensionsPhase2, String subProtocol,
+ Transformation transformation, Map<String,String> pathParameters,
boolean secure) {
this.ep = ep;
this.endpointConfig = endpointConfig;
this.webSocketContainer = wsc;
this.handshakeRequest = handshakeRequest;
+ this.negotiatedExtensions = negotiatedExtensionsPhase2;
this.subProtocol = subProtocol;
+ this.transformation = transformation;
this.pathParameters = pathParameters;
this.secure = secure;
}
@@ -100,6 +108,12 @@
httpSessionId = ((HttpSession) session).getId();
}
+ // Need to call onOpen using the web application's class loader
+ // Create the frame using the application's class loader so it can pick
+ // up application specific config from the ServerContainerImpl
+ Thread t = Thread.currentThread();
+ ClassLoader cl = t.getContextClassLoader();
+ t.setContextClassLoader(applicationClassLoader);
try {
WsRemoteEndpointImplServer wsRemoteEndpointServer =
new WsRemoteEndpointImplServer(sos, webSocketContainer);
@@ -108,17 +122,20 @@
handshakeRequest.getParameterMap(),
handshakeRequest.getQueryString(),
handshakeRequest.getUserPrincipal(), httpSessionId,
- subProtocol, pathParameters, secure, endpointConfig);
- WsFrameServer wsFrame = new WsFrameServer(
- sis,
- wsSession);
- sos.setWriteListener(
- new WsWriteListener(this, wsRemoteEndpointServer));
+ negotiatedExtensions, subProtocol, pathParameters, secure,
+ endpointConfig);
+ WsFrameServer wsFrame = new WsFrameServer(sis, wsSession, transformation);
+ sos.setWriteListener(new WsWriteListener(this, wsRemoteEndpointServer));
+ // WsFrame adds the necessary final transformations. Copy the
+ // completed transformation chain to the remote end point.
+ wsRemoteEndpointServer.setTransformation(wsFrame.getTransformation());
ep.onOpen(wsSession, endpointConfig);
webSocketContainer.registerSession(ep, wsSession);
sis.setReadListener(new WsReadListener(this, wsFrame));
} catch (DeploymentException e) {
throw new IllegalArgumentException(e);
+ } finally {
+ t.setContextClassLoader(cl);
}
}
@@ -136,7 +153,15 @@
private void onError(Throwable throwable) {
- ep.onError(wsSession, throwable);
+ // Need to call onError using the web application's class loader
+ Thread t = Thread.currentThread();
+ ClassLoader cl = t.getContextClassLoader();
+ t.setContextClassLoader(applicationClassLoader);
+ try {
+ ep.onError(wsSession, throwable);
+ } finally {
+ t.setContextClassLoader(cl);
+ }
}
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java
===================================================================
---
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java 2014-09-17
07:50:46 UTC (rev 2510)
+++
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/server/WsRemoteEndpointImplServer.java 2014-09-29
15:13:58 UTC (rev 2511)
@@ -28,6 +28,7 @@
import javax.websocket.SendResult;
import org.apache.coyote.http11.upgrade.AbstractServletOutputStream;
+import org.apache.tomcat.websocket.Transformation;
import org.apache.tomcat.websocket.WsRemoteEndpointImplBase;
import org.jboss.web.WebsocketsLogger;
@@ -87,6 +88,7 @@
while (sos.isReady()) {
complete = true;
for (ByteBuffer buffer : buffers) {
+ // FIXME: might not be needed
synchronized (buffer) {
if (buffer.hasRemaining()) {
complete = false;
@@ -116,6 +118,7 @@
}
if (!complete) {
// Async write is in progress
+
long timeout = getSendTimeout();
if (timeout > 0) {
// Register with timeout thread
@@ -130,7 +133,7 @@
protected void doClose() {
if (handler != null) {
// close() can be triggered by a wide range of scenarios. It is far
- // simpler just to always use a dispatch that it is to try and track
+ // simpler just to always use a dispatch than it is to try and track
// whether or not this method was called by the same thread that
// triggered the write
clearHandler(new EOFException(), true);
@@ -164,6 +167,13 @@
}
+ @Override
+ protected void setTransformation(Transformation transformation) {
+ // Overridden purely so it is visible to other classes in this package
+ super.setTransformation(transformation);
+ }
+
+
/**
*
* @param t The throwable associated with any error that
@@ -181,6 +191,7 @@
// message.
SendHandler sh = handler;
handler = null;
+ buffers = null;
if (sh != null) {
if (useDispatch) {
OnResultRunnable r = onResultRunnables.poll();
Modified: branches/7.5.x/src/main/java/org/jboss/web/CoyoteMessages.java
===================================================================
--- branches/7.5.x/src/main/java/org/jboss/web/CoyoteMessages.java 2014-09-17 07:50:46 UTC
(rev 2510)
+++ branches/7.5.x/src/main/java/org/jboss/web/CoyoteMessages.java 2014-09-29 15:13:58 UTC
(rev 2511)
@@ -289,4 +289,7 @@
@Message(id = 2083, value = "Maximum extension size [%s] exceeded for this
request")
IOException maxExtensionSizeExceeded(int size);
+ @Message(id = 2084, value = "Must call init first")
+ IllegalStateException mustInitFirst();
+
}
Modified: branches/7.5.x/src/main/java/org/jboss/web/WebsocketsMessages.java
===================================================================
--- branches/7.5.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2014-09-17 07:50:46
UTC (rev 2510)
+++ branches/7.5.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2014-09-29 15:13:58
UTC (rev 2511)
@@ -311,4 +311,25 @@
@Message(id = 8591, value = "Unknown scheme %s")
IllegalArgumentException unknownScheme(String scheme);
+ @Message(id = 8592, value = "Duplicate deflate parameter %s")
+ IllegalArgumentException duplicateDeflateParameter(String parameter);
+
+ @Message(id = 8593, value = "Invalid deflate window size %s maximum %s")
+ IllegalArgumentException invalidDeflateWindowSize(String parameter, int maximum);
+
+ @Message(id = 8594, value = "Unknown deflate parameter %s")
+ IllegalArgumentException unkownDeflateParameter(String parameter);
+
+ @Message(id = 8595, value = "Deflate failure")
+ String deflateFailure();
+
+ @Message(id = 8596, value = "Unsupported extension %s")
+ IllegalArgumentException unsupportedExtension(String name);
+
+ @Message(id = 8597, value = "Invalid token %s value %s")
+ IllegalArgumentException invalidToken(String name, String value);
+
+ @Message(id = 8598, value = "Invalid token value %s")
+ IllegalArgumentException invalidTokenValue(String value);
+
}