Author: remy.maucherat(a)jboss.com
Date: 2007-07-23 20:18:10 -0400 (Mon, 23 Jul 2007)
New Revision: 205
Modified:
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Log:
- Working draft for new Comet implementation.
- Add write polling.
- Per connection timeout value, and stop using Poll.maintain.
- New multiple poller algorithm, which may or may not be faster, but is needed when using
Comet.
- More work to do for Windows (where multiple pollers must be used). In particular,
the status codes returned when removing a socket from a poller
need to be checked.
- New structures for per socket timeouts, socket add list.
- The HTTP connector itself is missing the setTimeout calls for switching to
non-blocking mode and back.
- Not tested at all.
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-07-19 16:02:38 UTC (rev
204)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-07-24 00:18:10 UTC (rev
205)
@@ -343,6 +343,8 @@
*/
protected Poller[] pollers = null;
protected int pollerRoundRobin = 0;
+ // FIXME: due to Comet and the socket state, getPoller should accept the socket as an
argument,
+ // or (better) a getPoller(long socket) should be added for cases that need it
public Poller getPoller() {
pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
return pollers[pollerRoundRobin];
@@ -632,6 +634,9 @@
acceptorThreadCount = 1;
}
if (pollerThreadCount == 0) {
+ // FIXME: Default to one per CPU ?
+ pollerThreadCount = 1;
+ /*
if ((OS.IS_WIN32 || OS.IS_WIN64) && (pollerSize > 1024)) {
// The maximum per poller to get reasonable performance is 1024
pollerThreadCount = pollerSize / 1024;
@@ -639,10 +644,14 @@
pollerSize = pollerSize - (pollerSize % 1024);
} else {
// No explicit poller size limitation
+ // FIXME: Default to one per CPU ?
pollerThreadCount = 1;
- }
+ }*/
}
if (sendfileThreadCount == 0) {
+ // FIXME: Default to one per CPU ?
+ sendfileThreadCount = 1;
+ /*
if ((OS.IS_WIN32 || OS.IS_WIN64) && (sendfileSize > 1024)) {
// The maximum per poller to get reasonable performance is 1024
sendfileThreadCount = sendfileSize / 1024;
@@ -652,7 +661,7 @@
// No explicit poller size limitation
// FIXME: Default to one per CPU ?
sendfileThreadCount = 1;
- }
+ }*/
}
// Delay accepting of new connections until data is available
@@ -725,14 +734,6 @@
workers = new WorkerStack(maxThreads);
}
- // Start acceptor threads
- for (int i = 0; i < acceptorThreadCount; i++) {
- Thread acceptorThread = new Thread(new Acceptor(), getName() +
"-Acceptor-" + i);
- acceptorThread.setPriority(threadPriority);
- acceptorThread.setDaemon(daemon);
- acceptorThread.start();
- }
-
// Start poller threads
pollers = new Poller[pollerThreadCount];
for (int i = 0; i < pollerThreadCount; i++) {
@@ -767,6 +768,15 @@
sendfileThread.start();
}
}
+
+ // Start acceptor threads
+ for (int i = 0; i < acceptorThreadCount; i++) {
+ Thread acceptorThread = new Thread(new Acceptor(), getName() +
"-Acceptor-" + i);
+ acceptorThread.setPriority(threadPriority);
+ acceptorThread.setDaemon(daemon);
+ acceptorThread.start();
+ }
+
}
}
@@ -1125,6 +1135,151 @@
}
+ // ------------------------------------------------- SocketInfo Inner Class
+
+
+ /**
+ * Plain holder for one socket entry: the socket handle, its timeout and the
+ * read/write polling flags. SocketList.get() fills a single shared instance
+ * of this class on each call instead of allocating one object per entry.
+ */
+ public class SocketInfo {
+ // Socket handle, as passed to the Poll and Socket calls
+ public long socket;
+ // Timeout in milliseconds (the poller adds it to System.currentTimeMillis())
+ public int timeout;
+ // True when read polling is requested for the socket
+ public boolean read;
+ // True when write polling is requested for the socket
+ public boolean write;
+
+ }
+
+
+ // --------------------------------------------- SocketTimeouts Inner Class
+
+
+ /**
+ * Per socket timeout table, stored as two parallel primitive arrays (socket
+ * handle and absolute expiration date) so that no object is allocated per
+ * socket. Entries are unordered: removing one moves the last entry into the
+ * freed slot. This class does no synchronization of its own.
+ */
+ public class SocketTimeouts {
+ // Number of entries currently stored
+ protected int size;
+
+ // Parallel arrays: sockets[i] expires at timeouts[i]
+ protected long[] sockets;
+ protected long[] timeouts;
+ // Cursor of the sweep done by check(), kept between calls
+ protected int pos = 0;
+
+ /**
+ * Create an empty table.
+ *
+ * @param size initial capacity, in entries
+ */
+ public SocketTimeouts(int size) {
+ this.size = 0;
+ sockets = new long[size];
+ timeouts = new long[size];
+ }
+
+ /**
+ * Register the expiration date of a socket. If the socket is already
+ * present its date is replaced, so that a stale, earlier date cannot
+ * expire the socket and check() never reports the same socket twice.
+ * The table grows when it is full instead of throwing
+ * ArrayIndexOutOfBoundsException.
+ *
+ * @param socket the socket handle
+ * @param timeout absolute date (same clock as the date given to check())
+ * after which the socket is considered timed out
+ */
+ public void add(long socket, long timeout) {
+ // Linear scan, same cost as remove()
+ for (int i = 0; i < size; i++) {
+ if (sockets[i] == socket) {
+ timeouts[i] = timeout;
+ return;
+ }
+ }
+ if (size == sockets.length) {
+ int capacity = Math.max(16, size * 2);
+ long[] newSockets = new long[capacity];
+ long[] newTimeouts = new long[capacity];
+ System.arraycopy(sockets, 0, newSockets, 0, size);
+ System.arraycopy(timeouts, 0, newTimeouts, 0, size);
+ sockets = newSockets;
+ timeouts = newTimeouts;
+ }
+ sockets[size] = socket;
+ timeouts[size] = timeout;
+ size++;
+ }
+
+ /**
+ * Remove the entry of the given socket, if any.
+ *
+ * @param socket the socket handle
+ */
+ public void remove(long socket) {
+ for (int i = 0; i < size; i++) {
+ if (sockets[i] == socket) {
+ // Fill the hole with the last entry; order is not preserved
+ sockets[i] = sockets[size - 1];
+ timeouts[i] = timeouts[size - 1];
+ size--;
+ break;
+ }
+ }
+ }
+
+ /**
+ * Find and remove the next timed out socket. Call repeatedly with the
+ * same date until 0 is returned; the sweep position is kept between
+ * calls and reset once the end of the table has been reached.
+ *
+ * @param date the current date
+ * @return a socket whose expiration date is strictly before the given
+ * date, or 0 when the sweep found no more expired sockets
+ */
+ public long check(long date) {
+ while (pos < size) {
+ if (date > timeouts[pos]) {
+ long result = sockets[pos];
+ // The entry moved into this slot is examined by the next call,
+ // since pos is not advanced here
+ sockets[pos] = sockets[size - 1];
+ timeouts[pos] = timeouts[size - 1];
+ size--;
+ return result;
+ }
+ pos++;
+ }
+ pos = 0;
+ return 0;
+ }
+
+ }
+
+
+ // ------------------------------------------------- SocketList Inner Class
+
+
+ /**
+ * Fixed capacity list of sockets waiting to be handed to a poller, stored as
+ * parallel primitive arrays (socket handle, timeout, read flag, write flag)
+ * to avoid allocating one object per pending socket. This class does no
+ * synchronization of its own; the poller guards its shared list with
+ * synchronized (this).
+ */
+ public class SocketList {
+ // Number of entries currently stored
+ protected int size;
+ // Read cursor used by get()
+ protected int pos;
+
+ // Parallel arrays, one slot per pending socket
+ protected long[] sockets;
+ protected int[] timeouts;
+ protected boolean[] reads;
+ protected boolean[] writes;
+
+ // Single instance reused by get() for every entry
+ protected SocketInfo info = new SocketInfo();
+
+ /**
+ * Create an empty list.
+ *
+ * @param size capacity of the list, in entries
+ */
+ public SocketList(int size) {
+ // Reset the field, not the parameter: the parameter shadows the field
+ // and must keep the requested capacity for the allocations below
+ this.size = 0;
+ pos = 0;
+ sockets = new long[size];
+ timeouts = new int[size];
+ reads = new boolean[size];
+ writes = new boolean[size];
+ }
+
+ /**
+ * @return the number of entries currently in the list
+ */
+ public int size() {
+ return size;
+ }
+
+ /**
+ * Return the entry at the read cursor and advance the cursor.
+ *
+ * @return the shared SocketInfo instance filled with the next entry, or
+ * null when all entries have been returned; its content is only valid
+ * until the next call to get()
+ */
+ public SocketInfo get() {
+ if (pos == size) {
+ return null;
+ } else {
+ info.socket = sockets[pos];
+ info.timeout = timeouts[pos];
+ info.read = reads[pos];
+ info.write = writes[pos];
+ pos++;
+ return info;
+ }
+ }
+
+ /**
+ * Empty the list and rewind the read cursor. The arrays are kept.
+ */
+ public void clear() {
+ size = 0;
+ pos = 0;
+ }
+
+ /**
+ * Append a socket to the list.
+ *
+ * @param socket the socket handle
+ * @param timeout timeout to associate with the socket
+ * @param read true to request read polling
+ * @param write true to request write polling
+ * @return false if the list is full and the socket was not added
+ */
+ public boolean add(long socket, int timeout, boolean read, boolean write) {
+ if (size == sockets.length) {
+ return false;
+ } else {
+ sockets[size] = socket;
+ timeouts[size] = timeout;
+ reads[size] = read;
+ writes[size] = write;
+ size++;
+ return true;
+ }
+ }
+
+ /**
+ * Copy the content and cursor of this list into another list, replacing
+ * whatever the other list contained.
+ *
+ * @param copy destination list; its capacity must be at least the
+ * current size of this list
+ */
+ public void duplicate(SocketList copy) {
+ copy.size = size;
+ copy.pos = pos;
+ System.arraycopy(sockets, 0, copy.sockets, 0, size);
+ System.arraycopy(timeouts, 0, copy.timeouts, 0, size);
+ System.arraycopy(reads, 0, copy.reads, 0, size);
+ System.arraycopy(writes, 0, copy.writes, 0, size);
+ }
+
+ }
+
+
// ----------------------------------------------------- Poller Inner Class
@@ -1133,15 +1288,59 @@
*/
public class Poller implements Runnable {
- protected long serverPollset = 0;
+ /**
+ * Pointers to the pollers.
+ */
+ protected long[] pollers = null;
+
+ /**
+ * Amount of spots left in the poller.
+ */
+ protected int[] pollerSpace = null;
+
+ /**
+ * Amount of low level pollers in use by this poller.
+ */
+ protected int pollerCount;
+
+ /**
+ * Timeout value for the poll call.
+ */
+ protected int pollerTime;
+
+ /**
+ * Root pool.
+ */
protected long pool = 0;
+
+ /**
+ * Socket descriptors.
+ */
protected long[] desc;
- protected long[] addS;
- protected int addCount = 0;
-
+ /**
+ * List of sockets to be added to the poller.
+ */
+ protected SocketList addList = null;
+
+ /**
+ * Local copy of the add list, filled by addList.duplicate() so that the
+ * pending sockets can be processed outside the synchronized block.
+ */
+ protected SocketList localAddList = null;
+
+ /**
+ * Comet mode flag.
+ */
protected boolean comet = true;
+ /**
+ * Structure used for storing timeouts.
+ */
+ protected SocketTimeouts timeouts = null;
+
+ /**
+ * Amount of kept alive connections inside this poller.
+ */
protected int keepAliveCount = 0;
public int getKeepAliveCount() { return keepAliveCount; }
@@ -1154,33 +1353,59 @@
* be 62 (recompiling APR is necessary to remove this limitation).
*/
protected void init() {
+
+ timeouts = new SocketTimeouts(pollerSize);
+
pool = Pool.create(serverSockPool);
int size = pollerSize / pollerThreadCount;
+ if ((OS.IS_WIN32 || OS.IS_WIN64) && (size > 1024)) {
+ // The maximum per poller to get reasonable performance is 1024
+ // Adjust poller size so that it won't reach the limit
+ size = 1024;
+ }
int timeout = keepAliveTimeout;
if (timeout < 0) {
timeout = soTimeout;
}
- serverPollset = allocatePoller(size, pool, timeout);
- if (serverPollset == 0 && size > 1024) {
+
+ // FIXME: timeout is useless, look into removing it
+ long pollset = allocatePoller(size, pool, timeout);
+ if (pollset == 0 && size > 1024) {
size = 1024;
- serverPollset = allocatePoller(size, pool, timeout);
+ pollset = allocatePoller(size, pool, timeout);
}
- if (serverPollset == 0) {
+ if (pollset == 0) {
size = 62;
- serverPollset = allocatePoller(size, pool, timeout);
+ pollset = allocatePoller(size, pool, timeout);
}
+
+ pollerCount = (pollerSize / pollerThreadCount) / size;
+ pollerTime = pollTime / pollerCount;
+
+ pollers = new long[pollerCount];
+ pollers[0] = pollset;
+ for (int i = 1; i < pollerCount; i++) {
+ pollers[i] = allocatePoller(size, pool, timeout);
+ }
+
+ pollerSpace = new int[pollerCount];
+ for (int i = 0; i < pollerCount; i++) {
+ pollerSpace[i] = size;
+ }
+
desc = new long[size * 2];
keepAliveCount = 0;
- addS = new long[size];
- addCount = 0;
+ addList = new SocketList(pollerSize / pollerThreadCount);
+ localAddList = new SocketList(pollerSize / pollerThreadCount);
+
}
/**
* Destroy the poller.
*/
protected void destroy() {
- // Wait for polltime before doing anything, so that the poller threads
- // exit, otherwise parallel descturction of sockets which are still
+ // Wait for pollerTime before doing anything, so that the poller threads
+ // exit, otherwise parallel destruction of sockets which are still
// in the poller can cause problems
try {
synchronized (this) {
@@ -1190,27 +1415,31 @@
// Ignore
}
// Close all sockets in the add queue
- for (int i = 0; i < addCount; i++) {
+ SocketInfo info = addList.get();
+ while (info != null) {
if (comet) {
- processSocket(addS[i], SocketStatus.STOP);
+ processSocket(info.socket, SocketStatus.STOP);
} else {
- Socket.destroy(addS[i]);
+ Socket.destroy(info.socket);
}
+ info = addList.get();
}
+ addList.clear();
// Close all sockets still in the poller
- int rv = Poll.pollset(serverPollset, desc);
- if (rv > 0) {
- for (int n = 0; n < rv; n++) {
- if (comet) {
- processSocket(desc[n*2+1], SocketStatus.STOP);
- } else {
- Socket.destroy(desc[n*2+1]);
+ for (int i = 0; i < pollerCount; i++) {
+ int rv = Poll.pollset(pollers[i], desc);
+ if (rv > 0) {
+ for (int n = 0; n < rv; n++) {
+ if (comet) {
+ processSocket(desc[n*2+1], SocketStatus.STOP);
+ } else {
+ Socket.destroy(desc[n*2+1]);
+ }
}
}
}
Pool.destroy(pool);
keepAliveCount = 0;
- addCount = 0;
}
/**
@@ -1222,10 +1451,14 @@
* @param socket to add to the poller
*/
public void add(long socket) {
+ int timeout = keepAliveTimeout;
+ if (timeout < 0) {
+ timeout = soTimeout;
+ }
synchronized (this) {
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
- if (addCount >= addS.length) {
+ if (!addList.add(socket, timeout, true, false)) {
// Can't do anything: close the socket right away
if (comet) {
processSocket(socket, SocketStatus.ERROR);
@@ -1234,8 +1467,6 @@
}
return;
}
- addS[addCount] = socket;
- addCount++;
this.notify();
}
}
@@ -1256,12 +1487,10 @@
* @param write to do write polling
*/
public void add(long socket, int timeout, boolean read, boolean write) {
- // FIXME: Implement. At the moment, no support for both read and write
polling, although
- // there's no problem in theory.
synchronized (this) {
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
- if (addCount >= addS.length) {
+ if (!addList.add(socket, timeout, read, write)) {
// Can't do anything: close the socket right away
if (comet) {
processSocket(socket, SocketStatus.ERROR);
@@ -1270,8 +1499,6 @@
}
return;
}
- addS[addCount] = socket;
- addCount++;
this.notify();
}
}
@@ -1294,7 +1521,7 @@
}
}
- while (keepAliveCount < 1 && addCount < 1) {
+ while (keepAliveCount < 1 && addList.size() < 1) {
// Reset maintain time.
maintainTime = 0;
try {
@@ -1308,8 +1535,9 @@
try {
// Add sockets which are waiting to the poller
- if (addCount > 0) {
+ if (addList.size() > 0) {
synchronized (this) {
+ /*
for (int i = (addCount - 1); i >= 0; i--) {
int rv = Poll.add
(serverPollset, addS[i], Poll.APR_POLLIN);
@@ -1323,19 +1551,56 @@
Socket.destroy(addS[i]);
}
}
+ }*/
+ // Duplicate to another list, so that the syncing is minimal
+ addList.duplicate(localAddList);
+ addList.clear();
+ }
+ SocketInfo info = localAddList.get();
+ while (info != null) {
+ if (info.read || info.write) {
+ // Store timeout
+ // FIXME: remove sockets from structure when processing
them
+ timeouts.add(info.socket, System.currentTimeMillis() +
info.timeout);
+ // FIXME: what is the status code if the socket is not
present in that poller
+ int rv = Poll.remove(pollers[0], info.socket);
+ // Windows only: check status code and loop over the
other pollers
+ int events = 0;
+ if (info.read) {
+ events = Poll.APR_POLLIN;
+ }
+ if (info.write) {
+ events = events | Poll.APR_POLLOUT;
+ }
+ // FIXME: need to iterate over the pollers array, iterate
over pollers until one is found
+ rv = Poll.add(pollers[0], info.socket, events);
+ // FIXME: check return value
+ } else {
+ // Resume event
+ timeouts.remove(info.socket);
+ // FIXME: what is the status code if the socket is not
present in that poller
+ int rv = Poll.remove(pollers[0], info.socket);
+ // Windows only: check status code and loop over the
other pollers
+ if (comet) {
+ processSocket(info.socket,
SocketStatus.OPEN_CALLBACK);
+ } else {
+ // Should never happen
+ // FIXME: ISE ?
+ Socket.destroy(info.socket);
+ }
}
- addCount = 0;
+ info = localAddList.get();
}
}
maintainTime += pollTime;
// Pool for the specified interval
- int rv = Poll.poll(serverPollset, pollTime, desc, true);
+ // FIXME: need to iterate over the pollers array
+ int rv = Poll.poll(pollers[0], pollerTime, desc, true);
if (rv > 0) {
keepAliveCount -= rv;
for (int n = 0; n < rv; n++) {
// Check for failed sockets and hand this socket off to a
worker
- System.out.println("Desc: " + desc[n*2]);
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)
|| (comet && (!processSocket(desc[n*2+1],
SocketStatus.OPEN_READ)))
@@ -1366,18 +1631,18 @@
}
}
if (soTimeout > 0 && maintainTime > 1000000L &&
running) {
- rv = Poll.maintain(serverPollset, desc, true);
maintainTime = 0;
- if (rv > 0) {
- keepAliveCount -= rv;
- for (int n = 0; n < rv; n++) {
- // Close socket and clear pool
- if (comet) {
- processSocket(desc[n], SocketStatus.TIMEOUT);
- } else {
- Socket.destroy(desc[n]);
- }
+ long date = System.currentTimeMillis();
+ long socket = timeouts.check(date);
+ while (socket != 0) {
+ // FIXME: what is the status code if the socket is not
present in that poller
+ rv = Poll.remove(pollers[0], socket);
+ if (comet) {
+ processSocket(socket, SocketStatus.TIMEOUT);
+ } else {
+ Socket.destroy(socket);
}
+ socket = timeouts.check(date);
}
}
} catch (Throwable t) {