Author: remy.maucherat(a)jboss.com
Date: 2007-08-04 13:00:23 -0400 (Sat, 04 Aug 2007)
New Revision: 213
Modified:
trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
trunk/java/org/apache/tomcat/jni/Socket.java
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Log:
- Finish coding the new Comet API.
- Some FIXMEs remain for cases I'm not sure about (like doing sync non-blocking writes),
  and testing will be very complex.
- Modify Comet begin and end timing (it was also possible that end was never called).
Modified: trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
===================================================================
--- trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -202,9 +202,10 @@
error = true;
connector.getContainer().getPipeline().getFirst().event(request,
response, request.getEvent());
}
- if (response.isClosed() || !request.isComet()) {
+ /*if (response.isClosed() || !request.isComet()) {
res.action(ActionCode.ACTION_COMET_END, null);
- } else if (!error && read && request.isReadable()) {
+ } else*/
+ if (!error && read && request.isReadable()) {
// If this was a read and not all bytes have been read, or if no data
// was read from the connector, then it is an error
log.error(sm.getString("coyoteAdapter.read"));
@@ -224,6 +225,7 @@
req.getRequestProcessor().setWorkerThreadName(null);
// Recycle the wrapper request and response
if (error || response.isClosed() || !request.isComet()) {
+ res.action(ActionCode.ACTION_COMET_END, null);
request.recycle();
request.setFilterChain(null);
response.recycle();
@@ -285,15 +287,14 @@
if (request.isComet()) {
if (!response.isClosed() && !response.isError()) {
+ res.action(ActionCode.ACTION_COMET_BEGIN, null);
if (request.isReadable()) {
// Invoke a read event right away if there are available bytes
if (event(req, res, SocketStatus.OPEN_READ)) {
comet = true;
- res.action(ActionCode.ACTION_COMET_BEGIN, null);
}
} else {
comet = true;
- res.action(ActionCode.ACTION_COMET_BEGIN, null);
}
} else {
// Clear the filter chain, as otherwise it will not be reset elsewhere
Modified: trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -111,6 +111,12 @@
// ----------------------------------------------------- Instance Variables
+
+ /**
+ * Thread local marker.
+ */
+ public static ThreadLocal<Boolean> containerThread = new
ThreadLocal<Boolean>();
+
/**
* Associated adapter.
@@ -693,7 +699,6 @@
*/
public void setSocketBuffer(int socketBuffer) {
this.socketBuffer = socketBuffer;
- outputBuffer.setSocketBuffer(socketBuffer);
}
/**
@@ -754,8 +759,14 @@
throws IOException {
RequestInfo rp = request.getRequestProcessor();
-
try {
+ // If processing a write event, must flush any leftover bytes first
+ if (status == SocketStatus.OPEN_WRITE) {
+ // FIXME: If the flush does not manage to flush all leftover bytes, it is possible
+ // that the servlet is not going to be able to write bytes. This will be handled properly,
+ // but is wasteful.
+ outputBuffer.flushLeftover();
+ }
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
error = !adapter.event(request, response, status);
} catch (InterruptedIOException e) {
@@ -1217,14 +1228,24 @@
request.setAvailable(inputBuffer.available());
} else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
comet = true;
+ // Set socket to non blocking mode
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 1);
+ containerThread.set(Boolean.TRUE);
+ outputBuffer.setNonBlocking(true);
} else if (actionCode == ActionCode.ACTION_COMET_END) {
comet = false;
+ // End non blocking mode
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+ containerThread.set(null);
+ outputBuffer.setNonBlocking(false);
} else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
readNotifications = false;
} else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
readNotifications = true;
endpoint.getCometPoller().add(socket, timeout, false, false, true);
} else if (actionCode == ActionCode.ACTION_COMET_WRITE) {
+ // FIXME: Maybe, should check (?)
+ // FIXME: If called synchronously, setting a flag instead of a direct call is needed.
endpoint.getCometPoller().add(socket, timeout, false, true, false);
} else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
cometTimeout = ((Integer) param).intValue();
Modified: trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
===================================================================
--- trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -79,6 +79,9 @@
committed = false;
finished = false;
+ leftover = new ByteChunk();
+ nonBlocking = false;
+
// Cause loading of HttpMessages
HttpMessages.getMessage(200);
@@ -176,6 +179,12 @@
*/
protected ByteChunk leftover = null;
+
+ /**
+ * Non blocking mode.
+ */
+ protected boolean nonBlocking = false;
+
// ------------------------------------------------------------- Properties
@@ -198,14 +207,22 @@
/**
- * Set the socket buffer size.
+ * Set the non blocking flag.
*/
- public void setSocketBuffer(int socketBufferSize) {
- // FIXME: Remove
+ public void setNonBlocking(boolean nonBlocking) {
+ this.nonBlocking = nonBlocking;
}
/**
+ * Get the non blocking flag value.
+ */
+ public boolean getNonBlocking() {
+ return nonBlocking;
+ }
+
+
+ /**
* Add an output filter to the filter library.
*/
public void addFilter(OutputFilter filter) {
@@ -344,10 +361,12 @@
}
// Reset pointers
+ leftover.recycle();
pos = 0;
lastActiveFilter = -1;
committed = false;
finished = false;
+ nonBlocking = false;
}
@@ -687,6 +706,47 @@
}
+
+ /**
+ * Flush leftover bytes.
+ *
+ * @return true if all leftover bytes have been flushed
+ */
+ public boolean flushLeftover()
+ throws IOException {
+ if (leftover.getLength() > 0) {
+ int len = leftover.getLength();
+ int start = leftover.getStart();
+ byte[] b = leftover.getBuffer();
+ while (len > 0) {
+ int thisTime = len;
+ if (bbuf.position() == bbuf.capacity()) {
+ int pos = 0;
+ int end = bbuf.position();
+ int res = Socket.sendibb(socket, 0, bbuf.position());
+ while (res > 0 && pos < end) {
+ pos += res;
+ res = Socket.sendibb(socket, pos, bbuf.position());
+ }
+ if (res < 0) {
+ throw new IOException("Error");
+ }
+ if (pos < end) {
+ // Could not write all leftover data: put back to write poller
+ return false;
+ }
+ }
+ if (thisTime > bbuf.capacity() - bbuf.position()) {
+ thisTime = bbuf.capacity() - bbuf.position();
+ }
+ bbuf.put(b, start, thisTime);
+ len = len - thisTime;
+ start = start + thisTime;
+ }
+ }
+ return true;
+ }
+
/**
* Callback to write data from the buffer.
@@ -694,15 +754,49 @@
protected void flushBuffer()
throws IOException {
if (bbuf.position() > 0) {
- int res = Socket.sendbb(socket, 0, bbuf.position());
+ int res = 0;
+ if (nonBlocking) {
+ // If there are still leftover bytes here, there's a problem:
+ // - If the call is asynchronous, throw an exception
+ // - If the call is synchronous, make a regular blocking write to flush the data
+ if (leftover.getLength() > 0) {
+ if (Http11AprProcessor.containerThread.get() == Boolean.TRUE) {
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+ // Send leftover bytes
+ res = Socket.send(socket, leftover.getBuffer(),
leftover.getOffset(), leftover.getEnd());
+ // Send current buffer
+ if (res > 0) {
+ res = Socket.sendbb(socket, 0, bbuf.position());
+ }
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 1);
+ } else {
+ throw new IOException("Backlog");
+ }
+ } else {
+ // Perform non blocking writes until all data is written, or the result
+ // of the write is 0
+ int pos = 0;
+ int end = bbuf.position();
+ res = Socket.sendibb(socket, 0, bbuf.position());
+ while (res > 0 && pos < end) {
+ pos += res;
+ res = Socket.sendibb(socket, pos, bbuf.position());
+ }
+ // Put any leftover bytes in the leftover byte chunk
+ if (pos < end) {
+ leftover.allocate(end - pos, -1);
+ bbuf.get(leftover.getBuffer(), 0, end - pos);
+ leftover.setEnd(end-pos);
+ }
+ }
+ } else {
+ res = Socket.sendbb(socket, 0, bbuf.position());
+ }
response.setLastWrite(res);
- // FIXME: If in Comet mode and non blocking, and if it did not write the whole thing,
- // then retry until it writes 0 bytes (?), and put the leftover bytes in the leftover
- // byte chunk
+ bbuf.clear();
if (res < 0) {
- throw new IOException();
+ throw new IOException("Error");
}
- bbuf.clear();
}
}
@@ -731,6 +825,9 @@
int thisTime = len;
if (bbuf.position() == bbuf.capacity()) {
flushBuffer();
+ // FIXME: If non blocking (comet) and there are leftover bytes,
+ // put all remaining bytes in the leftover buffer
+
}
if (thisTime > bbuf.capacity() - bbuf.position()) {
thisTime = bbuf.capacity() - bbuf.position();
Modified: trunk/java/org/apache/tomcat/jni/Socket.java
===================================================================
--- trunk/java/org/apache/tomcat/jni/Socket.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/tomcat/jni/Socket.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -241,13 +241,44 @@
*/
public static native int sendb(long sock, ByteBuffer buf,
int offset, int len);
+
/**
+ * Send data over a network without retry
+ * <PRE>
+ * This functions acts like a blocking write by default. To change
+ * this behavior, use apr_socket_timeout_set() or the APR_SO_NONBLOCK
+ * socket option.
+ *
+ * It is possible for both bytes to be sent and an error to be returned.
+ *
+ * </PRE>
+ * @param sock The socket to send the data over.
+ * @param buf The Byte buffer which contains the data to be sent.
+ * @param offset The offset within the buffer array of the first buffer from
+ * which bytes are to be retrieved; must be non-negative
+ * and no larger than buf.length
+ * @param len The maximum number of buffers to be accessed; must be non-negative
+ * and no larger than buf.length - offset
+ * @return The number of bytes send.
+ *
+ */
+ public static native int sendib(long sock, ByteBuffer buf,
+ int offset, int len);
+
+ /**
* Send data over a network using internally set ByteBuffer
*/
public static native int sendbb(long sock,
- int offset, int len);
+ int offset, int len);
/**
+ * Send data over a network using internally set ByteBuffer
+ * without internal retry.
+ */
+ public static native int sendibb(long sock,
+ int offset, int len);
+
+ /**
* Send multiple packets of data over a network.
* <PRE>
* This functions acts like a blocking write by default. To change
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -1646,9 +1646,12 @@
for (int n = 0; n < rv; n++) {
timeouts.remove(desc[n*2+1]);
// Check for failed sockets and hand this socket off to a worker
+ // FIXME: Need to check for a write
if (((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)
- || (comet && (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)))
+ || (comet &&
+ (((desc[n*2] & Poll.APR_POLLIN) ==
Poll.APR_POLLIN) && !processSocket(desc[n*2+1], SocketStatus.OPEN_READ))
+ || (((desc[n*2] & Poll.APR_POLLOUT)
== Poll.APR_POLLOUT) && !processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)))
|| (!comet &&
(!processSocket(desc[n*2+1])))) {
// Close socket and clear pool
if (comet) {