Author: remy.maucherat(a)jboss.com
Date: 2009-03-20 13:26:19 -0400 (Fri, 20 Mar 2009)
New Revision: 966
Added:
trunk/java/org/apache/catalina/valves/EventOrAsyncConnectionManagerValve.java
Modified:
trunk/java/org/apache/catalina/Valve.java
trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
trunk/java/org/apache/catalina/connector/HttpEventImpl.java
trunk/java/org/apache/catalina/connector/InputBuffer.java
trunk/java/org/apache/catalina/connector/OutputBuffer.java
trunk/java/org/apache/catalina/connector/Request.java
trunk/java/org/apache/catalina/connector/Response.java
trunk/java/org/apache/catalina/core/ApplicationFilterFactory.java
trunk/java/org/apache/catalina/core/StandardWrapperValve.java
trunk/java/org/apache/catalina/valves/CometConnectionManagerValve.java
trunk/java/org/apache/coyote/ActionCode.java
trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
trunk/java/org/apache/coyote/ajp/AjpProcessor.java
trunk/java/org/apache/coyote/ajp/AjpProtocol.java
trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
trunk/java/org/apache/coyote/http11/Http11Processor.java
trunk/java/org/apache/coyote/http11/Http11Protocol.java
trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
trunk/webapps/docs/changelog.xml
Log:
- Rename comet to event (ok, I guess I am bored ;) ).
- Handle the keepalive scenario for the java.io endpoint.
Modified: trunk/java/org/apache/catalina/Valve.java
===================================================================
--- trunk/java/org/apache/catalina/Valve.java 2009-03-19 18:30:09 UTC (rev 965)
+++ trunk/java/org/apache/catalina/Valve.java 2009-03-20 17:26:19 UTC (rev 966)
@@ -133,7 +133,7 @@
/**
- * Process a Comet event.
+ * Process an event.
*
* @param request The servlet request to be processed
* @param response The servlet response to be created
Modified: trunk/java/org/apache/catalina/connector/CoyoteAdapter.java
===================================================================
--- trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 2009-03-19 18:30:09 UTC
(rev 965)
+++ trunk/java/org/apache/catalina/connector/CoyoteAdapter.java 2009-03-20 17:26:19 UTC
(rev 966)
@@ -148,12 +148,12 @@
// The response IO has been closed asynchronously, so call end
// in most cases
request.getEvent().setType(HttpEvent.EventType.END);
- request.setComet(false);
+ request.setEventMode(false);
close = true;
}
switch (status) {
case OPEN_READ:
- if (!request.isComet()) {
+ if (!request.isEventMode()) {
// The event has been closed asynchronously, so call end instead
of
// read to cleanup the pipeline
request.getEvent().setType(HttpEvent.EventType.END);
@@ -184,7 +184,7 @@
}
break;
case OPEN_WRITE:
- if (!request.isComet()) {
+ if (!request.isEventMode()) {
// The event has been closed asynchronously, so call end instead
of
// read to cleanup the pipeline
request.getEvent().setType(HttpEvent.EventType.END);
@@ -194,7 +194,7 @@
}
break;
case OPEN_CALLBACK:
- if (!request.isComet()) {
+ if (!request.isEventMode()) {
// The event has been closed asynchronously, so call end instead
of
// read to cleanup the pipeline
// In nearly all cases, the close does a resume which will end
up
@@ -215,7 +215,7 @@
close = true;
break;
case TIMEOUT:
- if (!request.isComet()) {
+ if (!request.isEventMode()) {
// The event has been closed asynchronously, so call end instead
of
// read to cleanup the pipeline
request.getEvent().setType(HttpEvent.EventType.END);
@@ -272,7 +272,7 @@
request.recycle();
request.setFilterChain(null);
response.recycle();
- res.action(ActionCode.ACTION_COMET_END, null);
+ res.action(ActionCode.ACTION_EVENT_END, null);
}
}
@@ -318,7 +318,7 @@
response.addHeader("X-Powered-By", "Servlet/3.0");
}
- boolean comet = false;
+ boolean event = false;
try {
// Parse and set Catalina and configuration specific
@@ -329,20 +329,20 @@
// Calling the container
connector.getContainer().getPipeline().getFirst().invoke(request,
response);
- if (request.isComet()) {
+ if (request.isEventMode()) {
if (!response.isClosed() && !response.isError()) {
- res.action(ActionCode.ACTION_COMET_BEGIN, null);
- comet = true;
+ res.action(ActionCode.ACTION_EVENT_BEGIN, null);
+ event = true;
} else {
// Clear the filter chain, as otherwise it will not be reset
elsewhere
- // since this is a Comet request
+ // since this is an event driven request
request.setFilterChain(null);
}
}
}
- if (!comet) {
+ if (!event) {
response.finishResponse();
req.action(ActionCode.ACTION_POST_REQUEST , null);
}
@@ -354,7 +354,7 @@
} finally {
req.getRequestProcessor().setWorkerThreadName(null);
// Recycle the wrapper request and response
- if (!comet) {
+ if (!event) {
request.recycle();
response.recycle();
} else {
Modified: trunk/java/org/apache/catalina/connector/HttpEventImpl.java
===================================================================
--- trunk/java/org/apache/catalina/connector/HttpEventImpl.java 2009-03-19 18:30:09 UTC
(rev 965)
+++ trunk/java/org/apache/catalina/connector/HttpEventImpl.java 2009-03-20 17:26:19 UTC
(rev 966)
@@ -77,7 +77,7 @@
}
public void close() throws IOException {
- request.setComet(false);
+ request.setEventMode(false);
request.resume();
}
Modified: trunk/java/org/apache/catalina/connector/InputBuffer.java
===================================================================
--- trunk/java/org/apache/catalina/connector/InputBuffer.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/catalina/connector/InputBuffer.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -280,7 +280,7 @@
int available = 0;
if (state != CHAR_STATE) {
available = bb.getLength();
- if (request.isComet() && available == 0) {
+ if (request.isEventMode() && available == 0) {
try {
coyoteRequest.action(ActionCode.ACTION_AVAILABLE, null);
available = realReadBytes(null, 0, 0);
@@ -291,7 +291,7 @@
}
} else {
available = cb.getLength();
- if (request.isComet() && available == 0) {
+ if (request.isEventMode() && available == 0) {
try {
coyoteRequest.action(ActionCode.ACTION_AVAILABLE, null);
available = realReadChars(null, 0, cb.getBuffer().length);
Modified: trunk/java/org/apache/catalina/connector/OutputBuffer.java
===================================================================
--- trunk/java/org/apache/catalina/connector/OutputBuffer.java 2009-03-19 18:30:09 UTC
(rev 965)
+++ trunk/java/org/apache/catalina/connector/OutputBuffer.java 2009-03-20 17:26:19 UTC
(rev 966)
@@ -359,7 +359,7 @@
protected int lastWrite() {
int res = coyoteResponse.getLastWrite();
if (res == 0) {
- coyoteResponse.action(ActionCode.ACTION_COMET_WRITE, null);
+ coyoteResponse.action(ActionCode.ACTION_EVENT_WRITE, null);
}
return res;
}
Modified: trunk/java/org/apache/catalina/connector/Request.java
===================================================================
--- trunk/java/org/apache/catalina/connector/Request.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/catalina/connector/Request.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -199,6 +199,12 @@
/**
+ * Servlet 3.0 asynchronous mode flag
+ */
+ protected boolean asyncMode = false;
+
+
+ /**
* Authentication type.
*/
protected String authType = null;
@@ -211,9 +217,9 @@
/**
- * Comet state
+ * Event mode flag
*/
- protected boolean comet = false;
+ protected boolean eventMode = false;
/**
@@ -393,7 +399,7 @@
dispatcherType = null;
requestDispatcherPath = null;
- comet = false;
+ eventMode = false;
if (event != null) {
event.clear();
event = null;
@@ -458,7 +464,7 @@
/**
- * Clear cached encoders (to save memory for Comet requests).
+ * Clear cached encoders (to save memory for event or async requests).
*/
public void clearEncoders() {
inputBuffer.clearEncoders();
@@ -2264,18 +2270,18 @@
/**
- * Return true if the current request is handling Comet traffic.
+ * Return true if the current request is using event mode.
*/
- public boolean isComet() {
- return comet;
+ public boolean isEventMode() {
+ return eventMode;
}
/**
- * Set comet state.
+ * Set event mode.
*/
- public void setComet(boolean comet) {
- this.comet = comet;
+ public void setEventMode(boolean eventMode) {
+ this.eventMode = eventMode;
}
@@ -2291,17 +2297,17 @@
* Set connection timeout.
*/
public void setTimeout(int timeout) {
- coyoteRequest.action(ActionCode.ACTION_COMET_TIMEOUT, new Integer(timeout));
+ coyoteRequest.action(ActionCode.ACTION_EVENT_TIMEOUT, timeout);
}
public void resume() {
- coyoteRequest.action(ActionCode.ACTION_COMET_RESUME, null);
+ coyoteRequest.action(ActionCode.ACTION_EVENT_RESUME, null);
}
public void suspend() {
- coyoteRequest.action(ActionCode.ACTION_COMET_SUSPEND, null);
+ coyoteRequest.action(ActionCode.ACTION_EVENT_SUSPEND, null);
}
Modified: trunk/java/org/apache/catalina/connector/Response.java
===================================================================
--- trunk/java/org/apache/catalina/connector/Response.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/catalina/connector/Response.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -289,7 +289,7 @@
/**
- * Clear cached encoders (to save memory for Comet requests).
+ * Clear cached encoders (to save memory for event or async requests).
*/
public void clearEncoders() {
outputBuffer.clearEncoders();
Modified: trunk/java/org/apache/catalina/core/ApplicationFilterFactory.java
===================================================================
--- trunk/java/org/apache/catalina/core/ApplicationFilterFactory.java 2009-03-19 18:30:09
UTC (rev 965)
+++ trunk/java/org/apache/catalina/core/ApplicationFilterFactory.java 2009-03-20 17:26:19
UTC (rev 966)
@@ -123,7 +123,7 @@
ApplicationFilterChain filterChain = null;
if (request instanceof Request) {
Request req = (Request) request;
- comet = req.isComet();
+ comet = req.isEventMode();
if (Globals.IS_SECURITY_ENABLED) {
// Security: Do not recycle
filterChain = new ApplicationFilterChain();
Modified: trunk/java/org/apache/catalina/core/StandardWrapperValve.java
===================================================================
--- trunk/java/org/apache/catalina/core/StandardWrapperValve.java 2009-03-19 18:30:09 UTC
(rev 965)
+++ trunk/java/org/apache/catalina/core/StandardWrapperValve.java 2009-03-20 17:26:19 UTC
(rev 966)
@@ -159,11 +159,11 @@
// Identify if the request should be switched to event mode now that
// the servlet has been allocated
- boolean comet = false;
+ boolean event = false;
if (servlet instanceof HttpEventServlet
&& request.getConnector().hasIoEvents()) {
- comet = true;
- request.setComet(true);
+ event = true;
+ request.setEventMode(true);
}
// Acknowledge the request
@@ -197,8 +197,8 @@
ApplicationFilterFactory.getInstance();
ApplicationFilterChain filterChain =
factory.createFilterChain(request, wrapper, servlet);
- // Reset comet flag value after creating the filter chain
- request.setComet(false);
+ // Reset event flag value after creating the filter chain
+ request.setEventMode(false);
// Call the filter chain for this request
// NOTE: This also calls the servlet's service() method
@@ -213,8 +213,8 @@
if (context.getSwallowOutput()) {
try {
SystemLogHandler.startCapture();
- if (comet) {
- request.setComet(true);
+ if (event) {
+ request.setEventMode(true);
request.getSession(true);
filterChain.doFilterEvent(request.getEvent());
} else {
@@ -228,8 +228,8 @@
}
}
} else {
- if (comet) {
- request.setComet(true);
+ if (event) {
+ request.setEventMode(true);
request.getSession(true);
filterChain.doFilterEvent(request.getEvent());
} else {
@@ -289,8 +289,8 @@
// Release the filter chain (if any) for this request
if (filterChain != null) {
- if (request.isComet()) {
- // If this is a Comet request, then the same chain will be used for the
+ if (request.isEventMode()) {
+ // If this is a event request, then the same chain will be used for the
// processing of all subsequent events.
filterChain.reuse();
} else {
@@ -338,7 +338,7 @@
/**
- * Process a Comet event. The main differences here are to not use sendError
+ * Process an event. The main differences here are to not use sendError
* (the response is committed), to avoid creating a new filter chain
* (which would work but be pointless), and a few very minor tweaks.
*
Modified: trunk/java/org/apache/catalina/valves/CometConnectionManagerValve.java
===================================================================
--- trunk/java/org/apache/catalina/valves/CometConnectionManagerValve.java 2009-03-19
18:30:09 UTC (rev 965)
+++ trunk/java/org/apache/catalina/valves/CometConnectionManagerValve.java 2009-03-20
17:26:19 UTC (rev 966)
@@ -19,29 +19,8 @@
package org.apache.catalina.valves;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpSession;
-import javax.servlet.http.HttpSessionEvent;
-import javax.servlet.http.HttpSessionListener;
-import org.apache.catalina.Context;
-import org.apache.catalina.Lifecycle;
-import org.apache.catalina.LifecycleEvent;
-import org.apache.catalina.LifecycleException;
-import org.apache.catalina.LifecycleListener;
-import org.apache.catalina.connector.Request;
-import org.apache.catalina.connector.Response;
-import org.apache.catalina.util.LifecycleSupport;
-import org.apache.catalina.util.StringManager;
-import org.jboss.servlet.http.HttpEvent;
-
-
/**
* <p>Implementation of a Valve that tracks Comet connections, and closes them
* when the associated session expires or the webapp is reloaded.</p>
@@ -50,319 +29,9 @@
*
* @author Remy Maucherat
* @version $Revision$ $Date$
+ * @deprecated Replaced by EventOrAsyncConnectionManagerValve
*/
public class CometConnectionManagerValve
- extends ValveBase
- implements Lifecycle, HttpSessionListener, LifecycleListener {
-
-
- // ----------------------------------------------------- Instance Variables
-
-
- /**
- * The descriptive information related to this implementation.
- */
- protected static final String info =
- "org.apache.catalina.valves.CometConnectionManagerValve/1.0";
-
-
- /**
- * The string manager for this package.
- */
- protected StringManager sm =
- StringManager.getManager(Constants.Package);
-
-
- /**
- * The lifecycle event support for this component.
- */
- protected LifecycleSupport lifecycle = new LifecycleSupport(this);
-
-
- /**
- * Has this component been started yet?
- */
- protected boolean started = false;
-
-
- /**
- * List of current Coment connections.
- */
- protected List<Request> cometRequests =
- Collections.synchronizedList(new ArrayList<Request>());
-
-
- /**
- * Name of session attribute used to store list of comet connections.
- */
- protected String cometRequestsAttribute =
- "org.apache.tomcat.comet.connectionList";
-
-
- // ------------------------------------------------------------- Properties
-
-
- // ------------------------------------------------------ Lifecycle Methods
-
-
- /**
- * Add a lifecycle event listener to this component.
- *
- * @param listener The listener to add
- */
- public void addLifecycleListener(LifecycleListener listener) {
-
- lifecycle.addLifecycleListener(listener);
-
- }
-
-
- /**
- * Get the lifecycle listeners associated with this lifecycle. If this
- * Lifecycle has no listeners registered, a zero-length array is returned.
- */
- public LifecycleListener[] findLifecycleListeners() {
-
- return lifecycle.findLifecycleListeners();
-
- }
-
-
- /**
- * Remove a lifecycle event listener from this component.
- *
- * @param listener The listener to add
- */
- public void removeLifecycleListener(LifecycleListener listener) {
-
- lifecycle.removeLifecycleListener(listener);
-
- }
-
-
- /**
- * Prepare for the beginning of active use of the public methods of this
- * component. This method should be called after
<code>configure()</code>,
- * and before any of the public methods of the component are utilized.
- *
- * @exception LifecycleException if this component detects a fatal error
- * that prevents this component from being used
- */
- public void start() throws LifecycleException {
-
- // Validate and update our current component state
- if (started)
- throw new LifecycleException
- (sm.getString("semaphoreValve.alreadyStarted"));
- lifecycle.fireLifecycleEvent(START_EVENT, null);
- started = true;
-
- if (container instanceof Context) {
- ((Lifecycle) container).addLifecycleListener(this);
- }
-
- }
-
-
- /**
- * Gracefully terminate the active use of the public methods of this
- * component. This method should be the last one called on a given
- * instance of this component.
- *
- * @exception LifecycleException if this component detects a fatal error
- * that needs to be reported
- */
- public void stop() throws LifecycleException {
-
- // Validate and update our current component state
- if (!started)
- throw new LifecycleException
- (sm.getString("semaphoreValve.notStarted"));
- lifecycle.fireLifecycleEvent(STOP_EVENT, null);
- started = false;
-
- if (container instanceof Context) {
- ((Lifecycle) container).removeLifecycleListener(this);
- }
-
- }
-
-
- public void lifecycleEvent(LifecycleEvent event) {
- if (event.getType() == Lifecycle.BEFORE_STOP_EVENT) {
- // The container is getting stopped, close all current connections
- Iterator<Request> iterator = cometRequests.iterator();
- while (iterator.hasNext()) {
- Request request = iterator.next();
- // Remove the session tracking attribute as it isn't
- // serializable or required.
- HttpSession session = request.getSession(false);
- if (session != null) {
- try {
- session.removeAttribute(cometRequestsAttribute);
- } catch (IllegalStateException e) {
- // Ignore
- }
- }
- // Close the comet connection
- try {
- request.getEvent().close();
- } catch (Exception e) {
- container.getLogger().warn(
- sm.getString("cometConnectionManagerValve.event"),
- e);
- }
- }
- cometRequests.clear();
- }
- }
-
-
- // --------------------------------------------------------- Public Methods
-
-
- /**
- * Return descriptive information about this Valve implementation.
- */
- public String getInfo() {
- return (info);
- }
-
-
- /**
- * Register requests for tracking, whenever needed.
- *
- * @param request The servlet request to be processed
- * @param response The servlet response to be created
- *
- * @exception IOException if an input/output error occurs
- * @exception ServletException if a servlet error occurs
- */
- public void invoke(Request request, Response response)
- throws IOException, ServletException {
- // Perform the request
- getNext().invoke(request, response);
-
- if (request.isComet() && !response.isClosed()) {
- // Start tracking this connection, since this is a
- // begin event, and Comet mode is on
- HttpSession session = request.getSession(true);
-
- // Track the conection for webapp reload
- cometRequests.add(request);
-
- // Track the connection for session expiration
- synchronized (session) {
- Request[] requests = (Request[])
- session.getAttribute(cometRequestsAttribute);
- if (requests == null) {
- requests = new Request[1];
- requests[0] = request;
- session.setAttribute(cometRequestsAttribute,
- requests);
- } else {
- Request[] newRequests =
- new Request[requests.length + 1];
- for (int i = 0; i < requests.length; i++) {
- newRequests[i] = requests[i];
- }
- newRequests[requests.length] = request;
- session.setAttribute(cometRequestsAttribute, newRequests);
- }
- }
- }
-
- }
-
-
- /**
- * Use events to update the connection state.
- *
- * @param request The servlet request to be processed
- * @param response The servlet response to be created
- *
- * @exception IOException if an input/output error occurs
- * @exception ServletException if a servlet error occurs
- */
- public void event(Request request, Response response, HttpEvent event)
- throws IOException, ServletException {
-
- // Perform the request
- boolean ok = false;
- try {
- getNext().event(request, response, event);
- ok = true;
- } finally {
- if (!ok || response.isClosed()
- || (event.getType() == HttpEvent.EventType.END)
- || (event.getType() == HttpEvent.EventType.ERROR)) {
-
- // Remove the connection from webapp reload tracking
- cometRequests.remove(request);
-
- // Remove connection from session expiration tracking
- // Note: can't get the session if it has been invalidated but
- // OK since session listener will have done clean-up
- HttpSession session = request.getSession(false);
- if (session != null) {
- synchronized (session) {
- Request[] reqs = (Request[])
- session.getAttribute(cometRequestsAttribute);
- if (reqs != null) {
- boolean found = false;
- for (int i = 0; !found && (i < reqs.length); i++)
{
- found = (reqs[i] == request);
- }
- if (found) {
- if (reqs.length > 1) {
- Request[] newConnectionInfos =
- new Request[reqs.length - 1];
- int pos = 0;
- for (int i = 0; i < reqs.length; i++) {
- if (reqs[i] != request) {
- newConnectionInfos[pos++] = reqs[i];
- }
- }
- session.setAttribute(cometRequestsAttribute,
- newConnectionInfos);
- } else {
- try {
- session.removeAttribute(
- cometRequestsAttribute);
- } catch (IllegalStateException e) {
- // Ignore
- }
- }
- }
- }
- }
- }
- }
- }
-
- }
-
-
- public void sessionCreated(HttpSessionEvent se) {
- }
-
-
- public void sessionDestroyed(HttpSessionEvent se) {
- // Close all Comet connections associated with this session
- Request[] reqs = (Request[])
- se.getSession().getAttribute(cometRequestsAttribute);
- if (reqs != null) {
- for (int i = 0; i < reqs.length; i++) {
- Request req = reqs[i];
- try {
- req.getEvent().close();
- } catch (Exception e) {
- req.getWrapper().getParent().getLogger().warn(sm.getString(
- "cometConnectionManagerValve.listenerEvent"), e);
- }
- }
- }
- }
-
+ extends EventOrAsyncConnectionManagerValve {
}
Added: trunk/java/org/apache/catalina/valves/EventOrAsyncConnectionManagerValve.java
===================================================================
--- trunk/java/org/apache/catalina/valves/EventOrAsyncConnectionManagerValve.java
(rev 0)
+++
trunk/java/org/apache/catalina/valves/EventOrAsyncConnectionManagerValve.java 2009-03-20
17:26:19 UTC (rev 966)
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.valves;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpSessionEvent;
+import javax.servlet.http.HttpSessionListener;
+
+import org.apache.catalina.Context;
+import org.apache.catalina.Lifecycle;
+import org.apache.catalina.LifecycleEvent;
+import org.apache.catalina.LifecycleException;
+import org.apache.catalina.LifecycleListener;
+import org.apache.catalina.connector.Request;
+import org.apache.catalina.connector.Response;
+import org.apache.catalina.util.LifecycleSupport;
+import org.apache.catalina.util.StringManager;
+import org.jboss.servlet.http.HttpEvent;
+
+
+/**
+ * <p>Implementation of a Valve that tracks Comet connections, and closes them
+ * when the associated session expires or the webapp is reloaded.</p>
+ *
+ * <p>This Valve should be attached to a Context.</p>
+ *
+ * @author Remy Maucherat
+ * @version $Revision: 965 $ $Date: 2009-03-19 19:30:09 +0100 (Thu, 19 Mar 2009) $
+ */
+
+public class EventOrAsyncConnectionManagerValve
+ extends ValveBase
+ implements Lifecycle, HttpSessionListener, LifecycleListener {
+
+
+ // ----------------------------------------------------- Instance Variables
+
+
+ /**
+ * The descriptive information related to this implementation.
+ */
+ protected static final String info =
+ "org.apache.catalina.valves.CometConnectionManagerValve/1.0";
+
+
+ /**
+ * The string manager for this package.
+ */
+ protected StringManager sm =
+ StringManager.getManager(Constants.Package);
+
+
+ /**
+ * The lifecycle event support for this component.
+ */
+ protected LifecycleSupport lifecycle = new LifecycleSupport(this);
+
+
+ /**
+ * Has this component been started yet?
+ */
+ protected boolean started = false;
+
+
+ /**
+ * List of current Coment connections.
+ */
+ protected List<Request> cometRequests =
+ Collections.synchronizedList(new ArrayList<Request>());
+
+
+ /**
+ * Name of session attribute used to store list of comet connections.
+ */
+ protected String cometRequestsAttribute =
+ "org.apache.tomcat.comet.connectionList";
+
+
+ // ------------------------------------------------------------- Properties
+
+
+ // ------------------------------------------------------ Lifecycle Methods
+
+
+ /**
+ * Add a lifecycle event listener to this component.
+ *
+ * @param listener The listener to add
+ */
+ public void addLifecycleListener(LifecycleListener listener) {
+
+ lifecycle.addLifecycleListener(listener);
+
+ }
+
+
+ /**
+ * Get the lifecycle listeners associated with this lifecycle. If this
+ * Lifecycle has no listeners registered, a zero-length array is returned.
+ */
+ public LifecycleListener[] findLifecycleListeners() {
+
+ return lifecycle.findLifecycleListeners();
+
+ }
+
+
+ /**
+ * Remove a lifecycle event listener from this component.
+ *
+ * @param listener The listener to add
+ */
+ public void removeLifecycleListener(LifecycleListener listener) {
+
+ lifecycle.removeLifecycleListener(listener);
+
+ }
+
+
+ /**
+ * Prepare for the beginning of active use of the public methods of this
+ * component. This method should be called after
<code>configure()</code>,
+ * and before any of the public methods of the component are utilized.
+ *
+ * @exception LifecycleException if this component detects a fatal error
+ * that prevents this component from being used
+ */
+ public void start() throws LifecycleException {
+
+ // Validate and update our current component state
+ if (started)
+ throw new LifecycleException
+ (sm.getString("semaphoreValve.alreadyStarted"));
+ lifecycle.fireLifecycleEvent(START_EVENT, null);
+ started = true;
+
+ if (container instanceof Context) {
+ ((Lifecycle) container).addLifecycleListener(this);
+ }
+
+ }
+
+
+ /**
+ * Gracefully terminate the active use of the public methods of this
+ * component. This method should be the last one called on a given
+ * instance of this component.
+ *
+ * @exception LifecycleException if this component detects a fatal error
+ * that needs to be reported
+ */
+ public void stop() throws LifecycleException {
+
+ // Validate and update our current component state
+ if (!started)
+ throw new LifecycleException
+ (sm.getString("semaphoreValve.notStarted"));
+ lifecycle.fireLifecycleEvent(STOP_EVENT, null);
+ started = false;
+
+ if (container instanceof Context) {
+ ((Lifecycle) container).removeLifecycleListener(this);
+ }
+
+ }
+
+
+ public void lifecycleEvent(LifecycleEvent event) {
+ if (event.getType() == Lifecycle.BEFORE_STOP_EVENT) {
+ // The container is getting stopped, close all current connections
+ Iterator<Request> iterator = cometRequests.iterator();
+ while (iterator.hasNext()) {
+ Request request = iterator.next();
+ // Remove the session tracking attribute as it isn't
+ // serializable or required.
+ HttpSession session = request.getSession(false);
+ if (session != null) {
+ try {
+ session.removeAttribute(cometRequestsAttribute);
+ } catch (IllegalStateException e) {
+ // Ignore
+ }
+ }
+ // Close the comet connection
+ try {
+ request.getEvent().close();
+ } catch (Exception e) {
+ container.getLogger().warn(
+ sm.getString("cometConnectionManagerValve.event"),
+ e);
+ }
+ }
+ cometRequests.clear();
+ }
+ }
+
+
+ // --------------------------------------------------------- Public Methods
+
+
+ /**
+ * Return descriptive information about this Valve implementation.
+ */
+ public String getInfo() {
+ return (info);
+ }
+
+
+ /**
+ * Register requests for tracking, whenever needed.
+ *
+ * @param request The servlet request to be processed
+ * @param response The servlet response to be created
+ *
+ * @exception IOException if an input/output error occurs
+ * @exception ServletException if a servlet error occurs
+ */
+ public void invoke(Request request, Response response)
+ throws IOException, ServletException {
+ // Perform the request
+ getNext().invoke(request, response);
+
+ if (request.isEventMode() && !response.isClosed()) {
+ // Start tracking this connection, since this is a
+ // begin event, and Comet mode is on
+ HttpSession session = request.getSession(true);
+
+ // Track the conection for webapp reload
+ cometRequests.add(request);
+
+ // Track the connection for session expiration
+ synchronized (session) {
+ Request[] requests = (Request[])
+ session.getAttribute(cometRequestsAttribute);
+ if (requests == null) {
+ requests = new Request[1];
+ requests[0] = request;
+ session.setAttribute(cometRequestsAttribute,
+ requests);
+ } else {
+ Request[] newRequests =
+ new Request[requests.length + 1];
+ for (int i = 0; i < requests.length; i++) {
+ newRequests[i] = requests[i];
+ }
+ newRequests[requests.length] = request;
+ session.setAttribute(cometRequestsAttribute, newRequests);
+ }
+ }
+ }
+
+ }
+
+
+ /**
+ * Use events to update the connection state.
+ *
+ * @param request The servlet request to be processed
+ * @param response The servlet response to be created
+ *
+ * @exception IOException if an input/output error occurs
+ * @exception ServletException if a servlet error occurs
+ */
+ public void event(Request request, Response response, HttpEvent event)
+ throws IOException, ServletException {
+
+ // Perform the request
+ boolean ok = false;
+ try {
+ getNext().event(request, response, event);
+ ok = true;
+ } finally {
+ if (!ok || response.isClosed()
+ || (event.getType() == HttpEvent.EventType.END)
+ || (event.getType() == HttpEvent.EventType.ERROR)) {
+
+ // Remove the connection from webapp reload tracking
+ cometRequests.remove(request);
+
+ // Remove connection from session expiration tracking
+ // Note: can't get the session if it has been invalidated but
+ // OK since session listener will have done clean-up
+ HttpSession session = request.getSession(false);
+ if (session != null) {
+ synchronized (session) {
+ Request[] reqs = (Request[])
+ session.getAttribute(cometRequestsAttribute);
+ if (reqs != null) {
+ boolean found = false;
+ for (int i = 0; !found && (i < reqs.length); i++)
{
+ found = (reqs[i] == request);
+ }
+ if (found) {
+ if (reqs.length > 1) {
+ Request[] newConnectionInfos =
+ new Request[reqs.length - 1];
+ int pos = 0;
+ for (int i = 0; i < reqs.length; i++) {
+ if (reqs[i] != request) {
+ newConnectionInfos[pos++] = reqs[i];
+ }
+ }
+ session.setAttribute(cometRequestsAttribute,
+ newConnectionInfos);
+ } else {
+ try {
+ session.removeAttribute(
+ cometRequestsAttribute);
+ } catch (IllegalStateException e) {
+ // Ignore
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+
+ public void sessionCreated(HttpSessionEvent se) {
+ }
+
+
+ public void sessionDestroyed(HttpSessionEvent se) {
+ // Close all Comet connections associated with this session
+ Request[] reqs = (Request[])
+ se.getSession().getAttribute(cometRequestsAttribute);
+ if (reqs != null) {
+ for (int i = 0; i < reqs.length; i++) {
+ Request req = reqs[i];
+ try {
+ req.getEvent().close();
+ } catch (Exception e) {
+ req.getWrapper().getParent().getLogger().warn(sm.getString(
+ "cometConnectionManagerValve.listenerEvent"), e);
+ }
+ }
+ }
+ }
+
+}
Modified: trunk/java/org/apache/coyote/ActionCode.java
===================================================================
--- trunk/java/org/apache/coyote/ActionCode.java 2009-03-19 18:30:09 UTC (rev 965)
+++ trunk/java/org/apache/coyote/ActionCode.java 2009-03-20 17:26:19 UTC (rev 966)
@@ -135,15 +135,15 @@
/**
- * Callback for begin Comet processing
+ * Callback for begin event processing
*/
- public static final ActionCode ACTION_COMET_BEGIN = new ActionCode(21);
+ public static final ActionCode ACTION_EVENT_BEGIN = new ActionCode(21);
/**
- * Callback for begin Comet processing
+ * Callback for begin event processing
*/
- public static final ActionCode ACTION_COMET_END = new ActionCode(22);
+ public static final ActionCode ACTION_EVENT_END = new ActionCode(22);
/**
@@ -152,24 +152,24 @@
public static final ActionCode ACTION_AVAILABLE = new ActionCode(23);
/**
- * Set a Comet connection timeout
+ * Set a event connection timeout
*/
- public static final ActionCode ACTION_COMET_TIMEOUT = new ActionCode(24);
+ public static final ActionCode ACTION_EVENT_TIMEOUT = new ActionCode(24);
/**
- * Ask for a callback
+ * Ask for a callback event
*/
- public static final ActionCode ACTION_COMET_RESUME = new ActionCode(25);
+ public static final ActionCode ACTION_EVENT_RESUME = new ActionCode(25);
/**
* Put this request to sleep (no read notifications)
*/
- public static final ActionCode ACTION_COMET_SUSPEND = new ActionCode(26);
+ public static final ActionCode ACTION_EVENT_SUSPEND = new ActionCode(26);
/**
* Ask for a write callback
*/
- public static final ActionCode ACTION_COMET_WRITE = new ActionCode(27);
+ public static final ActionCode ACTION_EVENT_WRITE = new ActionCode(27);
// ----------------------------------------------------------- Constructors
int code;
Modified: trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -271,17 +271,17 @@
/**
- * Comet used.
+ * Event used.
*/
- protected boolean comet = false;
+ protected boolean event = false;
/**
- * Comet processing.
+ * Event processing.
*/
- protected boolean cometProcessing = true;
- public void startProcessing() { cometProcessing = true; }
- public void endProcessing() { cometProcessing = false; }
+ protected boolean eventProcessing = true;
+ public void startProcessing() { eventProcessing = true; }
+ public void endProcessing() { eventProcessing = false; }
// ----------------------------------------------------- Static Initializer
@@ -396,7 +396,7 @@
if (error) {
recycle();
return SocketState.CLOSED;
- } else if (!comet) {
+ } else if (!event) {
finish();
recycle();
return SocketState.OPEN;
@@ -428,7 +428,7 @@
boolean openSocket = true;
boolean keptAlive = false;
- while (!error && !comet) {
+ while (!error && !event) {
// Parsing the request header
try {
@@ -498,7 +498,7 @@
}
// Finish the response if not done yet
- if (!comet && !finished) {
+ if (!event && !finished) {
try {
finish();
} catch (Throwable t) {
@@ -513,7 +513,7 @@
}
request.updateCounters();
- if (!comet) {
+ if (!event) {
recycle();
}
@@ -523,12 +523,12 @@
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (comet) {
+ if (event) {
if (error) {
recycle();
return SocketState.CLOSED;
} else {
- cometProcessing = false;
+ eventProcessing = false;
return SocketState.LONG;
}
} else {
@@ -661,20 +661,20 @@
empty = false;
replay = true;
- } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
- comet = true;
- } else if (actionCode == ActionCode.ACTION_COMET_END) {
- comet = false;
- } else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_BEGIN) {
+ event = true;
+ } else if (actionCode == ActionCode.ACTION_EVENT_END) {
+ event = false;
+ } else if (actionCode == ActionCode.ACTION_EVENT_SUSPEND) {
// No action needed
- } else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_RESUME) {
// An event is being processed already: adding for resume will be done
// when the socket gets back to the poller
- if (!cometProcessing && !resumeNotification) {
- endpoint.getCometPoller().add(socket, timeout, false, false, true,
true);
+ if (!eventProcessing && !resumeNotification) {
+ endpoint.getEventPoller().add(socket, timeout, false, false, true,
true);
}
resumeNotification = true;
- } else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_TIMEOUT) {
timeout = ((Integer) param).intValue();
}
@@ -1273,7 +1273,7 @@
finished = false;
timeout = -1;
resumeNotification = false;
- cometProcessing = true;
+ eventProcessing = true;
request.recycle();
response.recycle();
certificates.recycle();
Modified: trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -430,7 +430,7 @@
}
} else {
if (proto.endpoint.isRunning()) {
- proto.endpoint.getCometPoller().add(socket,
result.getTimeout(),
+ proto.endpoint.getEventPoller().add(socket,
result.getTimeout(),
false, false, result.getResumeNotification(),
false);
}
}
@@ -454,7 +454,7 @@
// processed by this thread will use either a new or a recycled
// processor.
connections.put(socket, processor);
- proto.endpoint.getCometPoller().add(socket, processor.getTimeout(),
false,
+ proto.endpoint.getEventPoller().add(socket, processor.getTimeout(),
false,
false, processor.getResumeNotification(), false);
} else {
recycledProcessors.offer(processor);
Modified: trunk/java/org/apache/coyote/ajp/AjpProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpProcessor.java 2009-03-19 18:30:09 UTC (rev 965)
+++ trunk/java/org/apache/coyote/ajp/AjpProcessor.java 2009-03-20 17:26:19 UTC (rev 966)
@@ -272,17 +272,17 @@
/**
- * Comet used.
+ * Event used.
*/
- protected boolean comet = false;
+ protected boolean event = false;
/**
- * Comet processing.
+ * Event processing.
*/
- protected boolean cometProcessing = true;
- public void startProcessing() { cometProcessing = true; }
- public void endProcessing() { cometProcessing = false; }
+ protected boolean eventProcessing = true;
+ public void startProcessing() { eventProcessing = true; }
+ public void endProcessing() { eventProcessing = false; }
// ----------------------------------------------------- Static Initializer
@@ -406,7 +406,7 @@
if (error) {
recycle();
return SocketState.CLOSED;
- } else if (!comet) {
+ } else if (!event) {
finish();
recycle();
return SocketState.OPEN;
@@ -439,7 +439,7 @@
// Error flag
error = false;
- while (!error && !comet) {
+ while (!error && !event) {
// Parsing the request header
try {
@@ -513,7 +513,7 @@
}
// Finish the response if not done yet
- if (!comet && !finished) {
+ if (!event && !finished) {
try {
finish();
} catch (Throwable t) {
@@ -528,7 +528,7 @@
}
request.updateCounters();
- if (!comet) {
+ if (!event) {
recycle();
}
@@ -538,14 +538,14 @@
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (comet) {
+ if (event) {
if (error) {
input = null;
output = null;
recycle();
return SocketState.CLOSED;
} else {
- cometProcessing = false;
+ eventProcessing = false;
return SocketState.LONG;
}
} else {
@@ -675,20 +675,20 @@
empty = false;
replay = true;
- } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
- comet = true;
- } else if (actionCode == ActionCode.ACTION_COMET_END) {
- comet = false;
- } else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_BEGIN) {
+ event = true;
+ } else if (actionCode == ActionCode.ACTION_EVENT_END) {
+ event = false;
+ } else if (actionCode == ActionCode.ACTION_EVENT_SUSPEND) {
// No action needed
- } else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_RESUME) {
// An event is being processed already: adding for resume will be done
// when the socket gets back to the poller
- if (!cometProcessing && !resumeNotification) {
- endpoint.getPoller().add(socket, timeout, true, true);
+ if (!eventProcessing && !resumeNotification) {
+ endpoint.getEventPoller().add(socket, timeout, true, true);
}
resumeNotification = true;
- } else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_TIMEOUT) {
timeout = ((Integer) param).intValue();
}
@@ -1228,7 +1228,7 @@
finished = false;
timeout = -1;
resumeNotification = false;
- cometProcessing = true;
+ eventProcessing = true;
request.recycle();
response.recycle();
certificates.recycle();
Modified: trunk/java/org/apache/coyote/ajp/AjpProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpProtocol.java 2009-03-19 18:30:09 UTC (rev 965)
+++ trunk/java/org/apache/coyote/ajp/AjpProtocol.java 2009-03-20 17:26:19 UTC (rev 966)
@@ -413,15 +413,9 @@
if (state != SocketState.LONG) {
connections.remove(socket);
recycledProcessors.offer(result);
- // FIXME: if the socket is still open, we should send it back to
reprocess it
- // as if it was an initial request, or the simple solution is to
close after
- // an async request; will see
- if (proto.endpoint.isRunning() && state ==
SocketState.OPEN) {
- //proto.endpoint.getPoller().add(socket);
- }
} else {
if (proto.endpoint.isRunning()) {
- proto.endpoint.getPoller().add(socket, result.getTimeout(),
+ proto.endpoint.getEventPoller().add(socket,
result.getTimeout(),
result.getResumeNotification(), false);
}
}
@@ -445,7 +439,7 @@
// processed by this thread will use either a new or a recycled
// processor.
connections.put(socket, processor);
- proto.endpoint.getPoller().add(socket, processor.getTimeout(),
+ proto.endpoint.getEventPoller().add(socket, processor.getTimeout(),
processor.getResumeNotification(), false);
} else {
recycledProcessors.offer(processor);
Modified: trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2009-03-19 18:30:09 UTC
(rev 965)
+++ trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2009-03-20 17:26:19 UTC
(rev 966)
@@ -179,9 +179,9 @@
/**
- * Comet used.
+ * Event used.
*/
- protected boolean comet = false;
+ protected boolean event = false;
/**
@@ -321,19 +321,19 @@
protected boolean readNotifications = true;
protected boolean writeNotification = false;
protected boolean resumeNotification = false;
- protected boolean cometProcessing = true;
+ protected boolean eventProcessing = true;
// ------------------------------------------------------------- Properties
public void startProcessing() {
- cometProcessing = true;
+ eventProcessing = true;
}
public void endProcessing() {
- cometProcessing = false;
+ eventProcessing = false;
}
@@ -785,7 +785,7 @@
outputBuffer.nextRequest();
recycle();
return SocketState.CLOSED;
- } else if (!comet) {
+ } else if (!event) {
endRequest();
boolean pipelined = inputBuffer.nextRequest();
outputBuffer.nextRequest();
@@ -822,7 +822,7 @@
// Error flag
error = false;
- comet = false;
+ event = false;
keepAlive = true;
int keepAliveLeft = maxKeepAliveRequests;
@@ -831,7 +831,7 @@
boolean keptAlive = false;
boolean openSocket = false;
- while (!error && keepAlive && !comet) {
+ while (!error && keepAlive && !event) {
// Parsing the request header
try {
@@ -910,7 +910,7 @@
// If there is an unspecified error, the connection will be closed
inputBuffer.setSwallowInput(false);
}
- if (!comet) {
+ if (!event) {
endRequest();
}
@@ -922,7 +922,7 @@
request.updateCounters();
boolean pipelined = false;
- if (!comet) {
+ if (!event) {
// Next request
pipelined = inputBuffer.nextRequest();
outputBuffer.nextRequest();
@@ -944,14 +944,14 @@
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (comet) {
+ if (event) {
if (error) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
recycle();
return SocketState.CLOSED;
} else {
- cometProcessing = false;
+ eventProcessing = false;
return SocketState.LONG;
}
} else {
@@ -995,7 +995,7 @@
readNotifications = true;
writeNotification = false;
resumeNotification = false;
- cometProcessing = true;
+ eventProcessing = true;
}
@@ -1059,7 +1059,7 @@
// End the processing of the current request, and stop any further
// transactions with the client
- comet = false;
+ event = false;
try {
outputBuffer.endRequest();
} catch (IOException e) {
@@ -1236,38 +1236,38 @@
} else if (actionCode == ActionCode.ACTION_AVAILABLE) {
inputBuffer.useAvailable();
- } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
- comet = true;
+ } else if (actionCode == ActionCode.ACTION_EVENT_BEGIN) {
+ event = true;
// Set socket to non blocking mode
Socket.timeoutSet(socket, 0);
outputBuffer.setNonBlocking(true);
inputBuffer.setNonBlocking(true);
- } else if (actionCode == ActionCode.ACTION_COMET_END) {
- comet = false;
+ } else if (actionCode == ActionCode.ACTION_EVENT_END) {
+ event = false;
// End non blocking mode
outputBuffer.setNonBlocking(false);
inputBuffer.setNonBlocking(false);
if (!error) {
Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
}
- } else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_SUSPEND) {
readNotifications = false;
- } else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_RESUME) {
readNotifications = true;
// An event is being processed already: adding for resume will be done
// when the socket gets back to the poller
- if (!cometProcessing && !resumeNotification) {
- endpoint.getCometPoller().add(socket, timeout, false, false, true,
true);
+ if (!eventProcessing && !resumeNotification) {
+ endpoint.getEventPoller().add(socket, timeout, false, false, true,
true);
}
resumeNotification = true;
- } else if (actionCode == ActionCode.ACTION_COMET_WRITE) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_WRITE) {
// An event is being processed already: adding for write will be done
// when the socket gets back to the poller
- if (!cometProcessing && !writeNotification) {
- endpoint.getCometPoller().add(socket, timeout, false, true, false,
true);
+ if (!eventProcessing && !writeNotification) {
+ endpoint.getEventPoller().add(socket, timeout, false, true, false,
true);
}
writeNotification = true;
- } else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_TIMEOUT) {
timeout = ((Integer) param).intValue();
}
Modified: trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2009-03-19 18:30:09 UTC
(rev 965)
+++ trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2009-03-20 17:26:19 UTC
(rev 966)
@@ -579,7 +579,7 @@
}
} else {
if (proto.endpoint.isRunning()) {
- proto.endpoint.getCometPoller().add(socket,
result.getTimeout(),
+ proto.endpoint.getEventPoller().add(socket,
result.getTimeout(),
result.getReadNotifications(),
result.getWriteNotification(), result.getResumeNotification(), false);
}
}
@@ -606,7 +606,7 @@
// Call a read event right away
state = event(socket, SocketStatus.OPEN_READ);
} else {
- proto.endpoint.getCometPoller().add(socket,
processor.getTimeout(),
+ proto.endpoint.getEventPoller().add(socket,
processor.getTimeout(),
processor.getReadNotifications(), false,
processor.getResumeNotification(), false);
}
} else {
Modified: trunk/java/org/apache/coyote/http11/Http11Processor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11Processor.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/coyote/http11/Http11Processor.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -305,9 +305,9 @@
/**
- * Comet used.
+ * Event used.
*/
- protected boolean comet = false;
+ protected boolean event = false;
/**
@@ -317,11 +317,11 @@
/**
- * Comet processing.
+ * Event processing.
*/
- protected boolean cometProcessing = true;
- public void startProcessing() { cometProcessing = true; }
- public void endProcessing() { cometProcessing = false; }
+ protected boolean eventProcessing = true;
+ public void startProcessing() { eventProcessing = true; }
+ public void endProcessing() { eventProcessing = false; }
// ------------------------------------------------------------- Properties
@@ -766,7 +766,7 @@
outputBuffer.nextRequest();
recycle();
return SocketState.CLOSED;
- } else if (!comet) {
+ } else if (!event) {
endRequest();
boolean pipelined = inputBuffer.nextRequest();
outputBuffer.nextRequest();
@@ -829,7 +829,7 @@
boolean keptAlive = false;
- while (!error && keepAlive && !comet) {
+ while (!error && keepAlive && !event) {
// Parsing the request header
try {
@@ -906,7 +906,7 @@
// If there is an unspecified error, the connection will be closed
inputBuffer.setSwallowInput(false);
}
- if (!comet) {
+ if (!event) {
endRequest();
}
@@ -917,7 +917,7 @@
}
request.updateCounters();
- if (!comet) {
+ if (!event) {
// Next request
inputBuffer.nextRequest();
outputBuffer.nextRequest();
@@ -929,14 +929,14 @@
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- if (comet) {
+ if (event) {
if (error) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
recycle();
return SocketState.CLOSED;
} else {
- cometProcessing = false;
+ eventProcessing = false;
return SocketState.LONG;
}
} else {
@@ -979,7 +979,7 @@
sslSupport = null;
timeout = -1;
resumeNotification = false;
- cometProcessing = true;
+ eventProcessing = true;
}
@@ -1165,20 +1165,20 @@
InternalInputBuffer internalBuffer = (InternalInputBuffer)
request.getInputBuffer();
internalBuffer.addActiveFilter(savedBody);
- } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
- comet = true;
- } else if (actionCode == ActionCode.ACTION_COMET_END) {
- comet = false;
- } else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_BEGIN) {
+ event = true;
+ } else if (actionCode == ActionCode.ACTION_EVENT_END) {
+ event = false;
+ } else if (actionCode == ActionCode.ACTION_EVENT_SUSPEND) {
// No action needed
- } else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_RESUME) {
// An event is being processed already: adding for resume will be done
// when the socket gets back to the poller
- if (!cometProcessing && !resumeNotification) {
- endpoint.getPoller().add(socket, timeout, true, true);
+ if (!eventProcessing && !resumeNotification) {
+ endpoint.getEventPoller().add(socket, timeout, true, true);
}
resumeNotification = true;
- } else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
+ } else if (actionCode == ActionCode.ACTION_EVENT_TIMEOUT) {
timeout = ((Integer) param).intValue();
}
Modified: trunk/java/org/apache/coyote/http11/Http11Protocol.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11Protocol.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/coyote/http11/Http11Protocol.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -623,15 +623,9 @@
if (state != SocketState.LONG) {
connections.remove(socket);
recycledProcessors.offer(result);
- // FIXME: if the socket is still open, we should send it back to
reprocess it
- // as if it was an initial request, or the simple solution is to
close after
- // an async request; will see
- if (proto.endpoint.isRunning() && state ==
SocketState.OPEN) {
- //proto.endpoint.getPoller().add(socket);
- }
} else {
if (proto.endpoint.isRunning()) {
- proto.endpoint.getPoller().add(socket, result.getTimeout(),
+ proto.endpoint.getEventPoller().add(socket,
result.getTimeout(),
result.getResumeNotification(), false);
}
}
@@ -662,7 +656,7 @@
// processed by this thread will use either a new or a recycled
// processor.
connections.put(socket, processor);
- proto.endpoint.getPoller().add(socket, processor.getTimeout(),
+ proto.endpoint.getEventPoller().add(socket, processor.getTimeout(),
processor.getResumeNotification(), false);
} else {
recycledProcessors.offer(processor);
Modified: trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
===================================================================
--- trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 2009-03-19 18:30:09
UTC (rev 965)
+++ trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java 2009-03-20 17:26:19
UTC (rev 966)
@@ -534,7 +534,7 @@
}
- // If non blocking (comet) and there are leftover bytes,
+ // If non blocking (event) and there are leftover bytes,
// and lastWrite was 0 -> error
if (leftover.getLength() > 0 &&
!(Http11AprProcessor.containerThread.get() == Boolean.TRUE)) {
throw new IOException(sm.getString("oob.backlog"));
@@ -835,7 +835,7 @@
// Call for a write event because it is possible that no further
write
// operations are made
if (!response.getFlushLeftovers()) {
- response.action(ActionCode.ACTION_COMET_WRITE, null);
+ response.action(ActionCode.ACTION_EVENT_WRITE, null);
}
}
}
@@ -869,7 +869,7 @@
public int doWrite(ByteChunk chunk, Response res)
throws IOException {
- // If non blocking (comet) and there are leftover bytes,
+ // If non blocking (event) and there are leftover bytes,
// put all remaining bytes in the leftover buffer (they are
// part of the same write operation)
if (leftover.getLength() > 0) {
@@ -885,7 +885,7 @@
if (bbuf.position() == bbuf.capacity()) {
flushBuffer();
if (leftover.getLength() > 0) {
- // If non blocking (comet) and there are leftover bytes,
+ // If non blocking (event) and there are leftover bytes,
// put all remaining bytes in the leftover buffer (they are
// part of the same write operation)
int oldStart = chunk.getOffset();
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -43,17 +43,14 @@
import org.jboss.logging.Logger;
/**
- * APR tailored thread pool, providing the following services:
+ * APR endpoint, providing the following services:
* <ul>
* <li>Socket acceptor thread</li>
* <li>Socket poller thread</li>
* <li>Sendfile thread</li>
- * <li>Worker threads pool</li>
+ * <li>Simple Worker thread pool, with possible use of executors</li>
* </ul>
*
- * When switching to Java 5, there's an opportunity to use the virtual
- * machine's thread pool.
- *
* @author Mladen Turk
* @author Remy Maucherat
*/
@@ -321,11 +318,11 @@
/**
- * The socket poller used for Comet support.
+ * The socket poller used for event support.
*/
- protected Poller cometPoller = null;
- public Poller getCometPoller() {
- return cometPoller;
+ protected Poller eventPoller = null;
+ public Poller getEventPoller() {
+ return eventPoller;
}
@@ -695,13 +692,13 @@
pollerThread.setDaemon(true);
pollerThread.start();
- // Start comet poller thread
- cometPoller = new Poller(true);
- cometPoller.init();
- Thread cometPollerThread = new Thread(cometPoller, getName() +
"-CometPoller");
- cometPollerThread.setPriority(threadPriority);
- cometPollerThread.setDaemon(true);
- cometPollerThread.start();
+ // Start event poller thread
+ eventPoller = new Poller(true);
+ eventPoller.init();
+ Thread eventPollerThread = new Thread(eventPoller, getName() +
"-EventPoller");
+ eventPollerThread.setPriority(threadPriority);
+ eventPollerThread.setDaemon(true);
+ eventPollerThread.start();
// Start sendfile thread
if (useSendfile) {
@@ -754,8 +751,8 @@
unlockAccept();
poller.destroy();
poller = null;
- cometPoller.destroy();
- cometPoller = null;
+ eventPoller.destroy();
+ eventPoller = null;
if (useSendfile) {
sendfile.destroy();
sendfile = null;
@@ -1358,9 +1355,9 @@
protected SocketList localAddList = null;
/**
- * Comet mode flag.
+ * Event mode flag.
*/
- protected boolean comet = true;
+ protected boolean event = true;
/**
* Structure used for storing timeouts.
@@ -1380,8 +1377,8 @@
protected int connectionCount = 0;
public int getConnectionCount() { return connectionCount; }
- public Poller(boolean comet) {
- this.comet = comet;
+ public Poller(boolean event) {
+ this.event = event;
}
/**
@@ -1455,7 +1452,7 @@
// Close all sockets in the add queue
SocketInfo info = addList.get();
while (info != null) {
- if (!comet || (comet && !processSocket(info.socket,
SocketStatus.STOP))) {
+ if (!event || (event && !processSocket(info.socket,
SocketStatus.STOP))) {
Socket.destroy(info.socket);
}
info = addList.get();
@@ -1466,7 +1463,7 @@
int rv = Poll.pollset(pollers[i], desc);
if (rv > 0) {
for (int n = 0; n < rv; n++) {
- if (!comet || (comet && !processSocket(desc[n*2+1],
SocketStatus.STOP))) {
+ if (!event || (event && !processSocket(desc[n*2+1],
SocketStatus.STOP))) {
Socket.destroy(desc[n*2+1]);
}
}
@@ -1500,7 +1497,7 @@
}
if (!ok) {
// Can't do anything: close the socket right away
- if (!comet || (comet && !processSocket(socket,
SocketStatus.ERROR))) {
+ if (!event || (event && !processSocket(socket,
SocketStatus.ERROR))) {
Socket.destroy(socket);
}
}
@@ -1538,7 +1535,7 @@
}
if (!ok) {
// Can't do anything: close the socket right away
- if (!comet || (comet && !processSocket(socket,
SocketStatus.ERROR))) {
+ if (!event || (event && !processSocket(socket,
SocketStatus.ERROR))) {
Socket.destroy(socket);
}
}
@@ -1595,7 +1592,7 @@
long socket = timeouts.check(date);
while (socket != 0) {
removeFromPoller(socket);
- if (!comet || (comet && !processSocket(socket,
SocketStatus.TIMEOUT))) {
+ if (!event || (event && !processSocket(socket,
SocketStatus.TIMEOUT))) {
Socket.destroy(socket);
}
socket = timeouts.check(date);
@@ -1608,7 +1605,7 @@
*/
public String toString() {
StringBuffer buf = new StringBuffer();
- buf.append("Poller comet=[").append(comet).append("]");
+ buf.append("Poller event=[").append(event).append("]");
long[] res = new long[actualPollerSize * 2];
for (int i = 0; i < pollers.length; i++) {
int count = Poll.pollset(pollers[i], res);
@@ -1679,7 +1676,7 @@
| ((info.write()) ? Poll.APR_POLLOUT : 0);
if (!addToPoller(info.socket, events)) {
// Can't do anything: close the socket
right away
- if (!comet || (comet &&
!processSocket(info.socket, SocketStatus.ERROR))) {
+ if (!event || (event &&
!processSocket(info.socket, SocketStatus.ERROR))) {
Socket.destroy(info.socket);
}
} else {
@@ -1697,14 +1694,14 @@
}
} else {
// Store timeout
- if (comet) {
+ if (event) {
removeFromPoller(info.socket);
}
int events = ((info.read()) ? Poll.APR_POLLIN : 0)
| ((info.write()) ? Poll.APR_POLLOUT : 0);
if (!addToPoller(info.socket, events)) {
// Can't do anything: close the socket right
away
- if (!comet || (comet &&
!processSocket(info.socket, SocketStatus.ERROR))) {
+ if (!event || (event &&
!processSocket(info.socket, SocketStatus.ERROR))) {
Socket.destroy(info.socket);
}
} else {
@@ -1713,7 +1710,7 @@
}
} else {
// This is either a resume or a suspend.
- if (comet) {
+ if (event) {
if (info.resume()) {
// Resume event
timeouts.remove(info.socket);
@@ -1726,7 +1723,7 @@
timeouts.add(info.socket,
System.currentTimeMillis() + info.timeout);
}
} else {
- // Should never happen, if not Comet, the socket is
always put in
+ // Should never happen, if not event, the socket is
always put in
// the list with the read flag.
timeouts.remove(info.socket);
Socket.destroy(info.socket);
@@ -1742,7 +1739,7 @@
// Flags to ask to reallocate the pool
boolean reset = false;
- ArrayList<Long> skip = null;
+ //ArrayList<Long> skip = null;
int rv = 0;
// Iterate on each pollers, but no need to poll empty pollers
@@ -1755,8 +1752,8 @@
for (int n = 0; n < rv; n++) {
timeouts.remove(desc[n*2+1]);
// Check for failed sockets and hand this socket off to a
worker
- if (comet) {
- // Comet processes either a read or a write depending
on what the poller returns
+ if (event) {
+ // Event processes either a read or a write depending
on what the poller returns
if (((desc[n*2] & Poll.APR_POLLHUP) ==
Poll.APR_POLLHUP)
|| ((desc[n*2] & Poll.APR_POLLERR) ==
Poll.APR_POLLERR)) {
if (!processSocket(desc[n*2+1],
SocketStatus.ERROR)) {
@@ -1844,7 +1841,7 @@
// Process socket timeouts
if (soTimeout > 0 && maintain++ > 1000 &&
running) {
// This works and uses only one timeout mechanism for everything,
but the
- // non Comet poller might be a bit faster by using the old
maintain.
+ // non event poller might be a bit faster by using the old
maintain.
maintain = 0;
maintain();
}
Modified: trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java 2009-03-19 18:30:09 UTC (rev
965)
+++ trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java 2009-03-20 17:26:19 UTC (rev
966)
@@ -29,7 +29,6 @@
import java.net.Socket;
import java.util.concurrent.Executor;
-import org.apache.tomcat.util.net.AprEndpoint.SocketInfo;
import org.apache.tomcat.util.res.StringManager;
import org.jboss.logging.Logger;
@@ -270,11 +269,11 @@
/**
- * The socket poller.
+ * The socket poller used for event support.
*/
- protected Poller poller = null;
- public Poller getPoller() {
- return poller;
+ protected Poller eventPoller = null;
+ public Poller getEventPoller() {
+ return eventPoller;
}
@@ -570,10 +569,18 @@
public void run() {
- // Process the request from this socket
- if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
+ Handler.SocketState socketState = handler.event(socket, status);
+ if (socketState == Handler.SocketState.CLOSED) {
// Close socket
try { socket.close(); } catch (IOException e) { }
+ } else if (socketState == Handler.SocketState.OPEN) {
+ // Process the keepalive after the event processing
+ // This is the main behavior difference with endpoint with pollers,
which
+ // will add the socket to the poller
+ if (handler.process(socket) == Handler.SocketState.CLOSED) {
+ // Close socket
+ try { socket.close(); } catch (IOException e) { }
+ }
}
socket = null;
@@ -802,8 +809,6 @@
// Process socket timeouts
if (soTimeout > 0 && maintain++ > 1000 &&
running) {
- // This works and uses only one timeout mechanism for everything,
but the
- // non Comet poller might be a bit faster by using the old
maintain.
maintain = 0;
maintain();
}
@@ -925,10 +930,20 @@
continue;
// Process the request from this socket
- if ((status != null) && (handler.event(socket, status) ==
Handler.SocketState.CLOSED)) {
- // FIXME: If handler.event returns Handler.SocketState.OPEN, then
likely should process without socketOptions
- // Close socket
- try { socket.close(); } catch (IOException e) { }
+ if (status != null){
+ Handler.SocketState socketState = handler.event(socket, status);
+ if (socketState == Handler.SocketState.CLOSED) {
+ // Close socket
+ try { socket.close(); } catch (IOException e) { }
+ } else if (socketState == Handler.SocketState.OPEN) {
+ // Process the keepalive after the event processing
+ // This is the main behavior difference with endpoint with
pollers, which
+ // will add the socket to the poller
+ if (handler.process(socket) == Handler.SocketState.CLOSED) {
+ // Close socket
+ try { socket.close(); } catch (IOException e) { }
+ }
+ }
} else if ((status == null) && (!setSocketOptions(socket) ||
(handler.process(socket) == Handler.SocketState.CLOSED))) {
// Close socket
try { socket.close(); } catch (IOException e) { }
@@ -1009,10 +1024,10 @@
workers = new WorkerStack(maxThreads);
}
- // Start poller thread
- poller = new Poller();
- poller.init();
- Thread pollerThread = new Thread(poller, getName() + "-Poller");
+ // Start event poller thread
+ eventPoller = new Poller();
+ eventPoller.init();
+ Thread pollerThread = new Thread(eventPoller, getName() +
"-Poller");
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
@@ -1044,8 +1059,8 @@
if (running) {
running = false;
unlockAccept();
- poller.destroy();
- poller = null;
+ eventPoller.destroy();
+ eventPoller = null;
}
}
Modified: trunk/webapps/docs/changelog.xml
===================================================================
--- trunk/webapps/docs/changelog.xml 2009-03-19 18:30:09 UTC (rev 965)
+++ trunk/webapps/docs/changelog.xml 2009-03-20 17:26:19 UTC (rev 966)
@@ -41,10 +41,6 @@
<fix>
<bug>46866</bug>: Use nanoTime rather that a weaker init for the
Random fallback. (remm)
</fix>
- <update>
- Add support for events to all connectors (IO events are only available on
- the APR HTTP connector, of course). (remm)
- </update>
</changelog>
</subsection>
<subsection name="Coyote">
@@ -52,6 +48,9 @@
<fix>
Remove useless instanceof in the HTTP protocol. (markt)
</fix>
+ <update>
+ Add support for non IO events in all connectors. (remm)
+ </update>
</changelog>
</subsection>
</section>