Author: remy.maucherat(a)jboss.com
Date: 2014-06-06 11:52:54 -0400 (Fri, 06 Jun 2014)
New Revision: 2452
Modified:
branches/7.4.x/src/main/java/org/apache/catalina/connector/OutputBuffer.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
branches/7.4.x/src/main/java/org/apache/tomcat/util/net/NioChannel.java
branches/7.4.x/src/main/java/org/apache/tomcat/util/net/NioEndpoint.java
Log:
Related to BZ1100491: Switch to the Tomcat style for IO writes with the NIO2 connector for
better reliability and results. Avoid some deadlocks caused by excessive locking.
Modified: branches/7.4.x/src/main/java/org/apache/catalina/connector/OutputBuffer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/catalina/connector/OutputBuffer.java 2014-06-06
15:10:52 UTC (rev 2451)
+++
branches/7.4.x/src/main/java/org/apache/catalina/connector/OutputBuffer.java 2014-06-06
15:52:54 UTC (rev 2452)
@@ -700,7 +700,8 @@
throw MESSAGES.cannotSetListenerWithoutUpgradeOrAsync();
}
this.writeListener = writeListener;
- coyoteResponse.action(ActionCode.ACTION_EVENT_WRITE_BEGIN, null);
+ coyoteResponse.action(ActionCode.ACTION_EVENT_WRITE_BEGIN,
+ (response.getRequest().getUpgradeHandler() != null) ? writeListener :
null);
}
}
Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java 2014-06-06
15:10:52 UTC (rev 2451)
+++
branches/7.4.x/src/main/java/org/apache/coyote/http11/Http11NioProcessor.java 2014-06-06
15:52:54 UTC (rev 2452)
@@ -38,6 +38,7 @@
import org.apache.coyote.http11.filters.SavedRequestInputFilter;
import org.apache.coyote.http11.filters.VoidInputFilter;
import org.apache.coyote.http11.filters.VoidOutputFilter;
+import org.apache.coyote.http11.upgrade.servlet31.WriteListener;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.HexUtils;
import org.apache.tomcat.util.buf.MessageBytes;
@@ -863,6 +864,7 @@
readNotifications = true;
} else if (actionCode == ActionCode.ACTION_EVENT_WRITE_BEGIN) {
outputBuffer.setNonBlocking(true);
+ outputBuffer.setWriteListener((WriteListener) param);
writeEvent(param);
} else if (actionCode == ActionCode.UPGRADE) {
// Switch to raw bytes mode
Modified:
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 2014-06-06
15:10:52 UTC (rev 2451)
+++
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 2014-06-06
15:52:54 UTC (rev 2452)
@@ -29,6 +29,7 @@
import org.apache.coyote.ActionCode;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
+import org.apache.coyote.http11.upgrade.servlet31.WriteListener;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.buf.CharChunk;
import org.apache.tomcat.util.buf.MessageBytes;
@@ -143,6 +144,11 @@
*/
private Semaphore semaphore = new Semaphore(1);
+ /**
+ * Associated write listener for upgrade mode.
+ */
+ private WriteListener listener = null;
+
/**
* Create a new instance of {@code InternalNioOutputBuffer}
*
@@ -194,31 +200,60 @@
this.completionHandler = new CompletionHandler<Integer, NioChannel>() {
@Override
- public synchronized void completed(Integer nBytes, NioChannel attachment) {
+ public void completed(Integer nBytes, NioChannel attachment) {
if (nBytes < 0) {
failed(new IOException(MESSAGES.failedWrite()), attachment);
return;
}
- if (!bbuf.hasRemaining()) {
- bbuf.clear();
- if (leftover.getLength() > 0) {
- int n = Math.min(leftover.getLength(), bbuf.remaining());
- bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
- leftover.setOffset(leftover.getOffset() + n);
+ boolean notify = false;
+ boolean write = false;
+ synchronized (completionHandler) {
+ if (!bbuf.hasRemaining()) {
+ bbuf.clear();
+ if (leftover.getLength() > 0) {
+ int n = Math.min(leftover.getLength(), bbuf.remaining());
+ bbuf.put(leftover.getBuffer(), leftover.getOffset(),
n).flip();
+ leftover.setOffset(leftover.getOffset() + n);
+ write = true;
+ } else {
+ response.setLastWrite(nBytes);
+ leftover.recycle();
+ semaphore.release();
+ if (processor.getWriteNotification()) {
+ notify = true;
+ }
+ }
} else {
- response.setLastWrite(nBytes);
- leftover.recycle();
- semaphore.release();
- if (/*!processor.isProcessing() &&
*/processor.getWriteNotification()) {
- if (!endpoint.processChannel(attachment,
SocketStatus.OPEN_WRITE)) {
+ write = true;
+ }
+ }
+ if (write) {
+ attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS,
attachment, this);
+ }
+ if (notify) {
+ if (listener == null) {
+ if (!endpoint.processChannel(attachment,
SocketStatus.OPEN_WRITE)) {
+ endpoint.closeChannel(attachment);
+ }
+ } else {
+ Thread thread = Thread.currentThread();
+ ClassLoader originalClassLoader =
thread.getContextClassLoader();
+ try {
+
thread.setContextClassLoader(listener.getClass().getClassLoader());
+ synchronized (channel.getWriteLock()) {
+ listener.onWritePossible();
+ }
+ } catch (Exception e) {
+ processor.getResponse().setErrorException(e);
+ endpoint.removeEventChannel(attachment);
+ if (!endpoint.processChannel(attachment, SocketStatus.ERROR))
{
endpoint.closeChannel(attachment);
}
+ } finally {
+ thread.setContextClassLoader(originalClassLoader);
}
- return;
}
}
- // Write the remaining bytes
- attachment.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, attachment,
this);
}
@Override
@@ -386,6 +421,13 @@
}
/**
+ * Set the associated write listener for upgrade mode.
+ */
+ public void setWriteListener(WriteListener listener) {
+ this.listener = listener;
+ }
+
+ /**
* Add an output filter to the filter library.
*
* @param filter
@@ -479,6 +521,11 @@
lastActiveFilter = -1;
committed = false;
finished = false;
+ listener = null;
+ if (semaphore.availablePermits() != 1) {
+ semaphore.drainPermits();
+ semaphore.release();
+ }
writeTimeout = (endpoint.getSoTimeout() > 0 ? endpoint.getSoTimeout()
: Integer.MAX_VALUE);
}
@@ -508,9 +555,6 @@
lastActiveFilter = -1;
committed = false;
finished = false;
- if (nonBlocking) {
- semaphore.release();
- }
nonBlocking = false;
}
@@ -820,24 +864,26 @@
if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE) {
response.setLastWrite(0);
}
- if (semaphore.tryAcquire()) {
- // Calculate the number of bytes that fit in the buffer
- int n = Math.min(leftover.getLength(), bbuf.capacity() -
bbuf.position());
- bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
- leftover.setOffset(leftover.getOffset() + n);
- boolean writeNotification = processor.getWriteNotification();
- processor.setWriteNotification(false);
- try {
- channel.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS,
channel, completionHandler);
- } catch (Exception e) {
- processor.getResponse().setErrorException(e);
- if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
- CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(e);
- }
+ }
+ if (semaphore.tryAcquire()) {
+ // Calculate the number of bytes that fit in the buffer
+ int n = Math.min(leftover.getLength(), bbuf.capacity() -
bbuf.position());
+ bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
+ leftover.setOffset(leftover.getOffset() + n);
+ boolean writeNotification = processor.getWriteNotification();
+ processor.setWriteNotification(false);
+ try {
+ channel.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, channel,
completionHandler);
+ } catch (Exception e) {
+ processor.getResponse().setErrorException(e);
+ if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
+ CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(e);
}
- if (writeNotification && bbuf.hasRemaining()) {
- // Write did not complete inline, possible write
notification
- processor.setWriteNotification(writeNotification);
+ }
+ if (semaphore.availablePermits() == 0) {
+ // Write did not complete inline, possible write notification
+ if (writeNotification) {
+ processor.setWriteNotification(true);
}
}
}
Modified: branches/7.4.x/src/main/java/org/apache/tomcat/util/net/NioChannel.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/util/net/NioChannel.java 2014-06-06
15:10:52 UTC (rev 2451)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/util/net/NioChannel.java 2014-06-06
15:52:54 UTC (rev 2452)
@@ -189,6 +189,15 @@
private long id;
private ByteBuffer buffer;
+ private Object lock = new Object();
+ public Object getLock() {
+ return lock;
+ }
+ private Object writeLock = new Object();
+ public Object getWriteLock() {
+ return writeLock;
+ }
+
/**
* Create a new instance of {@code NioChannel}
*
Modified: branches/7.4.x/src/main/java/org/apache/tomcat/util/net/NioEndpoint.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/tomcat/util/net/NioEndpoint.java 2014-06-06
15:10:52 UTC (rev 2451)
+++ branches/7.4.x/src/main/java/org/apache/tomcat/util/net/NioEndpoint.java 2014-06-06
15:52:54 UTC (rev 2452)
@@ -931,7 +931,7 @@
if (status == null) {
state = handler.process(channel);
} else {
- synchronized (channel) {
+ synchronized (channel.getLock()) {
state = handler.event(channel, status);
}
}