Author: remy.maucherat(a)jboss.com
Date: 2008-10-31 00:48:24 -0400 (Fri, 31 Oct 2008)
New Revision: 833
Modified:
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Log:
- Small tweaks.
- Rewrite the main if as a more conventional if/then/else (in one case with Comet, it is
possible it could have caused problems).
- Fix remove from poller return value.
- Add a better poller reset code, not really used right now (I'm sticking with destroy
for POLLHUP, as Poll.pollset returns
apparently funny results; totally handy when debugging as well).
- Fix all the comet calls for processSocket (false means that it was not possible to
process, which should almost never
happen).
- The bayeux test looks really solid now, so I suppose this means JBWEB-124 is now
accidentally ok.
- Need to test timeouts again.
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2008-10-31 04:29:20 UTC (rev
832)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2008-10-31 04:48:24 UTC (rev
833)
@@ -57,7 +57,6 @@
// -------------------------------------------------------------- Constants
-
protected static Logger log = Logger.getLogger(AprEndpoint.class);
protected static StringManager sm =
@@ -1448,9 +1447,7 @@
// Close all sockets in the add queue
SocketInfo info = addList.get();
while (info != null) {
- if (comet) {
- processSocket(info.socket, SocketStatus.STOP);
- } else {
+ if (!comet || (comet && !processSocket(info.socket,
SocketStatus.STOP))) {
Socket.destroy(info.socket);
}
info = addList.get();
@@ -1461,9 +1458,7 @@
int rv = Poll.pollset(pollers[i], desc);
if (rv > 0) {
for (int n = 0; n < rv; n++) {
- if (comet) {
- processSocket(desc[n*2+1], SocketStatus.STOP);
- } else {
+ if (!comet || (comet && !processSocket(desc[n*2+1],
SocketStatus.STOP))) {
Socket.destroy(desc[n*2+1]);
}
}
@@ -1497,9 +1492,7 @@
}
if (!ok) {
// Can't do anything: close the socket right away
- if (comet) {
- processSocket(socket, SocketStatus.ERROR);
- } else {
+ if (!comet || (comet && !processSocket(socket,
SocketStatus.ERROR))) {
Socket.destroy(socket);
}
}
@@ -1536,9 +1529,7 @@
}
if (!ok) {
// Can't do anything: close the socket right away
- if (comet) {
- processSocket(socket, SocketStatus.ERROR);
- } else {
+ if (!comet || (comet && !processSocket(socket,
SocketStatus.ERROR))) {
Socket.destroy(socket);
}
}
@@ -1548,7 +1539,7 @@
* Add specified socket to one of the pollers.
*/
protected boolean addToPoller(long socket, int events) {
- int rv = 0;
+ int rv = -1;
for (int i = 0; i < pollers.length; i++) {
if (pollerSpace[i] > 0) {
rv = Poll.add(pollers[i], socket, events);
@@ -1566,7 +1557,7 @@
* Remove specified socket from the pollers.
*/
protected boolean removeFromPoller(long socket) {
- int rv = 0;
+ int rv = -1;
for (int i = 0; i < pollers.length; i++) {
if (pollerSpace[i] < actualPollerSize) {
rv = Poll.remove(pollers[i], socket);
@@ -1589,9 +1580,7 @@
long socket = timeouts.check(date);
while (socket != 0) {
removeFromPoller(socket);
- if (comet) {
- processSocket(socket, SocketStatus.TIMEOUT);
- } else {
+ if (!comet || (comet && !processSocket(socket,
SocketStatus.TIMEOUT))) {
Socket.destroy(socket);
}
socket = timeouts.check(date);
@@ -1600,6 +1589,24 @@
}
/**
+ * Displays the list of sockets in the pollers.
+ */
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("Poller comet=[").append(comet).append("]");
+ long[] res = new long[actualPollerSize * 2];
+ for (int i = 0; i < pollers.length; i++) {
+ int count = Poll.pollset(pollers[i], res);
+ buf.append(" [ ");
+ for (int j = 0; j < count; j++) {
+ buf.append(desc[2*j+1]).append(" ");
+ }
+ buf.append("]");
+ }
+ return buf.toString();
+ }
+
+ /**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
@@ -1661,9 +1668,7 @@
| ((info.write()) ? Poll.APR_POLLOUT : 0);
if (!addToPoller(info.socket, events)) {
// Can't do anything: close the socket right
away
- if (comet) {
- processSocket(info.socket,
SocketStatus.ERROR);
- } else {
+ if (!comet || (comet &&
!processSocket(info.socket, SocketStatus.ERROR))) {
Socket.destroy(info.socket);
}
}
@@ -1682,7 +1687,7 @@
}
} else {
// Should never happen, if not Comet, the socket is
always put in
- // the list with the read flag.
+ // the list with the read flag.
Socket.destroy(info.socket);
}
}
@@ -1694,6 +1699,11 @@
// Poll for the specified interval
for (int i = 0; i < pollers.length; i++) {
+
+ // Flags to ask to reallocate the pool
+ boolean reset = false;
+ ArrayList<Long> skip = null;
+
int rv = 0;
// Iterate on each pollers, but no need to poll empty pollers
if (pollerSpace[i] < actualPollerSize) {
@@ -1705,22 +1715,52 @@
for (int n = 0; n < rv; n++) {
timeouts.remove(desc[n*2+1]);
// Check for failed sockets and hand this socket off to a
worker
- if (((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP)
- || ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)
- // Comet processes either a read or a write
depending on what the poller returns
- || (comet &&
- (((desc[n*2] & Poll.APR_POLLIN) ==
Poll.APR_POLLIN)
- &&
!processSocket(desc[n*2+1], SocketStatus.OPEN_READ))
- || (((desc[n*2] & Poll.APR_POLLOUT)
== Poll.APR_POLLOUT)
- &&
!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)))
- || (!comet &&
!processSocket(desc[n*2+1]))) {
+ if (comet) {
+ // Comet processes either a read or a write depending
on what the poller returns
+ if (((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP)
+ || ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)) {
+ if (!processSocket(desc[n*2+1],
SocketStatus.ERROR)) {
+ // Close socket and clear pool
+ Socket.destroy(desc[n*2+1]);
+ }
+ // FIXME: decide vs destroy
+ /*
+ if ((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP) {
+ // Destroy and reallocate the poller
+ reset = true;
+ if (skip == null) {
+ skip = new ArrayList<Long>();
+ }
+ skip.add(desc[n*2+1]);
+ }*/
+ } else if ((desc[n*2] & Poll.APR_POLLIN) ==
Poll.APR_POLLIN) {
+ if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)) {
+ // Close socket and clear pool
+ Socket.destroy(desc[n*2+1]);
+ }
+ } else if ((desc[n*2] & Poll.APR_POLLOUT) ==
Poll.APR_POLLOUT) {
+ if (!processSocket(desc[n*2+1],
SocketStatus.OPEN_WRITE)) {
+ // Close socket and clear pool
+ Socket.destroy(desc[n*2+1]);
+ }
+ }
+ } else if (((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP)
+ || ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)) {
// Close socket and clear pool
- if (comet) {
- processSocket(desc[n*2+1],
SocketStatus.DISCONNECT);
- } else {
- Socket.destroy(desc[n*2+1]);
- }
- continue;
+ Socket.destroy(desc[n*2+1]);
+ // FIXME: decide vs destroy
+ /*
+ if ((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP) {
+ // Destroy and reallocate the poller
+ reset = true;
+ if (skip == null) {
+ skip = new ArrayList<Long>();
+ }
+ skip.add(desc[n*2+1]);
+ }*/
+ } else if (!processSocket(desc[n*2+1])) {
+ // Close socket and clear pool
+ Socket.destroy(desc[n*2+1]);
}
}
} else if (rv < 0) {
@@ -1731,14 +1771,30 @@
errn -= Status.APR_OS_START_USERERR;
}
log.error(sm.getString("endpoint.poll.fail",
"" + errn, Error.strerror(errn)));
- // Handle poll critical failure
- synchronized (this) {
- destroy();
- init();
+ // Destroy and reallocate the poller
+ reset = true;
+ }
+ }
+
+ if (reset) {
+ // Reallocate the current poller
+ int count = Poll.pollset(pollers[i], desc);
+ long newPoller = allocatePoller(actualPollerSize, pool, -1);
+ for (int j = 0; j < count; j++) {
+ int events = (int) desc[2*j];
+ long socket = desc[2*j+1];
+ Poll.remove(pollers[i], socket);
+ if (skip != null && skip.contains(socket)) {
+ continue;
}
- continue;
+ if (Poll.add(newPoller, socket, events) !=
Status.APR_SUCCESS) {
+ // Skip
+ }
}
+ Poll.destroy(pollers[i]);
+ pollers[i] = newPoller;
}
+
}
// Process socket timeouts
@@ -1910,10 +1966,8 @@
if (serverSockPool != 0) {
Socket.destroy(socket);
}
- socket = 0;
}
} else {
-
// Process the request from this socket
if ((status != null) && (handler.event(socket, status) ==
Handler.SocketState.CLOSED)) {
// Close socket and pool only if it wasn't closed
@@ -1921,7 +1975,6 @@
if (serverSockPool != 0) {
Socket.destroy(socket);
}
- socket = 0;
} else if ((status == null) && ((options &&
!setSocketOptions(socket))
|| handler.process(socket) == Handler.SocketState.CLOSED)) {
// Close socket and pool only if it wasn't closed
@@ -1929,7 +1982,6 @@
if (serverSockPool != 0) {
Socket.destroy(socket);
}
- socket = 0;
}
}
@@ -2359,7 +2411,6 @@
} else {
// Close socket and pool
Socket.destroy(socket);
- socket = 0;
}
} else {
// Process the request from this socket
@@ -2367,9 +2418,9 @@
|| handler.process(socket) == Handler.SocketState.CLOSED) {
// Close socket and pool
Socket.destroy(socket);
- socket = 0;
}
}
+ socket = 0;
}
@@ -2397,8 +2448,8 @@
if (handler.process(socket) == Handler.SocketState.CLOSED) {
// Close socket and pool
Socket.destroy(socket);
- socket = 0;
}
+ socket = 0;
}
@@ -2428,8 +2479,8 @@
if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
// Close socket and pool
Socket.destroy(socket);
- socket = 0;
}
+ socket = 0;
}