Author: remy.maucherat(a)jboss.com
Date: 2014-11-20 11:14:55 -0500 (Thu, 20 Nov 2014)
New Revision: 2550
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsSession.java
branches/7.5.x/src/main/java/org/jboss/web/WebsocketsLogger.java
Log:
Port upstream websocket fixes.
Modified:
branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 2014-11-18 23:08:31 UTC (rev 2549)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsRemoteEndpointImplBase.java 2014-11-20 16:14:55 UTC (rev 2550)
@@ -304,12 +304,10 @@
boolean doWrite = false;
synchronized (messagePartLock) {
- if (Constants.OPCODE_CLOSE == mp.getOpCode()) {
- try {
- setBatchingAllowed(false);
- } catch (IOException e) {
- WebsocketsLogger.ROOT_LOGGER.flushOnCloseFailed(e);
- }
+ if (Constants.OPCODE_CLOSE == mp.getOpCode() && getBatchingAllowed()) {
+ // Should not happen. Too late to send batched messages now since
+ // the session has been closed. Complain loudly.
+ WebsocketsLogger.ROOT_LOGGER.cannotFlushOnClose();
}
if (messagePartInProgress) {
// When a control message is sent while another message is being
@@ -382,7 +380,10 @@
if (Constants.INTERNAL_OPCODE_FLUSH == mp.getOpCode()) {
nextFragmented = fragmented;
nextText = text;
- doWrite(mp.getEndHandler(), outputBuffer);
+ outputBuffer.flip();
+ SendHandler flushHandler = new OutputBufferFlushSendHandler(
+ outputBuffer, mp.getEndHandler());
+ doWrite(flushHandler, outputBuffer);
return;
}
@@ -861,6 +862,30 @@
}
}
+
+ /**
+ * Ensures that the output buffer is cleared after it has been flushed.
+ */
+ private static class OutputBufferFlushSendHandler implements SendHandler {
+
+ private final ByteBuffer outputBuffer;
+ private final SendHandler handler;
+
+ public OutputBufferFlushSendHandler(ByteBuffer outputBuffer, SendHandler handler) {
+ this.outputBuffer = outputBuffer;
+ this.handler = handler;
+ }
+
+ @Override
+ public void onResult(SendResult result) {
+ if (result.isOK()) {
+ outputBuffer.clear();
+ }
+ handler.onResult(result);
+ }
+ }
+
+
private class WsOutputStream extends OutputStream {
private final WsRemoteEndpointImplBase endpoint;
Modified: branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsSession.java
===================================================================
--- branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsSession.java 2014-11-18 23:08:31 UTC (rev 2549)
+++ branches/7.5.x/src/main/java/org/apache/tomcat/websocket/WsSession.java 2014-11-20 16:14:55 UTC (rev 2550)
@@ -425,6 +425,13 @@
return;
}
+ try {
+ wsRemoteEndpoint.setBatchingAllowed(false);
+ } catch (IOException e) {
+ WebsocketsLogger.ROOT_LOGGER.flushOnCloseFailed(e);
+ fireEndpointOnError(e);
+ }
+
state = State.CLOSING;
if (!Constants.RELAXED_CLOSE_EVENT) {
@@ -455,6 +462,12 @@
synchronized (stateLock) {
if (state == State.OPEN) {
+ try {
+ wsRemoteEndpoint.setBatchingAllowed(false);
+ } catch (IOException e) {
+ WebsocketsLogger.ROOT_LOGGER.flushOnCloseFailed(e);
+ fireEndpointOnError(e);
+ }
if (!Constants.RELAXED_CLOSE_EVENT) {
sendCloseMessage(closeReason);
fireEndpointOnClose(closeReason);
@@ -488,6 +501,20 @@
}
+ private void fireEndpointOnError(Throwable throwable) {
+
+ // Fire the onError event
+ Thread t = Thread.currentThread();
+ ClassLoader cl = t.getContextClassLoader();
+ t.setContextClassLoader(applicationClassLoader);
+ try {
+ localEndpoint.onError(this, throwable);
+ } finally {
+ t.setContextClassLoader(cl);
+ }
+ }
+
+
private void sendCloseMessage(CloseReason closeReason) {
// 125 is maximum size for the payload of a control message
ByteBuffer msg = ByteBuffer.allocate(125);
Modified: branches/7.5.x/src/main/java/org/jboss/web/WebsocketsLogger.java
===================================================================
--- branches/7.5.x/src/main/java/org/jboss/web/WebsocketsLogger.java 2014-11-18 23:08:31 UTC (rev 2549)
+++ branches/7.5.x/src/main/java/org/jboss/web/WebsocketsLogger.java 2014-11-20 16:14:55 UTC (rev 2550)
@@ -102,4 +102,8 @@
@Message(id = 8814, value = "Thread group %s not destroyed, %s threads left")
void threadGroupNotDestryed(String name, int threadCount);
+ @LogMessage(level = WARN)
+ @Message(id = 8815, value = "Cannot flush batched messages after closing the session")
+ void cannotFlushOnClose();
+
}
Show replies by date