[jbossweb-commits] JBossWeb SVN: r2284 - branches/7.4.x/src/main/java/org/apache/coyote/http11.

jbossweb-commits at lists.jboss.org jbossweb-commits at lists.jboss.org
Thu Oct 17 08:26:41 EDT 2013


Author: remy.maucherat at jboss.com
Date: 2013-10-17 08:26:41 -0400 (Thu, 17 Oct 2013)
New Revision: 2284

Modified:
   branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
   branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
Log:
- Some more cleanups.
- Refactor again using a reusable semaphore.

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java	2013-10-16 16:57:58 UTC (rev 2283)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioInputBuffer.java	2013-10-17 12:26:41 UTC (rev 2284)
@@ -23,10 +23,9 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.CompletionHandler;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.coyote.InputBuffer;
@@ -81,9 +80,9 @@
 	private CompletionHandler<Integer, NioChannel> completionHandler;
 
     /**
-     * Lock used for auto blocking.
+     * Semaphore used to wait for the completion handler.
      */
-    private CountDownLatch latch = new CountDownLatch(0);
+    private Semaphore semaphore = new Semaphore(1);
     
     /**
 	 * Create a new instance of {@code InternalNioInputBuffer}
@@ -121,7 +120,7 @@
 			        bbuf.flip();
 			        bbuf.get(buf, pos, nBytes);
 			        lastValid = pos + nBytes;
-			        latch.countDown();
+			        semaphore.release();
 			        if (!processor.isProcessing() && processor.getReadNotifications()) {
 			            if (!endpoint.processChannel(attachment, SocketStatus.OPEN_READ)) {
 			                endpoint.closeChannel(attachment);
@@ -134,6 +133,7 @@
 			public void failed(Throwable exc, NioChannel attachment) {
 			    processor.getResponse().setErrorException(exc);
 			    endpoint.removeEventChannel(attachment);
+                semaphore.release();
 			    if (!endpoint.processChannel(attachment, SocketStatus.ERROR)) {
 			        endpoint.closeChannel(attachment);
 			    }
@@ -196,8 +196,11 @@
 	 */
 	public boolean nextRequest() {
 		boolean result = super.nextRequest();
-		nonBlocking = false;
 		available = false;
+        if (nonBlocking) {
+            semaphore.release();
+        }
+        nonBlocking = false;
 
 		return result;
 	}
