Author: remy.maucherat(a)jboss.com
Date: 2007-10-01 21:14:17 -0400 (Mon, 01 Oct 2007)
New Revision: 288
Removed:
trunk/java/org/apache/tomcat/util/net/BaseEndpoint.java
Modified:
trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
Log:
- Remove BaseEndpoint (not used).
- Add concurrency checks for write and read. It should be
enough but I'm not certain at this point.
Modified: trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2007-10-01 22:41:10 UTC (rev 287)
+++ trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2007-10-02 01:14:17 UTC (rev 288)
@@ -326,12 +326,29 @@
protected int cometTimeout = -1;
protected boolean readNotifications = true;
+ protected boolean writeNotification = false;
protected boolean nonBlocking = true;
+ protected boolean cometProcessing = false;
// ------------------------------------------------------------- Properties
+ public void startProcessing() {
+ cometProcessing = true;
+ }
+
+
+ public void endProcessing() {
+ cometProcessing = false;
+ }
+
+
+ public boolean getWriteNotification() {
+ return writeNotification;
+ }
+
+
public boolean getReadNotifications() {
return readNotifications;
}
@@ -762,6 +779,8 @@
try {
// If processing a write event, must flush any leftover bytes first
if (status == SocketStatus.OPEN_WRITE) {
+ // The write notification is now done
+ writeNotification = false;
// FIXME: If the flush does not manage to flush all leftover bytes, it is possible
// that the servlet is not going to be able to write bytes. This will be handled properly,
// but is wasteful.
@@ -1250,9 +1269,13 @@
readNotifications = true;
endpoint.getCometPoller().add(socket, timeout, false, false, true);
} else if (actionCode == ActionCode.ACTION_COMET_WRITE) {
- // FIXME: Maybe, should check (?)
- // FIXME: If called synchronously, setting a flag instead of a direct call is needed.
- endpoint.getCometPoller().add(socket, timeout, false, true, false);
+ // An event is being processed already:adding for write will be done
+ // when the socket gets back to the poller
+ if (cometProcessing) {
+ writeNotification = true;
+ } else {
+ endpoint.getCometPoller().add(socket, timeout, false, true, false);
+ }
} else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
cometTimeout = ((Integer) param).intValue();
}
Modified: trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2007-10-01 22:41:10 UTC (rev 287)
+++ trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2007-10-02 01:14:17 UTC (rev 288)
@@ -514,6 +514,7 @@
SocketState state = SocketState.CLOSED;
if (result != null) {
+ result.startProcessing();
// Call the appropriate event
try {
state = result.event(status);
@@ -546,8 +547,9 @@
}
} else {
proto.endpoint.getCometPoller().add(socket,
result.getCometTimeout(),
- result.getReadNotifications(), false, false);
+ result.getReadNotifications(), result.getWriteNotification(), false);
}
+ result.endProcessing();
}
}
return state;
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-10-01 22:41:10 UTC (rev 287)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2007-10-02 01:14:17 UTC (rev 288)
@@ -342,8 +342,6 @@
*/
protected Poller[] pollers = null;
protected int pollerRoundRobin = 0;
- // FIXME: due to Comet and the socket state, getPoller should accept the socket as an argument,
- // or (better) a getPoller(long socket) should be added for cases that need it
public Poller getPoller() {
pollerRoundRobin = (pollerRoundRobin + 1) % pollers.length;
return pollers[pollerRoundRobin];
@@ -355,6 +353,8 @@
*/
protected Poller[] cometPollers = null;
protected int cometPollerRoundRobin = 0;
+ // FIXME: due to Comet and the socket state, getPoller should accept the socket as an argument,
+ // or (better) a getPoller(long socket) should be added for cases that need it
public Poller getCometPoller() {
cometPollerRoundRobin = (cometPollerRoundRobin + 1) % cometPollers.length;
return cometPollers[cometPollerRoundRobin];
@@ -1589,12 +1589,6 @@
SocketInfo info = localAddList.get();
while (info != null) {
if (info.read || info.write) {
- // FIXME: Check concurrency to see if the socket isn't being processed for an
- // event (such as a read if a write is added): need to add a "processing event"
- // concurrent map ...
- if (comet) {
-
- }
// Store timeout
timeouts.add(info.socket, System.currentTimeMillis() + info.timeout);
if (comet) {
@@ -1653,7 +1647,6 @@
for (int n = 0; n < rv; n++) {
timeouts.remove(desc[n*2+1]);
// Check for failed sockets and hand this socket off to a worker
- // FIXME: Need to check for a write
if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)
|| (comet &&
@@ -1671,7 +1664,7 @@
}
} else if (rv < 0) {
int errn = -rv;
- /* Any non timeup or interrupted error is critical */
+ // Any non timeup or interrupted error is critical
if ((errn != Status.TIMEUP) && (errn != Status.EINTR)) {
if (errn > Status.APR_OS_START_USERERR) {
errn -= Status.APR_OS_START_USERERR;
@@ -1689,6 +1682,8 @@
// Process socket timeouts
if (soTimeout > 0 && maintainTime > 1000000L && running) {
+ // This works and uses only one timeout mechanism for everything, but the
+ // non Comet poller might be a bit faster by using the old maintain.
maintainTime = 0;
long date = System.currentTimeMillis();
long socket = timeouts.check(date);
Deleted: trunk/java/org/apache/tomcat/util/net/BaseEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/BaseEndpoint.java 2007-10-01 22:41:10 UTC (rev 287)
+++ trunk/java/org/apache/tomcat/util/net/BaseEndpoint.java 2007-10-02 01:14:17 UTC (rev 288)
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tomcat.util.net;
-
-import java.net.InetAddress;
-import java.util.concurrent.Executor;
-
-import org.apache.tomcat.util.res.StringManager;
-import org.jboss.logging.Logger;
-import org.jboss.logging.Logger;
-
-/**
- * APR tailored thread pool, providing the following services:
- * <ul>
- * <li>Socket acceptor thread</li>
- * <li>Socket poller thread</li>
- * <li>Sendfile thread</li>
- * <li>Worker threads pool</li>
- * </ul>
- *
- * When switching to Java 5, there's an opportunity to use the virtual
- * machine's thread pool.
- *
- * @author Mladen Turk
- * @author Remy Maucherat
- */
-public abstract class BaseEndpoint {
-
-
- // -------------------------------------------------------------- Constants
-
-
- protected static Logger log = Logger.getLogger(BaseEndpoint.class);
-
- protected static StringManager sm =
- StringManager.getManager("org.apache.tomcat.util.net.res");
-
-
- /**
- * The Request attribute key for the cipher suite.
- */
- public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
-
- /**
- * The Request attribute key for the key size.
- */
- public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
-
- /**
- * The Request attribute key for the client certificate chain.
- */
- public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
-
- /**
- * The Request attribute key for the session id.
- * This one is a Tomcat extension to the Servlet spec.
- */
- public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
-
-
- // ----------------------------------------------------------------- Fields
-
-
- /**
- * Running state of the endpoint.
- */
- protected volatile boolean running = false;
-
-
- /**
- * Will be set to true whenever the endpoint is paused.
- */
- protected volatile boolean paused = false;
-
-
- /**
- * Track the initialization state of the endpoint.
- */
- protected boolean initialized = false;
-
-
- /**
- * Current worker threads busy count.
- */
- protected int curThreadsBusy = 0;
-
-
- /**
- * Current worker threads count.
- */
- protected int curThreads = 0;
-
-
- /**
- * Sequence number used to generate thread names.
- */
- protected int sequence = 0;
-
-
- // ------------------------------------------------------------- Properties
-
-
- /**
- * External Executor based thread pool.
- */
- protected Executor executor = null;
- public void setExecutor(Executor executor) { this.executor = executor; }
- public Executor getExecutor() { return executor; }
-
-
- /**
- * Maximum amount of worker threads.
- */
- protected int maxThreads = 40;
- public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
- public int getMaxThreads() { return maxThreads; }
-
-
- /**
- * Priority of the acceptor and poller threads.
- */
- protected int threadPriority = Thread.NORM_PRIORITY;
- public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
- public int getThreadPriority() { return threadPriority; }
-
-
- /**
- * Server socket port.
- */
- protected int port;
- public int getPort() { return port; }
- public void setPort(int port ) { this.port=port; }
-
-
- /**
- * Address for the server socket.
- */
- protected InetAddress address;
- public InetAddress getAddress() { return address; }
- public void setAddress(InetAddress address) { this.address = address; }
-
-
- /**
- * Allows the server developer to specify the backlog that
- * should be used for server sockets. By default, this value
- * is 100.
- */
- protected int backlog = 100;
- public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
- public int getBacklog() { return backlog; }
-
-
- /**
- * Socket TCP no delay.
- */
- protected boolean tcpNoDelay = false;
- public boolean getTcpNoDelay() { return tcpNoDelay; }
- public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }
-
-
- /**
- * Socket linger.
- */
- protected int soLinger = 100;
- public int getSoLinger() { return soLinger; }
- public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
-
-
- /**
- * Socket timeout.
- */
- protected int soTimeout = -1;
- public int getSoTimeout() { return soTimeout; }
- public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
-
-
- /**
- * The default is true - the created threads will be
- * in daemon mode. If set to false, the control thread
- * will not be daemon - and will keep the process alive.
- */
- protected boolean daemon = true;
- public void setDaemon(boolean b) { daemon = b; }
- public boolean getDaemon() { return daemon; }
-
-
- /**
- * Name of the thread pool, which will be used for naming child threads.
- */
- protected String name = "TP";
- public void setName(String name) { this.name = name; }
- public String getName() { return name; }
-
-
- /**
- * Dummy maxSpareThreads property.
- */
- public int getMaxSpareThreads() { return 0; }
-
-
- /**
- * Dummy minSpareThreads property.
- */
- public int getMinSpareThreads() { return 0; }
-
-
- // --------------------------------------------------------- Public Methods
-
-
- /**
- * Return the amount of threads that are managed by the pool.
- *
- * @return the amount of threads that are managed by the pool
- */
- public int getCurrentThreadCount() {
- return curThreads;
- }
-
-
- /**
- * Return the amount of threads currently busy.
- *
- * @return the amount of threads currently busy
- */
- public int getCurrentThreadsBusy() {
- return curThreadsBusy;
- }
-
-
- /**
- * Return the state of the endpoint.
- *
- * @return true if the endpoint is running, false otherwise
- */
- public boolean isRunning() {
- return running;
- }
-
-
- /**
- * Return the state of the endpoint.
- *
- * @return true if the endpoint is paused, false otherwise
- */
- public boolean isPaused() {
- return paused;
- }
-
-
- // ----------------------------------------------- Public Lifecycle Methods
-
-
- /**
- * Initialize the endpoint.
- */
- public abstract void init()
- throws Exception;
-
-
- /**
- * Start the APR endpoint, creating acceptor, poller and sendfile threads.
- */
- public abstract void start()
- throws Exception;
-
-
- /**
- * Pause the endpoint, which will make it stop accepting new sockets.
- */
- public void pause() {
- if (running && !paused) {
- paused = true;
- unlockAccept();
- }
- }
-
-
- /**
- * Resume the endpoint, which will make it start accepting new sockets
- * again.
- */
- public void resume() {
- if (running) {
- paused = false;
- }
- }
-
-
- /**
- * Stop the endpoint. This will cause all processing threads to stop.
- */
- public abstract void stop();
-
-
- /**
- * Deallocate APR memory pools, and close server socket.
- */
- public abstract void destroy() throws Exception;
-
-
- // ------------------------------------------------------ Protected Methods
-
-
- /**
- * Get a sequence number used for thread naming.
- */
- protected int getSequence() {
- return sequence++;
- }
-
-
- /**
- * Unlock the server socket accept using a bugus connection.
- */
- protected void unlockAccept() {
- java.net.Socket s = null;
- try {
- // Need to create a connection to unlock the accept();
- if (address == null) {
- s = new java.net.Socket("127.0.0.1", port);
- } else {
- s = new java.net.Socket(address, port);
- // setting soLinger to a small value will help shutdown the
- // connection quicker
- s.setSoLinger(true, 0);
- }
- } catch(Exception e) {
- if (log.isDebugEnabled()) {
- log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
- }
- } finally {
- if (s != null) {
- try {
- s.close();
- } catch (Exception e) {
- // Ignore
- }
- }
- }
- }
-
-
-}