Author: remy.maucherat(a)jboss.com
Date: 2007-07-30 20:06:49 -0400 (Mon, 30 Jul 2007)
New Revision: 209
Modified:
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Log:
- Basic bugfixing.
- Support multiple pollers.
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-07-27 08:35:42 UTC (rev 208)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-07-31 00:06:49 UTC (rev 209)
@@ -1225,7 +1225,7 @@
protected SocketInfo info = new SocketInfo();
public SocketList(int size) {
- size = 0;
+ this.size = 0;
pos = 0;
sockets = new long[size];
timeouts = new int[size];
@@ -1234,7 +1234,7 @@
}
public int size() {
- return size;
+ return this.size;
}
public SocketInfo get() {
@@ -1294,6 +1294,11 @@
protected long[] pollers = null;
/**
+ * Actual poller size.
+ */
+ protected int actualPollerSize = 0;
+
+ /**
* Amount of spots left in the poller.
*/
protected int[] pollerSpace = null;
@@ -1357,11 +1362,11 @@
timeouts = new SocketTimeouts(pollerSize);
pool = Pool.create(serverSockPool);
- int size = pollerSize / pollerThreadCount;
- if ((OS.IS_WIN32 || OS.IS_WIN64) && (size > 1024)) {
+ actualPollerSize = pollerSize / pollerThreadCount;
+ if ((OS.IS_WIN32 || OS.IS_WIN64) && (actualPollerSize > 1024)) {
// The maximum per poller to get reasonable performance is 1024
// Adjust poller size so that it won't reach the limit
- size = 1024;
+ actualPollerSize = 1024;
}
int timeout = keepAliveTimeout;
if (timeout < 0) {
@@ -1369,31 +1374,31 @@
}
// FIXME: timeout is useless, look into removing it
- long pollset = allocatePoller(size, pool, timeout);
- if (pollset == 0 && size > 1024) {
- size = 1024;
- pollset = allocatePoller(size, pool, timeout);
+ long pollset = allocatePoller(actualPollerSize, pool, timeout);
+ if (pollset == 0 && actualPollerSize > 1024) {
+ actualPollerSize = 1024;
+ pollset = allocatePoller(actualPollerSize, pool, timeout);
}
if (pollset == 0) {
- size = 62;
- pollset = allocatePoller(size, pool, timeout);
+ actualPollerSize = 62;
+ pollset = allocatePoller(actualPollerSize, pool, timeout);
}
- pollerCount = (pollerSize / pollerThreadCount) / size;
+ pollerCount = (pollerSize / pollerThreadCount) / actualPollerSize;
pollerTime = pollTime / pollerCount;
pollers = new long[pollerCount];
pollers[0] = pollset;
for (int i = 1; i < pollerCount; i++) {
- pollers[i] = allocatePoller(size, pool, timeout);
+ pollers[i] = allocatePoller(actualPollerSize, pool, timeout);
}
pollerSpace = new int[pollerCount];
for (int i = 0; i < pollerCount; i++) {
- pollerSpace[i] = size;
+ pollerSpace[i] = actualPollerSize;
}
- desc = new long[size * 2];
+ desc = new long[actualPollerSize * 2];
keepAliveCount = 0;
addList = new SocketList(pollerSize / pollerThreadCount);
localAddList = new SocketList(pollerSize / pollerThreadCount);
@@ -1471,7 +1476,6 @@
}
}
- // FIXME: Maybe CometPoller extends Poller could make some sense
/**
* Add specified socket and associated pool to the poller. The socket will
* be added to a temporary array, and polled first after a maximum amount
@@ -1504,6 +1508,40 @@
}
/**
+ * Add specified socket to one of the pollers.
+ */
+ protected boolean addToPoller(long socket, int events) {
+ int rv = 0;
+ for (int i = 0; i < pollers.length; i++) {
+ if (pollerSpace[i] > 0) {
+ rv = Poll.add(pollers[i], socket, events);
+ if (rv == Status.APR_SUCCESS) {
+ pollerSpace[i]--;
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Remove specified socket from the pollers.
+ */
+ protected boolean removeFromPoller(long socket) {
+ int rv = 0;
+ for (int i = 0; i < pollers.length; i++) {
+ if (pollerSpace[i] < actualPollerSize) {
+ rv = Poll.remove(pollers[i], socket);
+ if (rv != Status.APR_NOTFOUND) {
+ pollerSpace[i]++;
+ break;
+ }
+ }
+ }
+ return (rv == Status.APR_SUCCESS);
+ }
+
+ /**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
@@ -1537,21 +1575,6 @@
// Add sockets which are waiting to the poller
if (addList.size() > 0) {
synchronized (this) {
- /*
- for (int i = (addCount - 1); i >= 0; i--) {
- int rv = Poll.add
- (serverPollset, addS[i], Poll.APR_POLLIN);
- if (rv == Status.APR_SUCCESS) {
- keepAliveCount++;
- } else {
- // Can't do anything: close the socket right away
- if (comet) {
- processSocket(addS[i], SocketStatus.ERROR);
- } else {
- Socket.destroy(addS[i]);
- }
- }
- }*/
// Duplicate to another list, so that the syncing is minimal
addList.duplicate(localAddList);
addList.clear();
@@ -1560,10 +1583,10 @@
while (info != null) {
if (info.read || info.write) {
// Store timeout
- // FIXME: remove sockets from structure when processing them
timeouts.add(info.socket, System.currentTimeMillis() +
info.timeout);
- // FIXME: what is the status code if the socket is not present in that poller
- int rv = Poll.remove(pollers[0], info.socket);
+ if (comet) {
+ removeFromPoller(info.socket);
+ }
// Windows only: check status code and loop over the other pollers
int events = 0;
if (info.read) {
@@ -1572,16 +1595,19 @@
if (info.write) {
events = events | Poll.APR_POLLOUT;
}
- // FIXME: need to iterate over the pollers array, iterate over pollers until one is found
- rv = Poll.add(pollers[0], info.socket, events);
- // FIXME: check return value
+ if (!addToPoller(info.socket, events)) {
+ // Can't do anything: close the socket right away
+ if (comet) {
+ processSocket(info.socket, SocketStatus.ERROR);
+ } else {
+ Socket.destroy(info.socket);
+ }
+ }
} else {
// Resume event
timeouts.remove(info.socket);
- // FIXME: what is the status code if the socket is not present in that poller
- int rv = Poll.remove(pollers[0], info.socket);
- // Windows only: check status code and loop over the other pollers
if (comet) {
+ removeFromPoller(info.socket);
processSocket(info.socket,
SocketStatus.OPEN_CALLBACK);
} else {
// Should never happen
@@ -1594,49 +1620,57 @@
}
maintainTime += pollTime;
- // Pool for the specified interval
- // FIXME: need to iterate over the pollers array
- int rv = Poll.poll(pollers[0], pollerTime, desc, true);
- if (rv > 0) {
- keepAliveCount -= rv;
- for (int n = 0; n < rv; n++) {
- // Check for failed sockets and hand this socket off to a worker
- if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
- || ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)
- || (comet && (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)))
- || (!comet && (!processSocket(desc[n*2+1]))))
{
- // Close socket and clear pool
- if (comet) {
- processSocket(desc[n*2+1], SocketStatus.DISCONNECT);
- } else {
- Socket.destroy(desc[n*2+1]);
+
+ // Poll for the specified interval
+ for (int i = 0; i < pollers.length; i++) {
+ int rv = 0;
+ // Iterate on each pollers, but no need to poll empty pollers
+ if (pollerSpace[i] < actualPollerSize) {
+ rv = Poll.poll(pollers[i], pollerTime, desc, true);
+ }
+ if (rv > 0) {
+ keepAliveCount -= rv;
+ for (int n = 0; n < rv; n++) {
+ timeouts.remove(desc[n*2+1]);
+ // Check for failed sockets and hand this socket off to a worker
+ if (((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP)
+ || ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)
+ || (comet && (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)))
+ || (!comet &&
(!processSocket(desc[n*2+1])))) {
+ // Close socket and clear pool
+ if (comet) {
+ processSocket(desc[n*2+1],
SocketStatus.DISCONNECT);
+ } else {
+ Socket.destroy(desc[n*2+1]);
+ }
+ continue;
}
+ }
+ } else if (rv < 0) {
+ int errn = -rv;
+ /* Any non timeup or interrupted error is critical */
+ if ((errn != Status.TIMEUP) && (errn !=
Status.EINTR)) {
+ if (errn > Status.APR_OS_START_USERERR) {
+ errn -= Status.APR_OS_START_USERERR;
+ }
+ log.error(sm.getString("endpoint.poll.fail",
"" + errn, Error.strerror(errn)));
+ // Handle poll critical failure
+ synchronized (this) {
+ destroy();
+ init();
+ }
continue;
}
}
- } else if (rv < 0) {
- int errn = -rv;
- /* Any non timeup or interrupted error is critical */
- if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
- if (errn > Status.APR_OS_START_USERERR) {
- errn -= Status.APR_OS_START_USERERR;
- }
- log.error(sm.getString("endpoint.poll.fail",
"" + errn, Error.strerror(errn)));
- // Handle poll critical failure
- synchronized (this) {
- destroy();
- init();
- }
- continue;
- }
}
+
+ // Process socket timeouts
if (soTimeout > 0 && maintainTime > 1000000L &&
running) {
maintainTime = 0;
long date = System.currentTimeMillis();
long socket = timeouts.check(date);
while (socket != 0) {
- // FIXME: what is the status code if the socket is not present in that poller
- rv = Poll.remove(pollers[0], socket);
+ removeFromPoller(socket);
if (comet) {
processSocket(socket, SocketStatus.TIMEOUT);
} else {
@@ -1645,6 +1679,7 @@
socket = timeouts.check(date);
}
}
+
} catch (Throwable t) {
log.error(sm.getString("endpoint.poll.error"), t);
}