Author: remy.maucherat(a)jboss.com
Date: 2013-10-17 08:26:41 -0400 (Thu, 17 Oct 2013)
New Revision: 2284
Modified:
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Log:
- Some more cleanups.
- Refactor again using a reusable semaphore.
Modified:
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java 2013-10-16
16:57:58 UTC (rev 2283)
+++
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java 2013-10-17
12:26:41 UTC (rev 2284)
@@ -23,10 +23,9 @@
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.CompletionHandler;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.coyote.InputBuffer;
@@ -81,9 +80,9 @@
private CompletionHandler<Integer, NioChannel> completionHandler;
/**
- * Lock used for auto blocking.
+ * Semaphore used for waiting for completion handler.
*/
- private CountDownLatch latch = new CountDownLatch(0);
+ private Semaphore semaphore = new Semaphore(1);
/**
* Create a new instance of {@code InternalNioInputBuffer}
@@ -121,7 +120,7 @@
bbuf.flip();
bbuf.get(buf, pos, nBytes);
lastValid = pos + nBytes;
- latch.countDown();
+ semaphore.release();
if (!processor.isProcessing() && processor.getReadNotifications()) {
if (!endpoint.processChannel(attachment, SocketStatus.OPEN_READ)) {
endpoint.closeChannel(attachment);
@@ -134,6 +133,7 @@
public void failed(Throwable exc, NioChannel attachment) {
processor.getResponse().setErrorException(exc);
endpoint.removeEventChannel(attachment);
+ semaphore.release();
if (!endpoint.processChannel(attachment, SocketStatus.ERROR)) {
endpoint.closeChannel(attachment);
}
@@ -196,8 +196,11 @@
*/
public boolean nextRequest() {
boolean result = super.nextRequest();
- nonBlocking = false;
available = false;
+ if (nonBlocking) {
+ semaphore.release();
+ }
+ nonBlocking = false;
return result;
}
@@ -418,18 +421,23 @@
int nRead = 0;
// Reading from client
if (nonBlocking) {
- synchronized (completionHandler) {
- if (latch.getCount() == 0) {
- // Prepare the internal input buffer for reading
- this.prepare();
- nonBlockingRead(readTimeout, unit);
- nRead = lastValid - pos;
+ if (semaphore.tryAcquire()) {
+ // Prepare the internal input buffer for reading
+ prepare();
+ try {
+ channel.read(bbuf, readTimeout, TimeUnit.MILLISECONDS, channel,
this.completionHandler);
+ } catch (Exception e) {
+ processor.getResponse().setErrorException(e);
+ if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
+ CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingRead(e);
+ }
}
- }
- // If there's nothing and flow control is not used, autoblock
- if (nRead == 0 && !available) {
+ nRead = lastValid - pos;
+ } else if (nRead == 0 && !available) {
+ // If there's nothing and flow control is not used, autoblock
try {
- latch.await(readTimeout, unit);
+ if (semaphore.tryAcquire(readTimeout, unit))
+ semaphore.release();
} catch (InterruptedException e) {
// Ignore
}
@@ -437,7 +445,7 @@
}
} else {
// Prepare the internal input buffer for reading
- this.prepare();
+ prepare();
nRead = blockingRead(readTimeout, unit);
if (nRead > 0) {
bbuf.flip();
@@ -465,7 +473,6 @@
*/
private void prepare() {
bbuf.clear();
-
if (parsingHeader) {
if (lastValid == buf.length) {
throw MESSAGES.requestHeaderTooLarge();
@@ -484,25 +491,6 @@
}
/**
- * Read a sequence of bytes in non-blocking mode from he current channel
- *
- * @param bb
- * the byte buffer which will contain the bytes read from the
- * current channel
- */
- private void nonBlockingRead(long timeout, TimeUnit unit) {
- try {
- latch = new CountDownLatch(1);
- channel.read(bbuf, channel, this.completionHandler);
- } catch (Exception e) {
- processor.getResponse().setErrorException(e);
- if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
- CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingRead(e);
- }
- }
- }
-
- /**
* Read a sequence of bytes in blocking mode from he current channel
*
* @param bb
Modified:
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
===================================================================
---
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 2013-10-16
16:57:58 UTC (rev 2283)
+++
branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 2013-10-17
12:26:41 UTC (rev 2284)
@@ -23,7 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.coyote.ActionCode;
@@ -138,10 +138,10 @@
*/
private CompletionHandler<Integer, NioChannel> completionHandler;
- /**
- * Latch used for auto blocking.
- */
- private CountDownLatch latch = new CountDownLatch(0);
+ /**
+ * Semaphore used for waiting for completion handler.
+ */
+ private Semaphore semaphore = new Semaphore(1);
/**
* Create a new instance of {@code InternalNioOutputBuffer}
@@ -209,7 +209,7 @@
} else {
response.setLastWrite(nBytes);
leftover.recycle();
- latch.countDown();
+ semaphore.release();
if (!processor.isProcessing() &&
processor.getWriteNotification()) {
if (!endpoint.processChannel(attachment,
SocketStatus.OPEN_WRITE)) {
endpoint.closeChannel(attachment);
@@ -226,6 +226,7 @@
public void failed(Throwable exc, NioChannel attachment) {
processor.getResponse().setErrorException(exc);
endpoint.removeEventChannel(attachment);
+ semaphore.release();
if (!endpoint.processChannel(attachment, SocketStatus.ERROR)) {
endpoint.closeChannel(attachment);
}
@@ -288,35 +289,6 @@
return nw;
}
- /**
- * Perform a non-blocking write operation
- *
- * @param buffer
- * the buffer containing the data to write
- * @param timeout
- * a timeout for the operation
- * @param unit
- * The time unit
- */
- private void nonBlockingWrite(final long timeout, final TimeUnit unit) {
- try {
- latch = new CountDownLatch(1);
- // Calculate the number of bytes that fit in the buffer
- int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
- // put bytes in the buffer
- bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
- // Update the offset
- leftover.setOffset(leftover.getOffset() + n);
- // Perform the write operation
- this.channel.write(this.bbuf, timeout, unit, this.channel,
this.completionHandler);
- } catch (Throwable t) {
- processor.getResponse().setErrorException(t);
- if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
- CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(t);
- }
- }
- }
-
/**
* Perform a write operation. The operation may be blocking or non-blocking
* depending on the value of {@code nonBlocking} flag.
@@ -542,6 +514,9 @@
lastActiveFilter = -1;
committed = false;
finished = false;
+ if (nonBlocking) {
+ semaphore.release();
+ }
nonBlocking = false;
}
@@ -839,7 +814,8 @@
if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE &&
response.getFlushLeftovers()
&& Http11AbstractProcessor.containerThread.get() ==
Boolean.TRUE) {
try {
- latch.await(writeTimeout, TimeUnit.MILLISECONDS);
+ if (semaphore.tryAcquire(writeTimeout, TimeUnit.MILLISECONDS))
+ semaphore.release();
} catch (InterruptedException e) {
// Ignore
}
@@ -849,8 +825,19 @@
if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE) {
response.setLastWrite(0);
}
- if (latch.getCount() == 0) {
- nonBlockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
+ if (semaphore.tryAcquire()) {
+ // Calculate the number of bytes that fit in the buffer
+ int n = Math.min(leftover.getLength(), bbuf.capacity() -
bbuf.position());
+ bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
+ leftover.setOffset(leftover.getOffset() + n);
+ try {
+ channel.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS,
channel, completionHandler);
+ } catch (Exception e) {
+ processor.getResponse().setErrorException(e);
+ if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
+ CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(e);
+ }
+ }
}
}
} else {