[jbossweb-commits] JBossWeb SVN: r213 - in trunk/java/org/apache: coyote/http11 and 2 other directories.

jbossweb-commits at lists.jboss.org jbossweb-commits at lists.jboss.org
Sat Aug 4 13:00:23 EDT 2007


Author: remy.maucherat at jboss.com
Date: 2007-08-04 13:00:23 -0400 (Sat, 04 Aug 2007)
New Revision: 213

Modified:
   trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
   trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
   trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
   trunk/java/org/apache/tomcat/jni/Socket.java
   trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Log:
- Finish coding the new Comet API.
- Some FIXMEs remain for cases I'm not sure about (such as synchronous non-blocking writes),
  and testing will be very complex.
- Modify Comet begin and end timing (it was also possible that end was never called).

Modified: trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
===================================================================
--- trunk/java/org/apache/catalina/connector/CoyoteAdapter.java	2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/catalina/connector/CoyoteAdapter.java	2007-08-04 17:00:23 UTC (rev 213)
@@ -202,9 +202,10 @@
                     error = true;
                     connector.getContainer().getPipeline().getFirst().event(request, response, request.getEvent());
                 }
-                if (response.isClosed() || !request.isComet()) {
+                /*if (response.isClosed() || !request.isComet()) {
                     res.action(ActionCode.ACTION_COMET_END, null);
-                } else if (!error && read && request.isReadable()) {
+                } else*/
+                if (!error && read && request.isReadable()) {
                     // If this was a read and not all bytes have been read, or if no data
                     // was read from the connector, then it is an error
                     log.error(sm.getString("coyoteAdapter.read"));
@@ -224,6 +225,7 @@
                 req.getRequestProcessor().setWorkerThreadName(null);
                 // Recycle the wrapper request and response
                 if (error || response.isClosed() || !request.isComet()) {
+                    res.action(ActionCode.ACTION_COMET_END, null);
                     request.recycle();
                     request.setFilterChain(null);
                     response.recycle();
@@ -285,15 +287,14 @@
 
                 if (request.isComet()) {
                     if (!response.isClosed() && !response.isError()) {
+                        res.action(ActionCode.ACTION_COMET_BEGIN, null);
                         if (request.isReadable()) {
                             // Invoke a read event right away if there are available bytes
                             if (event(req, res, SocketStatus.OPEN_READ)) {
                                 comet = true;
-                                res.action(ActionCode.ACTION_COMET_BEGIN, null);
                             }
                         } else {
                             comet = true;
-                            res.action(ActionCode.ACTION_COMET_BEGIN, null);
                         }
                     } else {
                         // Clear the filter chain, as otherwise it will not be reset elsewhere

Modified: trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProcessor.java	2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/coyote/http11/Http11AprProcessor.java	2007-08-04 17:00:23 UTC (rev 213)
@@ -111,6 +111,12 @@
 
     // ----------------------------------------------------- Instance Variables
 
+    
+    /**
+     * Thread local marker.
+     */
+    public static ThreadLocal<Boolean> containerThread = new ThreadLocal<Boolean>();
+    
 
     /**
      * Associated adapter.
@@ -693,7 +699,6 @@
      */
     public void setSocketBuffer(int socketBuffer) {
         this.socketBuffer = socketBuffer;
-        outputBuffer.setSocketBuffer(socketBuffer);
     }
 
     /**
@@ -754,8 +759,14 @@
         throws IOException {
         
         RequestInfo rp = request.getRequestProcessor();
-        
         try {
+            // If processing a write event, must flush any leftover bytes first
+            if (status == SocketStatus.OPEN_WRITE) {
+                // FIXME: If the flush does not manage to flush all leftover bytes, it is possible
+                // that the servlet is not going to be able to write bytes. This will be handled properly,
+                // but is wasteful.
+                outputBuffer.flushLeftover();
+            }
             rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
             error = !adapter.event(request, response, status);
         } catch (InterruptedIOException e) {
@@ -1217,14 +1228,24 @@
             request.setAvailable(inputBuffer.available());
         } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
             comet = true;
+            // Set socket to non blocking mode
+            Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 1);
+            containerThread.set(Boolean.TRUE);
+            outputBuffer.setNonBlocking(true);
         } else if (actionCode == ActionCode.ACTION_COMET_END) {
             comet = false;
+            // End non blocking mode
+            Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+            containerThread.set(null);
+            outputBuffer.setNonBlocking(false);
         } else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
             readNotifications = false;
         } else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
             readNotifications = true;
             endpoint.getCometPoller().add(socket, timeout, false, false, true);
         } else if (actionCode == ActionCode.ACTION_COMET_WRITE) {
+            // FIXME: Maybe, should check (?)
+            // FIXME: If called synchronously, setting a flag instead of a direct call is needed.
             endpoint.getCometPoller().add(socket, timeout, false, true, false);
         } else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
             cometTimeout = ((Integer) param).intValue();

Modified: trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
===================================================================
--- trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java	2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java	2007-08-04 17:00:23 UTC (rev 213)
@@ -79,6 +79,9 @@
         committed = false;
         finished = false;
 
+        leftover = new ByteChunk();
+        nonBlocking = false;
+        
         // Cause loading of HttpMessages
         HttpMessages.getMessage(200);
 
@@ -176,6 +179,12 @@
      */
     protected ByteChunk leftover = null;
 
+    
+    /**
+     * Non blocking mode.
+     */
+    protected boolean nonBlocking = false;
+    
 
     // ------------------------------------------------------------- Properties
 
@@ -198,14 +207,22 @@
 
 
     /**
-     * Set the socket buffer size.
+     * Set the non blocking flag.
      */
-    public void setSocketBuffer(int socketBufferSize) {
-        // FIXME: Remove
+    public void setNonBlocking(boolean nonBlocking) {
+        this.nonBlocking = nonBlocking;
     }
 
 
     /**
+     * Get the non blocking flag value.
+     */
+    public boolean getNonBlocking() {
+        return nonBlocking;
+    }
+
+
+    /**
      * Add an output filter to the filter library.
      */
     public void addFilter(OutputFilter filter) {
@@ -344,10 +361,12 @@
         }
 
         // Reset pointers
+        leftover.recycle();
         pos = 0;
         lastActiveFilter = -1;
         committed = false;
         finished = false;
+        nonBlocking = false;
 
     }
 
@@ -687,6 +706,47 @@
 
     }
 
+    
+    /**
+     * Flush leftover bytes.
+     * 
+     * @return true if all leftover bytes have been flushed
+     */
+    public boolean flushLeftover()
+        throws IOException {
+        if (leftover.getLength() > 0) {
+            int len = leftover.getLength();
+            int start = leftover.getStart();
+            byte[] b = leftover.getBuffer();
+            while (len > 0) {
+                int thisTime = len;
+                if (bbuf.position() == bbuf.capacity()) {
+                    int pos = 0;
+                    int end = bbuf.position();
+                    int res = Socket.sendibb(socket, 0, bbuf.position());
+                    while (res > 0 && pos < end) {
+                        pos += res;
+                        res = Socket.sendibb(socket, pos, bbuf.position());
+                    }
+                    if (res < 0) {
+                        throw new IOException("Error");
+                    }
+                    if (pos < end) {
+                        // Could not write all leftover data: put back to write poller
+                        return false;
+                    }
+                }
+                if (thisTime > bbuf.capacity() - bbuf.position()) {
+                    thisTime = bbuf.capacity() - bbuf.position();
+                }
+                bbuf.put(b, start, thisTime);
+                len = len - thisTime;
+                start = start + thisTime;
+            }
+        }
+        return true;
+    }
+    
 
     /**
      * Callback to write data from the buffer.
@@ -694,15 +754,49 @@
     protected void flushBuffer()
         throws IOException {
         if (bbuf.position() > 0) {
-            int res = Socket.sendbb(socket, 0, bbuf.position());
+            int res = 0;
+            if (nonBlocking) {
+                // If there are still leftover bytes here, there's a problem:
+                // - If the call is asynchronous, throw an exception
+                // - If the call is synchronous, make a regular blocking write to flush the data
+                if (leftover.getLength() > 0) {
+                    if (Http11AprProcessor.containerThread.get() == Boolean.TRUE) {
+                        Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+                        // Send leftover bytes
+                        res = Socket.send(socket, leftover.getBuffer(), leftover.getOffset(), leftover.getEnd());
+                        // Send current buffer
+                        if (res > 0) {
+                            res = Socket.sendbb(socket, 0, bbuf.position());
+                        }
+                        Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 1);
+                    } else {
+                        throw new IOException("Backlog");
+                    }
+                } else {
+                    // Perform non blocking writes until all data is written, or the result
+                    // of the write is 0
+                    int pos = 0;
+                    int end = bbuf.position();
+                    res = Socket.sendibb(socket, 0, bbuf.position());
+                    while (res > 0 && pos < end) {
+                        pos += res;
+                        res = Socket.sendibb(socket, pos, bbuf.position());
+                    }
+                    // Put any leftover bytes in the leftover byte chunk
+                    if (pos < end) {
+                        leftover.allocate(end - pos, -1);
+                        bbuf.get(leftover.getBuffer(), 0, end - pos);
+                        leftover.setEnd(end-pos);
+                    }
+                }
+            } else {
+                res = Socket.sendbb(socket, 0, bbuf.position());
+            }
             response.setLastWrite(res);
-            // FIXME: If in Comet mode and non blocking, and if it did not write the whole thing,
-            // then retry until it writes 0 bytes (?), and put the leftover bytes in the leftover
-            // byte chunk
+            bbuf.clear();
             if (res < 0) {
-                throw new IOException();
+                throw new IOException("Error");
             }
-            bbuf.clear();
         }
     }
 
@@ -731,6 +825,9 @@
                 int thisTime = len;
                 if (bbuf.position() == bbuf.capacity()) {
                     flushBuffer();
+                    // FIXME: If non blocking (comet) and there are leftover bytes, 
+                    // put all remaining bytes in the leftover buffer
+                    
                 }
                 if (thisTime > bbuf.capacity() - bbuf.position()) {
                     thisTime = bbuf.capacity() - bbuf.position();

Modified: trunk/java/org/apache/tomcat/jni/Socket.java
===================================================================
--- trunk/java/org/apache/tomcat/jni/Socket.java	2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/tomcat/jni/Socket.java	2007-08-04 17:00:23 UTC (rev 213)
@@ -241,13 +241,44 @@
      */
     public static native int sendb(long sock, ByteBuffer buf,
                                    int offset, int len);
+
     /**
+     * Send data over a network without retry
+     * <PRE>
+     * This function acts like a blocking write by default.  To change
+     * this behavior, use apr_socket_timeout_set() or the APR_SO_NONBLOCK
+     * socket option.
+     *
+     * It is possible for both bytes to be sent and an error to be returned.
+     *
+     * </PRE>
+     * @param sock The socket to send the data over.
+     * @param buf The Byte buffer which contains the data to be sent.
+     * @param offset The offset within the buffer array of the first buffer from
+     *               which bytes are to be retrieved; must be non-negative
+     *               and no larger than buf.length
+     * @param len The maximum number of buffers to be accessed; must be non-negative
+     *            and no larger than buf.length - offset
+     * @return The number of bytes sent.
+     *
+     */
+    public static native int sendib(long sock, ByteBuffer buf,
+            int offset, int len);
+
+    /**
      * Send data over a network using internally set ByteBuffer
      */
     public static native int sendbb(long sock,
-                                   int offset, int len);
+                                    int offset, int len);
 
     /**
+     * Send data over a network using internally set ByteBuffer
+     * without internal retry.
+     */
+    public static native int sendibb(long sock,
+                                     int offset, int len);
+
+    /**
      * Send multiple packets of data over a network.
      * <PRE>
      * This functions acts like a blocking write by default.  To change

Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java	2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java	2007-08-04 17:00:23 UTC (rev 213)
@@ -1646,9 +1646,12 @@
                             for (int n = 0; n < rv; n++) {
                                 timeouts.remove(desc[n*2+1]);
                                 // Check for failed sockets and hand this socket off to a worker
+                                // FIXME: Need to check for a write
                                 if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
                                         || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
-                                        || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ))) 
+                                        || (comet && 
+                                                (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) && !processSocket(desc[n*2+1], SocketStatus.OPEN_READ))
+                                                || (((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) && !processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE))) 
                                         || (!comet && (!processSocket(desc[n*2+1])))) {
                                     // Close socket and clear pool
                                     if (comet) {




More information about the jbossweb-commits mailing list