Author: remy.maucherat(a)jboss.com
Date: 2013-11-21 11:09:03 -0500 (Thu, 21 Nov 2013)
New Revision: 2309
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
Log:
Port big patch redoing concurrency control of sending text messages in websockets.
Modified:
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 2013-11-18
15:39:46 UTC (rev 2308)
+++
branches/7.4.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 2013-11-21
16:09:03 UTC (rev 2309)
@@ -54,13 +54,14 @@
public static final String BLOCKING_SEND_TIMEOUT_PROPERTY =
"org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT";
+ private final StateMachine stateMachine = new StateMachine();
+
private boolean messagePartInProgress = false;
private final Queue<MessagePart> messagePartQueue = new
ArrayDeque<MessagePart>();
private final Object messagePartLock = new Object();
- private boolean dataMessageInProgress = false;
// State
- private boolean closed = false;
+ private volatile boolean closed = false;
private boolean fragmented = false;
private boolean nextFragmented = false;
private boolean text = false;
@@ -109,7 +110,9 @@
public void sendBytes(ByteBuffer data) throws IOException {
+ stateMachine.binaryStart();
startMessageBlock(Constants.OPCODE_BINARY, data, true);
+ stateMachine.complete(true);
}
@@ -121,13 +124,17 @@
public void sendBytesByCompletion(ByteBuffer data, SendHandler handler) {
- startMessage(Constants.OPCODE_BINARY, data, true, handler);
+ StateUpdateSendHandler sush = new StateUpdateSendHandler(handler);
+ stateMachine.binaryStart();
+ startMessage(Constants.OPCODE_BINARY, data, true, sush);
}
public void sendPartialBytes(ByteBuffer partialByte, boolean last)
throws IOException {
+ stateMachine.binaryPartialStart();
startMessageBlock(Constants.OPCODE_BINARY, partialByte, last);
+ stateMachine.complete(last);
}
@@ -146,6 +153,7 @@
public void sendString(String text) throws IOException {
+ stateMachine.textStart();
sendPartialString(CharBuffer.wrap(text), true);
}
@@ -158,24 +166,29 @@
public void sendStringByCompletion(String text, SendHandler handler) {
+ stateMachine.textStart();
TextMessageSendHandler tmsh = new TextMessageSendHandler(handler,
CharBuffer.wrap(text), true, encoder, encoderBuffer, this);
tmsh.write();
+ // TextMessageSendHandler will update stateMachine when it completes
}
public void sendPartialString(String fragment, boolean isLast)
throws IOException {
+ stateMachine.textPartialStart();
sendPartialString(CharBuffer.wrap(fragment), isLast);
}
public OutputStream getSendStream() {
+ stateMachine.streamStart();
return new WsOutputStream(this);
}
public Writer getSendWriter() {
+ stateMachine.writeStart();
return new WsWriter(this);
}
@@ -245,13 +258,15 @@
}
}
if (messagePartInProgress) {
- if (!Util.isControl(opCode)) {
- if (dataMessageInProgress) {
- throw MESSAGES.messageInProgress();
- } else {
- dataMessageInProgress = true;
- }
- }
+ // When a control message is sent while another message is being
+ // the control message is queued. Chances are the subsequent
+ // data message part will end up queued while the control
+ // message is sent. The logic in this class (state machine,
+ // EndMessageHanlder, TextMessageSendHandler) ensures that there
+ // will only ever be one data message part in the queue. There
+ // could be multiple control messages in the queue.
+
+ // Add it to the queue
messagePartQueue.add(mp);
} else {
messagePartInProgress = true;
@@ -261,16 +276,12 @@
}
- void endMessage(SendHandler handler, SendResult result,
- boolean dataMessage) {
+ void endMessage(SendHandler handler, SendResult result) {
synchronized (messagePartLock) {
fragmented = nextFragmented;
text = nextText;
- if (dataMessage) {
- dataMessageInProgress = false;
- }
MessagePart mpNext = messagePartQueue.poll();
if (mpNext == null) {
messagePartInProgress = false;
@@ -388,8 +399,7 @@
this.opCode = opCode;
this.payload = payload;
this.last = last;
- this.handler = new EndMessageHandler(
- endpoint, handler, !Util.isControl(opCode));
+ this.handler = new EndMessageHandler(endpoint, handler);
}
@@ -422,19 +432,17 @@
private final WsRemoteEndpointImplBase endpoint;
private final SendHandler handler;
- private final boolean dataMessage;
public EndMessageHandler(WsRemoteEndpointImplBase endpoint,
- SendHandler handler, boolean dataMessage) {
+ SendHandler handler) {
this.endpoint = endpoint;
this.handler = handler;
- this.dataMessage = dataMessage;
}
@Override
public void onResult(SendResult result) {
- endpoint.endMessage(handler, result, dataMessage);
+ endpoint.endMessage(handler, result);
}
}
@@ -622,7 +630,7 @@
}
- private static class TextMessageSendHandler implements SendHandler {
+ private class TextMessageSendHandler implements SendHandler {
private final SendHandler handler;
private final CharBuffer message;
@@ -657,8 +665,14 @@
@Override
public void onResult(SendResult result) {
- if (isDone || !result.isOK()) {
+ if (isDone) {
+ endpoint.stateMachine.complete(isLast);
handler.onResult(result);
+ } else if(!result.isOK()) {
+ handler.onResult(result);
+ } else if (closed){
+ SendResult sr = new SendResult(new
IOException(MESSAGES.messageRemainderSessionClosed()));
+ handler.onResult(sr);
} else {
write();
}
@@ -934,4 +948,106 @@
return encoder;
}
}
+
+
+ private static enum State {
+ OPEN,
+ STREAM_WRITING,
+ WRITER_WRITING,
+ BINARY_PARTIAL_WRITING,
+ BINARY_PARTIAL_READY,
+ BINARY_FULL_WRITING,
+ TEXT_PARTIAL_WRITING,
+ TEXT_PARTIAL_READY,
+ TEXT_FULL_WRITING
+ }
+
+
+ private static class StateMachine {
+ private State state = State.OPEN;
+
+ public synchronized void streamStart() {
+ checkState(State.OPEN);
+ state = State.STREAM_WRITING;
+ }
+
+ public synchronized void writeStart() {
+ checkState(State.OPEN);
+ state = State.WRITER_WRITING;
+ }
+
+ public synchronized void binaryPartialStart() {
+ checkState(State.OPEN, State.BINARY_PARTIAL_READY);
+ state = State.BINARY_PARTIAL_WRITING;
+ }
+
+ public synchronized void binaryStart() {
+ checkState(State.OPEN);
+ state = State.BINARY_FULL_WRITING;
+ }
+
+ public synchronized void textPartialStart() {
+ checkState(State.OPEN, State.TEXT_PARTIAL_READY);
+ state = State.TEXT_PARTIAL_WRITING;
+ }
+
+ public synchronized void textStart() {
+ checkState(State.OPEN);
+ state = State.TEXT_FULL_WRITING;
+ }
+
+ public synchronized void complete(boolean last) {
+ if (last) {
+ checkState(State.TEXT_PARTIAL_WRITING, State.TEXT_FULL_WRITING,
+ State.BINARY_PARTIAL_WRITING, State.BINARY_FULL_WRITING,
+ State.STREAM_WRITING, State.WRITER_WRITING);
+ state = State.OPEN;
+ } else {
+ checkState(State.TEXT_PARTIAL_WRITING, State.BINARY_PARTIAL_WRITING,
+ State.STREAM_WRITING, State.WRITER_WRITING);
+ if (state == State.TEXT_PARTIAL_WRITING) {
+ state = State.TEXT_PARTIAL_READY;
+ } else if (state == State.BINARY_PARTIAL_WRITING){
+ state = State.BINARY_PARTIAL_READY;
+ } else if (state == State.WRITER_WRITING) {
+ // NO-OP. Leave state as is.
+ } else if (state == State.STREAM_WRITING) {
+ // NO-OP. Leave state as is.
+ } else {
+ // Should never happen
+ // The if ... else ... blocks above should cover all states
+ // permitted by the preceding checkState() call
+ throw new IllegalStateException(
+ "BUG: This code should never be called");
+ }
+ }
+ }
+
+ private void checkState(State... required) {
+ for (State state : required) {
+ if (this.state == state) {
+ return;
+ }
+ }
+ throw MESSAGES.wrongState(state.toString());
+ }
+ }
+
+
+ private class StateUpdateSendHandler implements SendHandler {
+
+ private final SendHandler handler;
+
+ public StateUpdateSendHandler(SendHandler handler) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void onResult(SendResult result) {
+ if (result.isOK()) {
+ stateMachine.complete(true);
+ }
+ handler.onResult(result);
+ }
+ }
}
Modified: branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java
===================================================================
--- branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2013-11-18 15:39:46
UTC (rev 2308)
+++ branches/7.4.x/src/main/java/org/jboss/web/WebsocketsMessages.java 2013-11-21 16:09:03
UTC (rev 2309)
@@ -122,8 +122,8 @@
@Message(id = 8528, value = "Too many bytes ([%s]) were provided to be converted
into a long")
String invalidLong(long length);
- @Message(id = 8529, value = "Message will not be sent because the WebSocket
session is currently sending another message")
- IllegalStateException messageInProgress();
+ @Message(id = 8529, value = "The remote endpoint was in state [%s] which is an
invalid state for called method")
+ IllegalStateException wrongState(String state);
@Message(id = 8530, value = "Message will not be sent because the WebSocket
session has been closed")
IllegalStateException messageSessionClosed();
@@ -299,4 +299,7 @@
@Message(id = 8587, value = "This connection was established under an
authenticated HTTP session that has ended")
String expiredHttpSession();
+ @Message(id = 8588, value = "The remainder of the message will not be sent
because the WebSocket session has been closed")
+ String messageRemainderSessionClosed();
+
}