JBossWeb SVN: r844 - in trunk/java/org/apache: tomcat/util/net and 1 other directory.
by jbossweb-commits@lists.jboss.org
Author: remy.maucherat(a)jboss.com
Date: 2008-11-03 17:01:48 -0500 (Mon, 03 Nov 2008)
New Revision: 844
Modified:
trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Log:
- Add an extra flag, otherwise async resume() causes problems no matter what I tried.
Modified: trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2008-11-03 13:47:06 UTC (rev 843)
+++ trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2008-11-03 22:01:48 UTC (rev 844)
@@ -1274,14 +1274,14 @@
// An event is being processed already: adding for resume will be done
// when the socket gets back to the poller
if (!cometProcessing && !resumeNotification) {
- endpoint.getCometPoller().add(socket, cometTimeout, false, false, true);
+ endpoint.getCometPoller().add(socket, cometTimeout, false, false, true, true);
}
resumeNotification = true;
} else if (actionCode == ActionCode.ACTION_COMET_WRITE) {
// An event is being processed already: adding for write will be done
// when the socket gets back to the poller
if (!cometProcessing && !writeNotification) {
- endpoint.getCometPoller().add(socket, cometTimeout, false, true, false);
+ endpoint.getCometPoller().add(socket, cometTimeout, false, true, false, true);
}
writeNotification = true;
} else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
Modified: trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2008-11-03 13:47:06 UTC (rev 843)
+++ trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2008-11-03 22:01:48 UTC (rev 844)
@@ -572,7 +572,7 @@
} else {
if (proto.endpoint.isRunning()) {
proto.endpoint.getCometPoller().add(socket, result.getCometTimeout(),
- result.getReadNotifications(), result.getWriteNotification(), result.getResumeNotification());
+ result.getReadNotifications(), result.getWriteNotification(), result.getResumeNotification(), false);
}
}
result.endProcessing();
@@ -603,7 +603,7 @@
state = event(socket, SocketStatus.OPEN_READ);
} else {
proto.endpoint.getCometPoller().add(socket, processor.getCometTimeout(),
- processor.getReadNotifications(), false, false);
+ processor.getReadNotifications(), false, false, false);
}
} else {
recycledProcessors.offer(processor);
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2008-11-03 13:47:06 UTC (rev 843)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2008-11-03 22:01:48 UTC (rev 844)
@@ -1163,6 +1163,7 @@
public static final int READ = 1;
public static final int WRITE = 2;
public static final int RESUME = 4;
+ public static final int WAKEUP = 8;
public long socket;
public int timeout;
public int flags;
@@ -1175,6 +1176,9 @@
public boolean resume() {
return (flags & RESUME) == RESUME;
}
+ public boolean wakeup() {
+ return (flags & WAKEUP) == WAKEUP;
+ }
}
@@ -1284,6 +1288,12 @@
if (size == sockets.length) {
return false;
} else {
+ for (int i = 0; i < size; i++) {
+ if (sockets[i] == socket) {
+ flags[i] = flags[i] | flag;
+ return true;
+ }
+ }
sockets[size] = socket;
timeouts[size] = timeout;
flags[size] = flag;
@@ -1520,7 +1530,7 @@
* @param write to do write polling
* @param resume to send a callback event
*/
- public void add(long socket, int timeout, boolean read, boolean write, boolean resume) {
+ public void add(long socket, int timeout, boolean read, boolean write, boolean resume, boolean wakeup) {
if (timeout < 0) {
timeout = soTimeout;
}
@@ -1529,7 +1539,8 @@
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
if (addList.add(socket, timeout,
- (read ? SocketInfo.READ : 0) | (write ? SocketInfo.WRITE : 0) | (resume ? SocketInfo.RESUME : 0))) {
+ (read ? SocketInfo.READ : 0) | (write ? SocketInfo.WRITE : 0) | (resume ? SocketInfo.RESUME : 0)
+ | (wakeup ? SocketInfo.WAKEUP : 0))) {
ok = true;
this.notify();
}
@@ -1664,7 +1675,28 @@
}
SocketInfo info = localAddList.get();
while (info != null) {
- if (info.read() || info.write()) {
+ if (info.wakeup()) {
+ // Resume event if socket is present in the poller
+ if (timeouts.remove(info.socket)) {
+ removeFromPoller(info.socket);
+ if (info.resume()) {
+ if (!processSocket(info.socket, SocketStatus.OPEN_CALLBACK)) {
+ Socket.destroy(info.socket);
+ }
+ } else {
+ int events = ((info.read()) ? Poll.APR_POLLIN : 0)
+ | ((info.write()) ? Poll.APR_POLLOUT : 0);
+ if (!addToPoller(info.socket, events)) {
+ // Can't do anything: close the socket right away
+ if (!comet || (comet && !processSocket(info.socket, SocketStatus.ERROR))) {
+ Socket.destroy(info.socket);
+ }
+ } else {
+ timeouts.add(info.socket, System.currentTimeMillis() + info.timeout);
+ }
+ }
+ }
+ } else if (info.read() || info.write()) {
if (info.resume()) {
// Resume event
timeouts.remove(info.socket);
@@ -1674,7 +1706,6 @@
}
} else {
// Store timeout
- timeouts.add(info.socket, System.currentTimeMillis() + info.timeout);
if (comet) {
removeFromPoller(info.socket);
}
@@ -1682,10 +1713,11 @@
| ((info.write()) ? Poll.APR_POLLOUT : 0);
if (!addToPoller(info.socket, events)) {
// Can't do anything: close the socket right away
- timeouts.remove(info.socket);
if (!comet || (comet && !processSocket(info.socket, SocketStatus.ERROR))) {
Socket.destroy(info.socket);
}
+ } else {
+ timeouts.add(info.socket, System.currentTimeMillis() + info.timeout);
}
}
} else {
@@ -1797,6 +1829,10 @@
// Reallocate the current poller
int count = Poll.pollset(pollers[i], desc);
long newPoller = allocatePoller(actualPollerSize, pool, -1);
+ // FIXME: don't restore connections for now, since I have not tested it
+ pollerSpace[i] = actualPollerSize;
+ connectionCount -= count;
+ /*
for (int j = 0; j < count; j++) {
int events = (int) desc[2*j];
long socket = desc[2*j+1];
@@ -1807,7 +1843,7 @@
if (Poll.add(newPoller, socket, events) != Status.APR_SUCCESS) {
// Skip
}
- }
+ }*/
Poll.destroy(pollers[i]);
pollers[i] = newPoller;
}