@@ -418,18 +421,23 @@
         int nRead = 0;
         // Reading from client
         if (nonBlocking) {
-            synchronized (completionHandler) {
-                if (latch.getCount() == 0) {
-                    // Prepare the internal input buffer for reading
-                    this.prepare();
-                    nonBlockingRead(readTimeout, unit);
-                    nRead = lastValid - pos;
+            if (semaphore.tryAcquire()) {
+                // Prepare the internal input buffer for reading
+                prepare();
+                try {
+                    channel.read(bbuf, readTimeout, TimeUnit.MILLISECONDS, channel, this.completionHandler);
+                } catch (Exception e) {
+                    processor.getResponse().setErrorException(e);
+                    if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
+                        CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingRead(e);
+                    }
                 }
-            }
-            // If there's nothing and flow control is not used, autoblock 
-            if (nRead == 0 && !available) {
+                nRead = lastValid - pos;
+            } else if (nRead == 0 && !available) {
+                // If there's nothing and flow control is not used, autoblock 
                 try {
-                    latch.await(readTimeout, unit);
+                    if (semaphore.tryAcquire(readTimeout, unit))
+                        semaphore.release();
                 } catch (InterruptedException e) {
                     // Ignore
                 }
@@ -437,7 +445,7 @@
             }
         } else {
             // Prepare the internal input buffer for reading
-            this.prepare();
+            prepare();
             nRead = blockingRead(readTimeout, unit);
             if (nRead > 0) {
                 bbuf.flip();
@@ -465,7 +473,6 @@
 	 */
 	private void prepare() {
 		bbuf.clear();
-
 		if (parsingHeader) {
 			if (lastValid == buf.length) {
 				throw MESSAGES.requestHeaderTooLarge();
@@ -484,25 +491,6 @@
 	}
 
 	/**
-	 * Read a sequence of bytes in non-blocking mode from he current channel
-	 * 
-	 * @param bb
-	 *            the byte buffer which will contain the bytes read from the
-	 *            current channel
-	 */
-	private void nonBlockingRead(long timeout, TimeUnit unit) {
-		try {
-		    latch = new CountDownLatch(1);
-		    channel.read(bbuf, channel, this.completionHandler);
-		} catch (Exception e) {
-		    processor.getResponse().setErrorException(e);
-			if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
-			    CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingRead(e);
-			}
-		}
-	}
-
-	/**
 	 * Read a sequence of bytes in blocking mode from he current channel
 	 * 
 	 * @param bb

Modified: branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
===================================================================
--- branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java	2013-10-16 16:57:58 UTC (rev 2283)
+++ branches/7.4.x/src/main/java/org/apache/coyote/http11/InternalNioOutputBuffer.java	2013-10-17 12:26:41 UTC (rev 2284)
@@ -23,7 +23,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.coyote.ActionCode;
@@ -138,10 +138,10 @@
 	 */
 	private CompletionHandler<Integer, NioChannel> completionHandler;
 
-	/**
-	 * Latch used for auto blocking.
-	 */
-	private CountDownLatch latch = new CountDownLatch(0);
+    /**
+     * Semaphore used to wait for the completion handler.
+     */
+    private Semaphore semaphore = new Semaphore(1);
 	
 	/**
 	 * Create a new instance of {@code InternalNioOutputBuffer}
@@ -209,7 +209,7 @@
                     } else {
                         response.setLastWrite(nBytes);
                         leftover.recycle();
-                        latch.countDown();
+                        semaphore.release();
                         if (!processor.isProcessing() && processor.getWriteNotification()) {
                             if (!endpoint.processChannel(attachment, SocketStatus.OPEN_WRITE)) {
                                 endpoint.closeChannel(attachment);
@@ -226,6 +226,7 @@
 			public void failed(Throwable exc, NioChannel attachment) {
                 processor.getResponse().setErrorException(exc);
 				endpoint.removeEventChannel(attachment);
+                semaphore.release();
 				if (!endpoint.processChannel(attachment, SocketStatus.ERROR)) {
 				    endpoint.closeChannel(attachment);
 				}
@@ -288,35 +289,6 @@
 		return nw;
 	}
 
-	/**
-	 * Perform a non-blocking write operation
-	 * 
-	 * @param buffer
-	 *            the buffer containing the data to write
-	 * @param timeout
-	 *            a timeout for the operation
-	 * @param unit
-	 *            The time unit
-	 */
-	private void nonBlockingWrite(final long timeout, final TimeUnit unit) {
-		try {
-            latch = new CountDownLatch(1);
-		    // Calculate the number of bytes that fit in the buffer
-		    int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
-		    // put bytes in the buffer
-		    bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
-		    // Update the offset
-		    leftover.setOffset(leftover.getOffset() + n);
-		    // Perform the write operation
-		    this.channel.write(this.bbuf, timeout, unit, this.channel, this.completionHandler);
-		} catch (Throwable t) {
-            processor.getResponse().setErrorException(t);
-			if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
-			    CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(t);
-			}
-		}
-	}
-
     /**
      * Perform a write operation. The operation may be blocking or non-blocking
      * depending on the value of {@code nonBlocking} flag.
@@ -542,6 +514,9 @@
         lastActiveFilter = -1;
         committed = false;
         finished = false;
+        if (nonBlocking) {
+            semaphore.release();
+        }
         nonBlocking = false;
     }
 
@@ -839,7 +814,8 @@
                 if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE && response.getFlushLeftovers()
                         && Http11AbstractProcessor.containerThread.get() == Boolean.TRUE) {
                     try {
-                        latch.await(writeTimeout, TimeUnit.MILLISECONDS);
+                        if (semaphore.tryAcquire(writeTimeout, TimeUnit.MILLISECONDS))
+                            semaphore.release();
                     } catch (InterruptedException e) {
                         // Ignore
                     }
@@ -849,8 +825,19 @@
                     if (leftover.getLength() > Constants.ASYNC_BUFFER_SIZE) {
                         response.setLastWrite(0);
                     }
-                    if (latch.getCount() == 0) {
-                        nonBlockingWrite(writeTimeout, TimeUnit.MILLISECONDS);
+                    if (semaphore.tryAcquire()) {
+                        // Calculate the number of bytes that fit in the buffer
+                        int n = Math.min(leftover.getLength(), bbuf.capacity() - bbuf.position());
+                        bbuf.put(leftover.getBuffer(), leftover.getOffset(), n).flip();
+                        leftover.setOffset(leftover.getOffset() + n);
+                        try {
+                            channel.write(bbuf, writeTimeout, TimeUnit.MILLISECONDS, channel, completionHandler);
+                        } catch (Exception e) {
+                            processor.getResponse().setErrorException(e);
+                            if (CoyoteLogger.HTTP_LOGGER.isDebugEnabled()) {
+                                CoyoteLogger.HTTP_LOGGER.errorWithNonBlockingWrite(e);
+                            }
+                        }
                     }
                 }
             } else {



More information about the jbossweb-commits mailing list