JBossWeb SVN: r213 - in trunk/java/org/apache: coyote/http11 and 2 other directories.
by jbossweb-commits@lists.jboss.org
Author: remy.maucherat(a)jboss.com
Date: 2007-08-04 13:00:23 -0400 (Sat, 04 Aug 2007)
New Revision: 213
Modified:
trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
trunk/java/org/apache/tomcat/jni/Socket.java
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Log:
- Finish coding the new Comet API.
- Some FIXMEs remain for cases I'm not sure about (like doing sync non blocking writes),
and testing will be very complex.
- Modify Comet begin and end timing (it was also possible that end was never called).
Modified: trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
===================================================================
--- trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -202,9 +202,10 @@
error = true;
connector.getContainer().getPipeline().getFirst().event(request, response, request.getEvent());
}
- if (response.isClosed() || !request.isComet()) {
+ /*if (response.isClosed() || !request.isComet()) {
res.action(ActionCode.ACTION_COMET_END, null);
- } else if (!error && read && request.isReadable()) {
+ } else*/
+ if (!error && read && request.isReadable()) {
// If this was a read and not all bytes have been read, or if no data
// was read from the connector, then it is an error
log.error(sm.getString("coyoteAdapter.read"));
@@ -224,6 +225,7 @@
req.getRequestProcessor().setWorkerThreadName(null);
// Recycle the wrapper request and response
if (error || response.isClosed() || !request.isComet()) {
+ res.action(ActionCode.ACTION_COMET_END, null);
request.recycle();
request.setFilterChain(null);
response.recycle();
@@ -285,15 +287,14 @@
if (request.isComet()) {
if (!response.isClosed() && !response.isError()) {
+ res.action(ActionCode.ACTION_COMET_BEGIN, null);
if (request.isReadable()) {
// Invoke a read event right away if there are available bytes
if (event(req, res, SocketStatus.OPEN_READ)) {
comet = true;
- res.action(ActionCode.ACTION_COMET_BEGIN, null);
}
} else {
comet = true;
- res.action(ActionCode.ACTION_COMET_BEGIN, null);
}
} else {
// Clear the filter chain, as otherwise it will not be reset elsewhere
Modified: trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -111,6 +111,12 @@
// ----------------------------------------------------- Instance Variables
+
+ /**
+ * Thread local marker.
+ */
+ public static ThreadLocal<Boolean> containerThread = new ThreadLocal<Boolean>();
+
/**
* Associated adapter.
@@ -693,7 +699,6 @@
*/
public void setSocketBuffer(int socketBuffer) {
this.socketBuffer = socketBuffer;
- outputBuffer.setSocketBuffer(socketBuffer);
}
/**
@@ -754,8 +759,14 @@
throws IOException {
RequestInfo rp = request.getRequestProcessor();
-
try {
+ // If processing a write event, must flush any leftover bytes first
+ if (status == SocketStatus.OPEN_WRITE) {
+ // FIXME: If the flush does not manage to flush all leftover bytes, it is possible
+ // that the servlet is not going to be able to write bytes. This will be handled properly,
+ // but is wasteful.
+ outputBuffer.flushLeftover();
+ }
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
error = !adapter.event(request, response, status);
} catch (InterruptedIOException e) {
@@ -1217,14 +1228,24 @@
request.setAvailable(inputBuffer.available());
} else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
comet = true;
+ // Set socket to non blocking mode
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 1);
+ containerThread.set(Boolean.TRUE);
+ outputBuffer.setNonBlocking(true);
} else if (actionCode == ActionCode.ACTION_COMET_END) {
comet = false;
+ // End non blocking mode
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+ containerThread.set(null);
+ outputBuffer.setNonBlocking(false);
} else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
readNotifications = false;
} else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
readNotifications = true;
endpoint.getCometPoller().add(socket, timeout, false, false, true);
} else if (actionCode == ActionCode.ACTION_COMET_WRITE) {
+ // FIXME: Maybe, should check (?)
+ // FIXME: If called synchronously, setting a flag instead of a direct call is needed.
endpoint.getCometPoller().add(socket, timeout, false, true, false);
} else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
cometTimeout = ((Integer) param).intValue();
Modified: trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
===================================================================
--- trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -79,6 +79,9 @@
committed = false;
finished = false;
+ leftover = new ByteChunk();
+ nonBlocking = false;
+
// Cause loading of HttpMessages
HttpMessages.getMessage(200);
@@ -176,6 +179,12 @@
*/
protected ByteChunk leftover = null;
+
+ /**
+ * Non blocking mode.
+ */
+ protected boolean nonBlocking = false;
+
// ------------------------------------------------------------- Properties
@@ -198,14 +207,22 @@
/**
- * Set the socket buffer size.
+ * Set the non blocking flag.
*/
- public void setSocketBuffer(int socketBufferSize) {
- // FIXME: Remove
+ public void setNonBlocking(boolean nonBlocking) {
+ this.nonBlocking = nonBlocking;
}
/**
+ * Get the non blocking flag value.
+ */
+ public boolean getNonBlocking() {
+ return nonBlocking;
+ }
+
+
+ /**
* Add an output filter to the filter library.
*/
public void addFilter(OutputFilter filter) {
@@ -344,10 +361,12 @@
}
// Reset pointers
+ leftover.recycle();
pos = 0;
lastActiveFilter = -1;
committed = false;
finished = false;
+ nonBlocking = false;
}
@@ -687,6 +706,47 @@
}
+
+ /**
+ * Flush leftover bytes.
+ *
+ * @return true if all leftover bytes have been flushed
+ */
+ public boolean flushLeftover()
+ throws IOException {
+ if (leftover.getLength() > 0) {
+ int len = leftover.getLength();
+ int start = leftover.getStart();
+ byte[] b = leftover.getBuffer();
+ while (len > 0) {
+ int thisTime = len;
+ if (bbuf.position() == bbuf.capacity()) {
+ int pos = 0;
+ int end = bbuf.position();
+ int res = Socket.sendibb(socket, 0, bbuf.position());
+ while (res > 0 && pos < end) {
+ pos += res;
+ res = Socket.sendibb(socket, pos, bbuf.position());
+ }
+ if (res < 0) {
+ throw new IOException("Error");
+ }
+ if (pos < end) {
+ // Could not write all leftover data: put back to write poller
+ return false;
+ }
+ }
+ if (thisTime > bbuf.capacity() - bbuf.position()) {
+ thisTime = bbuf.capacity() - bbuf.position();
+ }
+ bbuf.put(b, start, thisTime);
+ len = len - thisTime;
+ start = start + thisTime;
+ }
+ }
+ return true;
+ }
+
/**
* Callback to write data from the buffer.
@@ -694,15 +754,49 @@
protected void flushBuffer()
throws IOException {
if (bbuf.position() > 0) {
- int res = Socket.sendbb(socket, 0, bbuf.position());
+ int res = 0;
+ if (nonBlocking) {
+ // If there are still leftover bytes here, there's a problem:
+ // - If the call is asynchronous, throw an exception
+ // - If the call is synchronous, make a regular blocking write to flush the data
+ if (leftover.getLength() > 0) {
+ if (Http11AprProcessor.containerThread.get() == Boolean.TRUE) {
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 0);
+ // Send leftover bytes
+ res = Socket.send(socket, leftover.getBuffer(), leftover.getOffset(), leftover.getEnd());
+ // Send current buffer
+ if (res > 0) {
+ res = Socket.sendbb(socket, 0, bbuf.position());
+ }
+ Socket.optSet(socket, Socket.APR_SO_NONBLOCK, 1);
+ } else {
+ throw new IOException("Backlog");
+ }
+ } else {
+ // Perform non blocking writes until all data is written, or the result
+ // of the write is 0
+ int pos = 0;
+ int end = bbuf.position();
+ res = Socket.sendibb(socket, 0, bbuf.position());
+ while (res > 0 && pos < end) {
+ pos += res;
+ res = Socket.sendibb(socket, pos, bbuf.position());
+ }
+ // Put any leftover bytes in the leftover byte chunk
+ if (pos < end) {
+ leftover.allocate(end - pos, -1);
+ bbuf.get(leftover.getBuffer(), 0, end - pos);
+ leftover.setEnd(end-pos);
+ }
+ }
+ } else {
+ res = Socket.sendbb(socket, 0, bbuf.position());
+ }
response.setLastWrite(res);
- // FIXME: If in Comet mode and non blocking, and if it did not write the whole thing,
- // then retry until it writes 0 bytes (?), and put the leftover bytes in the leftover
- // byte chunk
+ bbuf.clear();
if (res < 0) {
- throw new IOException();
+ throw new IOException("Error");
}
- bbuf.clear();
}
}
@@ -731,6 +825,9 @@
int thisTime = len;
if (bbuf.position() == bbuf.capacity()) {
flushBuffer();
+ // FIXME: If non blocking (comet) and there are leftover bytes,
+ // put all remaining bytes in the leftover buffer
+
}
if (thisTime > bbuf.capacity() - bbuf.position()) {
thisTime = bbuf.capacity() - bbuf.position();
Modified: trunk/java/org/apache/tomcat/jni/Socket.java
===================================================================
--- trunk/java/org/apache/tomcat/jni/Socket.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/tomcat/jni/Socket.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -241,13 +241,44 @@
*/
public static native int sendb(long sock, ByteBuffer buf,
int offset, int len);
+
/**
+ * Send data over a network without retry
+ * <PRE>
+ * This functions acts like a blocking write by default. To change
+ * this behavior, use apr_socket_timeout_set() or the APR_SO_NONBLOCK
+ * socket option.
+ *
+ * It is possible for both bytes to be sent and an error to be returned.
+ *
+ * </PRE>
+ * @param sock The socket to send the data over.
+ * @param buf The Byte buffer which contains the data to be sent.
+ * @param offset The offset within the buffer array of the first buffer from
+ * which bytes are to be retrieved; must be non-negative
+ * and no larger than buf.length
+ * @param len The maximum number of buffers to be accessed; must be non-negative
+ * and no larger than buf.length - offset
+ * @return The number of bytes send.
+ *
+ */
+ public static native int sendib(long sock, ByteBuffer buf,
+ int offset, int len);
+
+ /**
* Send data over a network using internally set ByteBuffer
*/
public static native int sendbb(long sock,
- int offset, int len);
+ int offset, int len);
/**
+ * Send data over a network using internally set ByteBuffer
+ * without internal retry.
+ */
+ public static native int sendibb(long sock,
+ int offset, int len);
+
+ /**
* Send multiple packets of data over a network.
* <PRE>
* This functions acts like a blocking write by default. To change
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-08-04 16:57:43 UTC (rev 212)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-08-04 17:00:23 UTC (rev 213)
@@ -1646,9 +1646,12 @@
for (int n = 0; n < rv; n++) {
timeouts.remove(desc[n*2+1]);
// Check for failed sockets and hand this socket off to a worker
+ // FIXME: Need to check for a write
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
- || (comet && (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)))
+ || (comet &&
+ (((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) && !processSocket(desc[n*2+1], SocketStatus.OPEN_READ))
+ || (((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) && !processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)))
|| (!comet && (!processSocket(desc[n*2+1])))) {
// Close socket and clear pool
if (comet) {
17 years, 4 months
JBossWeb SVN: r212 - in trunk/native/connector: os/win32 and 1 other directories.
by jbossweb-commits@lists.jboss.org
Author: remy.maucherat(a)jboss.com
Date: 2007-08-04 12:57:43 -0400 (Sat, 04 Aug 2007)
New Revision: 212
Modified:
trunk/native/connector/include/tcn_version.h
trunk/native/connector/os/win32/libtcnative.rc
trunk/native/connector/src/network.c
Log:
- Native code update: add methods which look like the original write methods.
Modified: trunk/native/connector/include/tcn_version.h
===================================================================
--- trunk/native/connector/include/tcn_version.h 2007-07-31 12:56:33 UTC (rev 211)
+++ trunk/native/connector/include/tcn_version.h 2007-08-04 16:57:43 UTC (rev 212)
@@ -17,7 +17,7 @@
/*
*
* @author Mladen Turk
- * @version $Revision: 525206 $, $Date: 2007-04-03 18:42:27 +0200 (mar., 03 avr. 2007) $
+ * @version $Revision: 562478 $, $Date: 2007-08-03 16:33:51 +0200 (ven., 03 août 2007) $
*/
#ifndef TCN_VERSION_H
@@ -69,7 +69,7 @@
#define TCN_MINOR_VERSION 1
/** patch level */
-#define TCN_PATCH_VERSION 11
+#define TCN_PATCH_VERSION 12
/**
* This symbol is defined for internal, "development" copies of TCN. This
Modified: trunk/native/connector/os/win32/libtcnative.rc
===================================================================
--- trunk/native/connector/os/win32/libtcnative.rc 2007-07-31 12:56:33 UTC (rev 211)
+++ trunk/native/connector/os/win32/libtcnative.rc 2007-08-04 16:57:43 UTC (rev 212)
@@ -3,7 +3,7 @@
LANGUAGE 0x9,0x1
1 11 logmessages.bin
-#define TCN_COPYRIGHT "Copyright 2000-2006 The Apache Software " \
+#define TCN_COPYRIGHT "Copyright 2000-2007 The Apache Software " \
"Foundation or its licensors, as applicable."
#define TCN_LICENSE "Licensed under the Apache License, Version 2.0 " \
@@ -19,7 +19,7 @@
"specific language governing permissions and " \
"limitations under the License."
-#define TCN_VERISON "1.1.11"
+#define TCN_VERISON "1.1.12"
1000 ICON "apache.ico"
1001 DIALOGEX 0, 0, 252, 51
@@ -35,8 +35,8 @@
END
1 VERSIONINFO
- FILEVERSION 1,1,11,0
- PRODUCTVERSION 1,1,11,0
+ FILEVERSION 1,1,12,0
+ PRODUCTVERSION 1,1,12,0
FILEFLAGSMASK 0x3fL
#ifdef _DEBUG
FILEFLAGS 0x1L
Modified: trunk/native/connector/src/network.c
===================================================================
--- trunk/native/connector/src/network.c 2007-07-31 12:56:33 UTC (rev 211)
+++ trunk/native/connector/src/network.c 2007-08-04 16:57:43 UTC (rev 212)
@@ -17,7 +17,7 @@
/*
*
* @author Mladen Turk
- * @version $Revision: 523024 $, $Date: 2007-03-27 20:23:06 +0200 (mar., 27 mars 2007) $
+ * @version $Revision: 562478 $, $Date: 2007-08-03 16:33:51 +0200 (ven., 03 août 2007) $
*/
#include "tcn.h"
@@ -540,6 +540,40 @@
}
}
+TCN_IMPLEMENT_CALL(jint, Socket, sendib)(TCN_STDARGS, jlong sock,
+ jobject buf, jint offset, jint len)
+{
+ tcn_socket_t *s = J2P(sock, tcn_socket_t *);
+ apr_size_t nbytes = (apr_size_t)len;
+ char *bytes;
+ apr_status_t ss = APR_SUCCESS;
+
+ UNREFERENCED(o);
+ if (!sock) {
+ tcn_ThrowAPRException(e, APR_ENOTSOCK);
+ return -(jint)APR_ENOTSOCK;
+ }
+ TCN_ASSERT(s->opaque != NULL);
+ TCN_ASSERT(buf != NULL);
+#ifdef TCN_DO_STATISTICS
+ sp_max_send = TCN_MAX(sp_max_send, nbytes);
+ sp_min_send = TCN_MIN(sp_min_send, nbytes);
+ sp_tot_send += nbytes;
+ sp_num_send++;
+#endif
+
+ bytes = (char *)(*e)->GetDirectBufferAddress(e, buf);
+
+ ss = (*s->net->send)(s->opaque, bytes + offset, &nbytes);
+
+ if (ss == APR_SUCCESS)
+ return (jint)nbytes;
+ else {
+ TCN_ERROR_WRAP(ss);
+ return -(jint)ss;
+ }
+}
+
TCN_IMPLEMENT_CALL(jint, Socket, sendbb)(TCN_STDARGS, jlong sock,
jint offset, jint len)
{
@@ -577,6 +611,37 @@
}
}
+TCN_IMPLEMENT_CALL(jint, Socket, sendibb)(TCN_STDARGS, jlong sock,
+ jint offset, jint len)
+{
+ tcn_socket_t *s = J2P(sock, tcn_socket_t *);
+ apr_size_t nbytes = (apr_size_t)len;
+ apr_status_t ss = APR_SUCCESS;
+
+ UNREFERENCED(o);
+ if (!sock) {
+ tcn_ThrowAPRException(e, APR_ENOTSOCK);
+ return -(jint)APR_ENOTSOCK;
+ }
+ TCN_ASSERT(s->opaque != NULL);
+ TCN_ASSERT(s->jsbbuff != NULL);
+#ifdef TCN_DO_STATISTICS
+ sp_max_send = TCN_MAX(sp_max_send, nbytes);
+ sp_min_send = TCN_MIN(sp_min_send, nbytes);
+ sp_tot_send += nbytes;
+ sp_num_send++;
+#endif
+
+ ss = (*s->net->send)(s->opaque, s->jsbbuff + offset, &nbytes);
+
+ if (ss == APR_SUCCESS)
+ return (jint)nbytes;
+ else {
+ TCN_ERROR_WRAP(ss);
+ return -(jint)nbytes;
+ }
+}
+
TCN_IMPLEMENT_CALL(jint, Socket, sendv)(TCN_STDARGS, jlong sock,
jobjectArray bufs)
{
17 years, 4 months