Author: remy.maucherat(a)jboss.com
Date: 2009-03-18 08:46:31 -0400 (Wed, 18 Mar 2009)
New Revision: 961
Modified:
trunk/ROADMAP.txt
trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
trunk/java/org/apache/coyote/ajp/AjpProcessor.java
trunk/java/org/apache/coyote/ajp/AjpProtocol.java
trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
trunk/java/org/apache/coyote/http11/Http11Processor.java
trunk/java/org/apache/coyote/http11/Http11Protocol.java
trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
Log:
- Implement a portion of the event API for java.io HTTP and AJP. No READ, WRITE or any
other IO events, only timeouts and resume.
Modified: trunk/ROADMAP.txt
===================================================================
--- trunk/ROADMAP.txt 2009-03-17 17:02:07 UTC (rev 960)
+++ trunk/ROADMAP.txt 2009-03-18 12:46:31 UTC (rev 961)
@@ -2,14 +2,11 @@
Main development:
- Setup standalone TCK environment for testing compliance with the new features
-- Implement Servlet 3.0 async for APR HTTP connector
+- Implement Servlet 3.0 async over current IO events
- Update digester XML parsing rules for web.xml updates
- Implement new APIs for programmatic deployment descriptor access
- Implement annotation scanning for JBoss Web standalone (likely disabled by default
using conf/web.xml)
- Implement any other Servlet 3.0 changes (web.xml fragments, security, etc)
-- Implement Servlet 3.0 async for APR AJP connector
-- Implement Servlet 3.0 async for java.io HTTP connector
-- Implement Servlet 3.0 async for java.io AJP connector
- Implement JSP 2.2 changes
- Implement EL 1.1 changes
- Coordinate with AS 6 to implement new web.xml parsing (out of tree)
Modified: trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java 2009-03-17 17:02:07 UTC (rev
960)
+++ trunk/java/org/apache/coyote/ajp/AjpAprProcessor.java 2009-03-18 12:46:31 UTC (rev
961)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package org.apache.coyote.ajp;
@@ -41,6 +46,8 @@
import org.apache.tomcat.util.http.HttpMessages;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.AprEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.AprEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
@@ -163,12 +170,6 @@
/**
- * State flag.
- */
- protected boolean started = false;
-
-
- /**
* Error flag.
*/
protected boolean error = false;
@@ -269,6 +270,20 @@
protected static final ByteBuffer flushMessageBuffer;
+ /**
+ * Comet used.
+ */
+ protected boolean comet = false;
+
+
+ /**
+ * Comet processing.
+ */
+ protected boolean cometProcessing = true;
+ public void startProcessing() { cometProcessing = true; }
+ public void endProcessing() { cometProcessing = false; }
+
+
// ----------------------------------------------------- Static Initializer
@@ -326,6 +341,21 @@
public void setRequiredSecret(String requiredSecret) { this.requiredSecret =
requiredSecret; }
+ /**
+ * Timeout.
+ */
+ protected int timeout = -1;
+ public void setTimeout(int timeout) { this.timeout = timeout; }
+ public int getTimeout() { return timeout; }
+
+
+ /**
+ * A resume has been requested.
+ */
+ protected boolean resumeNotification = false;
+ public boolean getResumeNotification() { return resumeNotification; }
+
+
// --------------------------------------------------------- Public Methods
@@ -338,13 +368,51 @@
}
+ public SocketState event(SocketStatus status)
+ throws IOException {
+
+ RequestInfo rp = request.getRequestProcessor();
+ try {
+ if (status == SocketStatus.OPEN_CALLBACK) {
+ // The resume notification is now done
+ resumeNotification = false;
+ } else if (status == SocketStatus.ERROR) {
+ // Set error flag right away
+ error = true;
+ }
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ error = !adapter.event(request, response, status);
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else if (!comet) {
+ finish();
+ recycle();
+ return SocketState.OPEN;
+ } else {
+ return SocketState.LONG;
+ }
+ }
+
+
/**
* Process pipelined HTTP requests using the specified input and output
* streams.
*
* @throws IOException error during an I/O operation
*/
- public boolean process(long socket)
+ public SocketState process(long socket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
@@ -360,7 +428,7 @@
boolean openSocket = true;
boolean keptAlive = false;
- while (started && !error) {
+ while (!error && !comet) {
// Parsing the request header
try {
@@ -370,6 +438,8 @@
// (long keepalive), so that the processor should be recycled
// and the method should return true
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+ // Add the socket to the poller
+ endpoint.getPoller().add(socket);
break;
}
// Check message type, process right away and break if
@@ -428,7 +498,7 @@
}
// Finish the response if not done yet
- if (!finished) {
+ if (!comet && !finished) {
try {
finish();
} catch (Throwable t) {
@@ -443,23 +513,28 @@
}
request.updateCounters();
+ if (!comet) {
+ recycle();
+ }
+
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
- recycle();
}
- // Add the socket to the poller
- if (!error) {
- endpoint.getPoller().add(socket);
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (comet) {
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else {
+ cometProcessing = false;
+ return SocketState.LONG;
+ }
} else {
- openSocket = false;
+ recycle();
+ return SocketState.CLOSED;
}
-
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- recycle();
-
- return openSocket;
-
}
@@ -525,14 +600,6 @@
error = true;
}
- } else if (actionCode == ActionCode.ACTION_START) {
-
- started = true;
-
- } else if (actionCode == ActionCode.ACTION_STOP) {
-
- started = false;
-
} else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) {
if (!certificates.isNull()) {
@@ -594,6 +661,21 @@
empty = false;
replay = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
+ comet = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_END) {
+ comet = false;
+ } else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
+ // No action needed
+ } else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
+ // An event is being processed already: adding for resume will be done
+ // when the socket gets back to the poller
+ if (!cometProcessing && !resumeNotification) {
+ endpoint.getCometPoller().add(socket, timeout, false, false, true,
true);
+ }
+ resumeNotification = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
+ timeout = ((Integer) param).intValue();
}
@@ -1189,6 +1271,9 @@
empty = true;
replay = false;
finished = false;
+ timeout = -1;
+ resumeNotification = false;
+ cometProcessing = true;
request.recycle();
response.recycle();
certificates.recycle();
Modified: trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java 2009-03-17 17:02:07 UTC (rev
960)
+++ trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java 2009-03-18 12:46:31 UTC (rev
961)
@@ -26,6 +26,7 @@
import java.net.URLEncoder;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -35,8 +36,6 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.coyote.ActionCode;
-import org.apache.coyote.ActionHook;
import org.apache.coyote.Adapter;
import org.apache.coyote.ProtocolHandler;
import org.apache.coyote.RequestGroupInfo;
@@ -352,6 +351,8 @@
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
+ protected ConcurrentHashMap<Long, AjpAprProcessor> connections =
+ new ConcurrentHashMap<Long, AjpAprProcessor>();
protected ConcurrentLinkedQueue<AjpAprProcessor> recycledProcessors =
new ConcurrentLinkedQueue<AjpAprProcessor>() {
protected AtomicInteger size = new AtomicInteger(0);
@@ -392,9 +393,51 @@
this.proto = proto;
}
- // FIXME: Support async
public SocketState event(long socket, SocketStatus status) {
- return SocketState.CLOSED;
+ AjpAprProcessor result = connections.get(socket);
+ SocketState state = SocketState.CLOSED;
+ if (result != null) {
+ result.startProcessing();
+ // Call the appropriate event
+ try {
+ state = result.event(status);
+ } catch (java.net.SocketException e) {
+ // SocketExceptions are normal
+ AjpAprProcessor.log.debug
+ (sm.getString
+ ("ajpprotocol.proto.socketexception.debug"), e);
+ } catch (java.io.IOException e) {
+ // IOExceptions are normal
+ AjpAprProcessor.log.debug
+ (sm.getString
+ ("ajpprotocol.proto.ioexception.debug"), e);
+ }
+ // Future developers: if you discover any other
+ // rare-but-nonfatal exceptions, catch them here, and log as
+ // above.
+ catch (Throwable e) {
+ // any other exception or error is odd. Here we log it
+ // with "ERROR" level, so it will show up even on
+ // less-than-verbose logs.
+ AjpAprProcessor.log.error
+ (sm.getString("ajpprotocol.proto.error"), e);
+ } finally {
+ if (state != SocketState.LONG) {
+ connections.remove(socket);
+ recycledProcessors.offer(result);
+ if (proto.endpoint.isRunning() && state ==
SocketState.OPEN) {
+ proto.endpoint.getPoller().add(socket);
+ }
+ } else {
+ if (proto.endpoint.isRunning()) {
+ proto.endpoint.getCometPoller().add(socket,
result.getTimeout(),
+ false, false, result.getResumeNotification(),
false);
+ }
+ }
+ result.endProcessing();
+ }
+ }
+ return state;
}
public SocketState process(long socket) {
@@ -405,15 +448,18 @@
processor = createProcessor();
}
- if (processor instanceof ActionHook) {
- ((ActionHook) processor).action(ActionCode.ACTION_START, null);
- }
-
- if (processor.process(socket)) {
- return SocketState.OPEN;
+ SocketState state = processor.process(socket);
+ if (state == SocketState.LONG) {
+ // Associate the connection with the processor. The next request
+ // processed by this thread will use either a new or a recycled
+ // processor.
+ connections.put(socket, processor);
+ proto.endpoint.getCometPoller().add(socket, processor.getTimeout(),
false,
+ false, processor.getResumeNotification(), false);
} else {
- return SocketState.CLOSED;
+ recycledProcessors.offer(processor);
}
+ return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
@@ -435,12 +481,8 @@
// less-than-verbose logs.
AjpAprProtocol.log.error
(sm.getString("ajpprotocol.proto.error"), e);
- } finally {
- if (processor instanceof ActionHook) {
- ((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
- }
- recycledProcessors.offer(processor);
}
+ recycledProcessors.offer(processor);
return SocketState.CLOSED;
}
Modified: trunk/java/org/apache/coyote/ajp/AjpProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpProcessor.java 2009-03-17 17:02:07 UTC (rev 960)
+++ trunk/java/org/apache/coyote/ajp/AjpProcessor.java 2009-03-18 12:46:31 UTC (rev 961)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package org.apache.coyote.ajp;
@@ -41,6 +46,8 @@
import org.apache.tomcat.util.http.HttpMessages;
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.JIoEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.JIoEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
@@ -157,12 +164,6 @@
/**
- * State flag.
- */
- protected boolean started = false;
-
-
- /**
* Error flag.
*/
protected boolean error = false;
@@ -270,6 +271,20 @@
protected static final byte[] flushMessageArray;
+ /**
+ * Comet used.
+ */
+ protected boolean comet = false;
+
+
+ /**
+ * Comet processing.
+ */
+ protected boolean cometProcessing = true;
+ public void startProcessing() { cometProcessing = true; }
+ public void endProcessing() { cometProcessing = false; }
+
+
// ----------------------------------------------------- Static Initializer
@@ -336,6 +351,21 @@
public void setKeepAliveTimeout(int timeout) { keepAliveTimeout = timeout; }
+ /**
+ * Timeout.
+ */
+ protected int timeout = -1;
+ public void setTimeout(int timeout) { this.timeout = timeout; }
+ public int getTimeout() { return timeout; }
+
+
+ /**
+ * A resume has been requested.
+ */
+ protected boolean resumeNotification = false;
+ public boolean getResumeNotification() { return resumeNotification; }
+
+
// --------------------------------------------------------- Public Methods
@@ -348,13 +378,51 @@
}
+ public SocketState event(SocketStatus status)
+ throws IOException {
+
+ RequestInfo rp = request.getRequestProcessor();
+ try {
+ if (status == SocketStatus.OPEN_CALLBACK) {
+ // The resume notification is now done
+ resumeNotification = false;
+ } else if (status == SocketStatus.ERROR) {
+ // Set error flag right away
+ error = true;
+ }
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ error = !adapter.event(request, response, status);
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (error) {
+ recycle();
+ return SocketState.CLOSED;
+ } else if (!comet) {
+ finish();
+ recycle();
+ return SocketState.OPEN;
+ } else {
+ return SocketState.LONG;
+ }
+ }
+
+
/**
* Process pipelined HTTP requests using the specified input and output
* streams.
*
* @throws IOException error during an I/O operation
*/
- public void process(Socket socket)
+ public SocketState process(Socket socket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
@@ -371,7 +439,7 @@
// Error flag
error = false;
- while (started && !error) {
+ while (!error && !comet) {
// Parsing the request header
try {
@@ -445,7 +513,7 @@
}
// Finish the response if not done yet
- if (!finished) {
+ if (!comet && !finished) {
try {
finish();
} catch (Throwable t) {
@@ -460,16 +528,32 @@
}
request.updateCounters();
+ if (!comet) {
+ recycle();
+ }
+
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
- recycle();
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- recycle();
- input = null;
- output = null;
+ if (comet) {
+ if (error) {
+ input = null;
+ output = null;
+ recycle();
+ return SocketState.CLOSED;
+ } else {
+ cometProcessing = false;
+ return SocketState.LONG;
+ }
+ } else {
+ input = null;
+ output = null;
+ recycle();
+ return SocketState.CLOSED;
+ }
}
@@ -530,14 +614,6 @@
error = true;
}
- } else if (actionCode == ActionCode.ACTION_START) {
-
- started = true;
-
- } else if (actionCode == ActionCode.ACTION_STOP) {
-
- started = false;
-
} else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) {
if (!certificates.isNull()) {
@@ -599,6 +675,21 @@
empty = false;
replay = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
+ comet = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_END) {
+ comet = false;
+ } else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
+ // No action needed
+ } else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
+ // An event is being processed already: adding for resume will be done
+ // when the socket gets back to the poller
+ if (!cometProcessing && !resumeNotification) {
+ endpoint.getPoller().add(socket, timeout, true, true);
+ }
+ resumeNotification = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
+ timeout = ((Integer) param).intValue();
}
@@ -1135,6 +1226,9 @@
empty = true;
replay = false;
finished = false;
+ timeout = -1;
+ resumeNotification = false;
+ cometProcessing = true;
request.recycle();
response.recycle();
certificates.recycle();
Modified: trunk/java/org/apache/coyote/ajp/AjpProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpProtocol.java 2009-03-17 17:02:07 UTC (rev 960)
+++ trunk/java/org/apache/coyote/ajp/AjpProtocol.java 2009-03-18 12:46:31 UTC (rev 961)
@@ -27,6 +27,7 @@
import java.net.URLEncoder;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,8 +37,6 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.coyote.ActionCode;
-import org.apache.coyote.ActionHook;
import org.apache.coyote.Adapter;
import org.apache.coyote.ProtocolHandler;
import org.apache.coyote.RequestGroupInfo;
@@ -340,6 +339,8 @@
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
+ protected ConcurrentHashMap<Socket, AjpProcessor> connections =
+ new ConcurrentHashMap<Socket, AjpProcessor>();
protected ConcurrentLinkedQueue<AjpProcessor> recycledProcessors =
new ConcurrentLinkedQueue<AjpProcessor>() {
protected AtomicInteger size = new AtomicInteger(0);
@@ -380,12 +381,57 @@
this.proto = proto;
}
- // FIXME: Support async
public SocketState event(Socket socket, SocketStatus status) {
- return SocketState.CLOSED;
+ AjpProcessor result = connections.get(socket);
+ SocketState state = SocketState.CLOSED;
+ if (result != null) {
+ result.startProcessing();
+ // Call the appropriate event
+ try {
+ state = result.event(status);
+ } catch (java.net.SocketException e) {
+ // SocketExceptions are normal
+ AjpProcessor.log.debug
+ (sm.getString
+ ("ajpprotocol.proto.socketexception.debug"), e);
+ } catch (java.io.IOException e) {
+ // IOExceptions are normal
+ AjpProcessor.log.debug
+ (sm.getString
+ ("ajpprotocol.proto.ioexception.debug"), e);
+ }
+ // Future developers: if you discover any other
+ // rare-but-nonfatal exceptions, catch them here, and log as
+ // above.
+ catch (Throwable e) {
+ // any other exception or error is odd. Here we log it
+ // with "ERROR" level, so it will show up even on
+ // less-than-verbose logs.
+ AjpProcessor.log.error
+ (sm.getString("ajpprotocol.proto.error"), e);
+ } finally {
+ if (state != SocketState.LONG) {
+ connections.remove(socket);
+ recycledProcessors.offer(result);
+ // FIXME: if the socket is still open, we should send it back to
reprocess it
+ // as if it was an initial request, or the simple solution is to
close after
+ // an async request; will see
+ if (proto.endpoint.isRunning() && state ==
SocketState.OPEN) {
+ //proto.endpoint.getPoller().add(socket);
+ }
+ } else {
+ if (proto.endpoint.isRunning()) {
+ proto.endpoint.getPoller().add(socket, result.getTimeout(),
+ result.getResumeNotification(), false);
+ }
+ }
+ result.endProcessing();
+ }
+ }
+ return state;
}
- public boolean process(Socket socket) {
+ public SocketState process(Socket socket) {
AjpProcessor processor = recycledProcessors.poll();
try {
@@ -393,13 +439,19 @@
processor = createProcessor();
}
- if (processor instanceof ActionHook) {
- ((ActionHook) processor).action(ActionCode.ACTION_START, null);
+ SocketState state = processor.process(socket);
+ if (state == SocketState.LONG) {
+ // Associate the connection with the processor. The next request
+ // processed by this thread will use either a new or a recycled
+ // processor.
+ connections.put(socket, processor);
+ proto.endpoint.getPoller().add(socket, processor.getTimeout(),
+ processor.getResumeNotification(), false);
+ } else {
+ recycledProcessors.offer(processor);
}
+ return state;
- processor.process(socket);
- return false;
-
} catch(java.net.SocketException e) {
// SocketExceptions are normal
AjpProtocol.log.debug
@@ -420,13 +472,9 @@
// less-than-verbose logs.
AjpProtocol.log.error
(sm.getString("ajpprotocol.proto.error"), e);
- } finally {
- if (processor instanceof ActionHook) {
- ((ActionHook) processor).action(ActionCode.ACTION_STOP, null);
- }
- recycledProcessors.offer(processor);
}
- return false;
+ recycledProcessors.offer(processor);
+ return SocketState.CLOSED;
}
protected AjpProcessor createProcessor() {
Modified: trunk/java/org/apache/coyote/http11/Http11AprProcessor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2009-03-17 17:02:07 UTC
(rev 960)
+++ trunk/java/org/apache/coyote/http11/Http11AprProcessor.java 2009-03-18 12:46:31 UTC
(rev 961)
@@ -259,12 +259,6 @@
/**
- * Maximum timeout on uploads. 5 minutes as in Apache HTTPD server.
- */
- protected int timeout = 300000;
-
-
- /**
* Flag to disable setting a different time-out on uploads.
*/
protected boolean disableUploadTimeout = false;
@@ -324,7 +318,6 @@
protected String server = null;
- protected int cometTimeout = -1;
protected boolean readNotifications = true;
protected boolean writeNotification = false;
protected boolean resumeNotification = false;
@@ -359,11 +352,6 @@
}
- public int getCometTimeout() {
- return cometTimeout;
- }
-
-
public boolean getAvailable() {
return inputBuffer.available();
}
@@ -507,7 +495,14 @@
}
+ /**
+ * Timeout.
+ */
+ protected int timeout = -1;
+ public void setTimeout(int timeout) { this.timeout = timeout; }
+ public int getTimeout() { return timeout; }
+
// --------------------------------------------------------- Public Methods
@@ -714,21 +709,8 @@
return socketBuffer;
}
- /**
- * Set the upload timeout.
- */
- public void setTimeout(int timeout) {
- this.timeout = timeout;
- }
/**
- * Get the upload timeout.
- */
- public int getTimeout() {
- return timeout;
- }
-
- /**
* Set the server header name.
*/
public void setServer( String server ) {
@@ -804,6 +786,7 @@
recycle();
return SocketState.CLOSED;
} else if (!comet) {
+ endRequest();
boolean pipelined = inputBuffer.nextRequest();
outputBuffer.nextRequest();
recycle();
@@ -1008,7 +991,7 @@
inputBuffer.recycle();
outputBuffer.recycle();
this.socket = 0;
- cometTimeout = -1;
+ timeout = -1;
readNotifications = true;
writeNotification = false;
resumeNotification = false;
@@ -1274,18 +1257,18 @@
// An event is being processed already: adding for resume will be done
// when the socket gets back to the poller
if (!cometProcessing && !resumeNotification) {
- endpoint.getCometPoller().add(socket, cometTimeout, false, false, true,
true);
+ endpoint.getCometPoller().add(socket, timeout, false, false, true,
true);
}
resumeNotification = true;
} else if (actionCode == ActionCode.ACTION_COMET_WRITE) {
// An event is being processed already: adding for write will be done
// when the socket gets back to the poller
if (!cometProcessing && !writeNotification) {
- endpoint.getCometPoller().add(socket, cometTimeout, false, true, false,
true);
+ endpoint.getCometPoller().add(socket, timeout, false, true, false,
true);
}
writeNotification = true;
} else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
- cometTimeout = ((Integer) param).intValue();
+ timeout = ((Integer) param).intValue();
}
}
Modified: trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2009-03-17 17:02:07 UTC
(rev 960)
+++ trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2009-03-18 12:46:31 UTC
(rev 961)
@@ -36,7 +36,6 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.coyote.ActionCode;
import org.apache.coyote.Adapter;
import org.apache.coyote.ProtocolHandler;
import org.apache.coyote.RequestGroupInfo;
@@ -580,7 +579,7 @@
}
} else {
if (proto.endpoint.isRunning()) {
- proto.endpoint.getCometPoller().add(socket,
result.getCometTimeout(),
+ proto.endpoint.getCometPoller().add(socket,
result.getTimeout(),
result.getReadNotifications(),
result.getWriteNotification(), result.getResumeNotification(), false);
}
}
@@ -597,8 +596,6 @@
processor = createProcessor();
}
- processor.action(ActionCode.ACTION_START, null);
-
SocketState state = processor.process(socket);
if (state == SocketState.LONG) {
// Associate the connection with the processor. The next request
@@ -609,8 +606,8 @@
// Call a read event right away
state = event(socket, SocketStatus.OPEN_READ);
} else {
- proto.endpoint.getCometPoller().add(socket,
processor.getCometTimeout(),
- processor.getReadNotifications(), false, false, false);
+ proto.endpoint.getCometPoller().add(socket,
processor.getTimeout(),
+ processor.getReadNotifications(), false,
processor.getResumeNotification(), false);
}
} else {
recycledProcessors.offer(processor);
Modified: trunk/java/org/apache/coyote/http11/Http11Processor.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11Processor.java 2009-03-17 17:02:07 UTC (rev
960)
+++ trunk/java/org/apache/coyote/http11/Http11Processor.java 2009-03-18 12:46:31 UTC (rev
961)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package org.apache.coyote.http11;
@@ -48,6 +53,8 @@
import org.apache.tomcat.util.http.MimeHeaders;
import org.apache.tomcat.util.net.JIoEndpoint;
import org.apache.tomcat.util.net.SSLSupport;
+import org.apache.tomcat.util.net.SocketStatus;
+import org.apache.tomcat.util.net.JIoEndpoint.Handler.SocketState;
import org.apache.tomcat.util.res.StringManager;
@@ -134,12 +141,6 @@
/**
- * State flag.
- */
- protected boolean started = false;
-
-
- /**
* Error flag.
*/
protected boolean error = false;
@@ -244,12 +245,6 @@
/**
- * Maximum timeout on uploads. 5 minutes as in Apache HTTPD server.
- */
- protected int timeout = 300000;
-
-
- /**
* Flag to disable setting a different time-out on uploads.
*/
protected boolean disableUploadTimeout = false;
@@ -308,7 +303,27 @@
*/
protected String server = null;
+
+ /**
+ * Comet used.
+ */
+ protected boolean comet = false;
+
+ /**
+ * True if a resume has been requested.
+ */
+ protected boolean resumeNotification = false;
+
+
+ /**
+ * Comet processing.
+ */
+ protected boolean cometProcessing = true;
+ public void startProcessing() { cometProcessing = true; }
+ public void endProcessing() { cometProcessing = false; }
+
+
// ------------------------------------------------------------- Properties
@@ -450,7 +465,14 @@
}
+ /**
+ * Timeout.
+ */
+ protected int timeout = -1;
+ public void setTimeout(int timeout) { this.timeout = timeout; }
+ public int getTimeout() { return timeout; }
+
// --------------------------------------------------------- Public Methods
@@ -666,6 +688,11 @@
return disableUploadTimeout;
}
+ public boolean getResumeNotification() {
+ return resumeNotification;
+ }
+
+
/**
* Set the socket buffer flag.
*/
@@ -681,22 +708,8 @@
return socketBuffer;
}
- /**
- * Set the upload timeout.
- */
- public void setTimeout( int timeouts ) {
- timeout = timeouts ;
- }
/**
- * Get the upload timeout.
- */
- public int getTimeout() {
- return timeout;
- }
-
-
- /**
* Set the server header name.
*/
public void setServer( String server ) {
@@ -723,6 +736,47 @@
return request;
}
+ public SocketState event(SocketStatus status)
+ throws IOException {
+
+ RequestInfo rp = request.getRequestProcessor();
+ try {
+ if (status == SocketStatus.OPEN_CALLBACK) {
+ // The resume notification is now done
+ resumeNotification = false;
+ } else if (status == SocketStatus.ERROR) {
+ // Set error flag right away
+ error = true;
+ }
+ rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+ error = !adapter.event(request, response, status);
+ } catch (InterruptedIOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.process"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+
+ rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+ if (error) {
+ inputBuffer.nextRequest();
+ outputBuffer.nextRequest();
+ recycle();
+ return SocketState.CLOSED;
+ } else if (!comet) {
+ endRequest();
+ boolean pipelined = inputBuffer.nextRequest();
+ outputBuffer.nextRequest();
+ recycle();
+ return (pipelined) ? SocketState.CLOSED : SocketState.OPEN;
+ } else {
+ return SocketState.LONG;
+ }
+ }
+
/**
* Process pipelined HTTP requests using the specified input and output
* streams.
@@ -732,7 +786,7 @@
* responses
* @throws IOException error during an I/O operation
*/
- public void process(Socket socket)
+ public SocketState process(Socket socket)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
@@ -775,7 +829,7 @@
boolean keptAlive = false;
- while (started && !error && keepAlive) {
+ while (!error && keepAlive && !comet) {
// Parsing the request header
try {
@@ -852,26 +906,9 @@
// If there is an unspecified error, the connection will be closed
inputBuffer.setSwallowInput(false);
}
- try {
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
- inputBuffer.endRequest();
- } catch (IOException e) {
- error = true;
- } catch (Throwable t) {
- log.error(sm.getString("http11processor.request.finish"), t);
- // 500 - Internal Server Error
- response.setStatus(500);
- error = true;
+ if (!comet) {
+ endRequest();
}
- try {
- rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);
- outputBuffer.endRequest();
- } catch (IOException e) {
- error = true;
- } catch (Throwable t) {
- log.error(sm.getString("http11processor.response.finish"), t);
- error = true;
- }
// If there was an error, make sure the request is counted as
// and error, and update the statistics counter
@@ -880,23 +917,69 @@
}
request.updateCounters();
+ if (!comet) {
+ // Next request
+ inputBuffer.nextRequest();
+ outputBuffer.nextRequest();
+ }
+
rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);
- // Next request
- inputBuffer.nextRequest();
- outputBuffer.nextRequest();
-
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
- // Recycle
+ if (comet) {
+ if (error) {
+ inputBuffer.nextRequest();
+ outputBuffer.nextRequest();
+ recycle();
+ return SocketState.CLOSED;
+ } else {
+ cometProcessing = false;
+ return SocketState.LONG;
+ }
+ } else {
+ recycle();
+ return SocketState.CLOSED;
+ }
+
+ }
+
+
+ public void endRequest() {
+
+ // Finish the handling of the request
+ try {
+ inputBuffer.endRequest();
+ } catch (IOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.request.finish"), t);
+ // 500 - Internal Server Error
+ response.setStatus(500);
+ error = true;
+ }
+ try {
+ outputBuffer.endRequest();
+ } catch (IOException e) {
+ error = true;
+ } catch (Throwable t) {
+ log.error(sm.getString("http11processor.response.finish"), t);
+ error = true;
+ }
+
+ }
+
+
+ public void recycle() {
inputBuffer.recycle();
outputBuffer.recycle();
-
- // Recycle socket
this.socket = null;
sslSupport = null;
+ timeout = -1;
+ resumeNotification = false;
+ cometProcessing = true;
}
@@ -971,14 +1054,6 @@
// Do nothing
- } else if (actionCode == ActionCode.ACTION_START) {
-
- started = true;
-
- } else if (actionCode == ActionCode.ACTION_STOP) {
-
- started = false;
-
} else if (actionCode == ActionCode.ACTION_REQ_SSL_ATTRIBUTE ) {
try {
@@ -1090,6 +1165,21 @@
InternalInputBuffer internalBuffer = (InternalInputBuffer)
request.getInputBuffer();
internalBuffer.addActiveFilter(savedBody);
+ } else if (actionCode == ActionCode.ACTION_COMET_BEGIN) {
+ comet = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_END) {
+ comet = false;
+ } else if (actionCode == ActionCode.ACTION_COMET_SUSPEND) {
+ // No action needed
+ } else if (actionCode == ActionCode.ACTION_COMET_RESUME) {
+ // An event is being processed already: adding for resume will be done
+ // when the socket gets back to the poller
+ if (!cometProcessing && !resumeNotification) {
+ endpoint.getPoller().add(socket, timeout, true, true);
+ }
+ resumeNotification = true;
+ } else if (actionCode == ActionCode.ACTION_COMET_TIMEOUT) {
+ timeout = ((Integer) param).intValue();
}
}
Modified: trunk/java/org/apache/coyote/http11/Http11Protocol.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11Protocol.java 2009-03-17 17:02:07 UTC (rev
960)
+++ trunk/java/org/apache/coyote/http11/Http11Protocol.java 2009-03-18 12:46:31 UTC (rev
961)
@@ -27,6 +27,7 @@
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -36,7 +37,6 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.coyote.ActionCode;
import org.apache.coyote.Adapter;
import org.apache.coyote.ProtocolHandler;
import org.apache.coyote.RequestGroupInfo;
@@ -549,6 +549,8 @@
protected AtomicLong registerCount = new AtomicLong(0);
protected RequestGroupInfo global = new RequestGroupInfo();
+ protected ConcurrentHashMap<Socket, Http11Processor> connections =
+ new ConcurrentHashMap<Socket, Http11Processor>();
protected ConcurrentLinkedQueue<Http11Processor> recycledProcessors =
new ConcurrentLinkedQueue<Http11Processor>() {
protected AtomicInteger size = new AtomicInteger(0);
@@ -589,12 +591,57 @@
this.proto = proto;
}
- // FIXME: Support async
public SocketState event(Socket socket, SocketStatus status) {
- return SocketState.CLOSED;
+ Http11Processor result = connections.get(socket);
+ SocketState state = SocketState.CLOSED;
+ if (result != null) {
+ result.startProcessing();
+ // Call the appropriate event
+ try {
+ state = result.event(status);
+ } catch (java.net.SocketException e) {
+ // SocketExceptions are normal
+ Http11Protocol.log.debug
+ (sm.getString
+ ("http11protocol.proto.socketexception.debug"),
e);
+ } catch (java.io.IOException e) {
+ // IOExceptions are normal
+ Http11Protocol.log.debug
+ (sm.getString
+ ("http11protocol.proto.ioexception.debug"), e);
+ }
+ // Future developers: if you discover any other
+ // rare-but-nonfatal exceptions, catch them here, and log as
+ // above.
+ catch (Throwable e) {
+ // any other exception or error is odd. Here we log it
+ // with "ERROR" level, so it will show up even on
+ // less-than-verbose logs.
+ Http11Protocol.log.error
+ (sm.getString("http11protocol.proto.error"), e);
+ } finally {
+ if (state != SocketState.LONG) {
+ connections.remove(socket);
+ recycledProcessors.offer(result);
+ // FIXME: if the socket is still open, we should send it back to
reprocess it
+ // as if it was an initial request, or the simple solution is to
close after
+ // an async request; will see
+ if (proto.endpoint.isRunning() && state ==
SocketState.OPEN) {
+ //proto.endpoint.getPoller().add(socket);
+ }
+ } else {
+ if (proto.endpoint.isRunning()) {
+ proto.endpoint.getPoller().add(socket, result.getTimeout(),
+ result.getResumeNotification(), false);
+ }
+ }
+ result.endProcessing();
+ }
+ }
+ return state;
}
- public boolean process(Socket socket) {
+ public SocketState process(Socket socket) {
Http11Processor processor = recycledProcessors.poll();
try {
@@ -602,8 +649,6 @@
processor = createProcessor();
}
- processor.action(ActionCode.ACTION_START, null);
-
if (proto.secure && (proto.sslImplementation != null)) {
processor.setSSLSupport
(proto.sslImplementation.getSSLSupport(socket));
@@ -611,8 +656,18 @@
processor.setSSLSupport(null);
}
- processor.process(socket);
- return false;
+ SocketState state = processor.process(socket);
+ if (state == SocketState.LONG) {
+ // Associate the connection with the processor. The next request
+ // processed by this thread will use either a new or a recycled
+ // processor.
+ connections.put(socket, processor);
+ proto.endpoint.getPoller().add(socket, processor.getTimeout(),
+ processor.getResumeNotification(), false);
+ } else {
+ recycledProcessors.offer(processor);
+ }
+ return state;
} catch(java.net.SocketException e) {
// SocketExceptions are normal
@@ -634,14 +689,9 @@
// less-than-verbose logs.
Http11Protocol.log.error
(sm.getString("http11protocol.proto.error"), e);
- } finally {
- // if(proto.adapter != null) proto.adapter.recycle();
- // processor.recycle();
-
- processor.action(ActionCode.ACTION_STOP, null);
- recycledProcessors.offer(processor);
}
- return false;
+ recycledProcessors.offer(processor);
+ return SocketState.CLOSED;
}
protected Http11Processor createProcessor() {
Modified: trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
===================================================================
--- trunk/java/org/apache/coyote/http11/InternalInputBuffer.java 2009-03-17 17:02:07 UTC
(rev 960)
+++ trunk/java/org/apache/coyote/http11/InternalInputBuffer.java 2009-03-18 12:46:31 UTC
(rev 961)
@@ -292,7 +292,7 @@
* consumed. This method only resets all the pointers so that we are ready
* to parse the next HTTP request.
*/
- public void nextRequest() {
+ public boolean nextRequest() {
// Recycle Request object
request.recycle();
@@ -321,6 +321,8 @@
parsingHeader = true;
swallowInput = true;
+ return (lastValid > 0);
+
}
Modified: trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java 2009-03-17 17:02:07 UTC (rev
960)
+++ trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java 2009-03-18 12:46:31 UTC (rev
961)
@@ -29,6 +29,7 @@
import java.net.Socket;
import java.util.concurrent.Executor;
+import org.apache.tomcat.util.net.AprEndpoint.SocketInfo;
import org.apache.tomcat.util.res.StringManager;
import org.jboss.logging.Logger;
@@ -268,6 +269,15 @@
public ServerSocketFactory getServerSocketFactory() { return serverSocketFactory; }
+ /**
+ * The socket poller.
+ */
+ protected Poller poller = null;
+ public Poller getPoller() {
+ return poller;
+ }
+
+
public boolean isRunning() {
return running;
}
@@ -297,7 +307,7 @@
public enum SocketState {
OPEN, CLOSED, LONG
}
- public boolean process(Socket socket);
+ public SocketState process(Socket socket);
public SocketState event(Socket socket, SocketStatus status);
}
@@ -361,26 +371,11 @@
* with very little actual use.
*/
public static class SocketInfo {
+ public static final int RESUME = 4;
+ public static final int WAKEUP = 8;
public Socket socket;
public int timeout;
- public boolean wakeup;
- public boolean wakeup() {
- return wakeup;
- }
- public static boolean merge(boolean flag1, boolean flag2) {
- return (flag1 || flag2);
- }
- /*public static final int READ = 1;
- public static final int WRITE = 2;
- public static final int RESUME = 4;
- public static final int WAKEUP = 8;*/
- //public int flags;
- /*public boolean read() {
- return (flags & READ) == READ;
- }
- public boolean write() {
- return (flags & WRITE) == WRITE;
- }
+ public int flags;
public boolean resume() {
return (flags & RESUME) == RESUME;
}
@@ -388,11 +383,9 @@
return (flags & WAKEUP) == WAKEUP;
}
public static int merge(int flag1, int flag2) {
- return ((flag1 & READ) | (flag2 & READ))
- | ((flag1 & WRITE) | (flag2 & WRITE))
- | ((flag1 & RESUME) | (flag2 & RESUME))
+ return ((flag1 & RESUME) | (flag2 & RESUME))
| ((flag1 & WAKEUP) & (flag2 & WAKEUP));
- }*/
+ }
}
@@ -465,7 +458,7 @@
protected Socket[] sockets;
protected int[] timeouts;
- protected boolean[] wakeups;
+ protected int[] flags;
protected SocketInfo info = new SocketInfo();
@@ -474,7 +467,7 @@
pos = 0;
sockets = new Socket[size];
timeouts = new int[size];
- wakeups = new boolean[size];
+ flags = new int[size];
}
public int size() {
@@ -487,7 +480,7 @@
} else {
info.socket = sockets[pos];
info.timeout = timeouts[pos];
- info.wakeup = wakeups[pos];
+ info.flags = flags[pos];
pos++;
return info;
}
@@ -498,19 +491,19 @@
pos = 0;
}
- public boolean add(Socket socket, int timeout, boolean wakeup) {
+ public boolean add(Socket socket, int timeout, int flag) {
if (size == sockets.length) {
return false;
} else {
for (int i = 0; i < size; i++) {
if (sockets[i] == socket) {
- wakeups[i] = SocketInfo.merge(wakeups[i], wakeup);
+ flags[i] = SocketInfo.merge(flags[i], flag);
return true;
}
}
sockets[size] = socket;
timeouts[size] = timeout;
- wakeups[size] = wakeup;
+ flags[size] = flag;
size++;
return true;
}
@@ -521,7 +514,7 @@
copy.pos = pos;
System.arraycopy(sockets, 0, copy.sockets, 0, size);
System.arraycopy(timeouts, 0, copy.timeouts, 0, size);
- System.arraycopy(wakeups, 0, copy.wakeups, 0, size);
+ System.arraycopy(flags, 0, copy.flags, 0, size);
}
}
@@ -545,7 +538,7 @@
public void run() {
// Process the request from this socket
- if (!setSocketOptions(socket) || !handler.process(socket)) {
+ if (!setSocketOptions(socket) || (handler.process(socket) ==
Handler.SocketState.CLOSED)) {
// Close socket
try { socket.close(); } catch (IOException e) { }
}
@@ -665,8 +658,14 @@
}
addList.clear();
// Close all sockets still in the poller
- // FIXME: close all waiting for timeout
-
+ long future = System.currentTimeMillis() + Integer.MAX_VALUE;
+ Socket socket = timeouts.check(future);
+ while (socket != null) {
+ if (!processSocket(socket, SocketStatus.TIMEOUT)) {
+ try { socket.close(); } catch (IOException e) { }
+ }
+ socket = timeouts.check(future);
+ }
connectionCount = 0;
}
@@ -678,16 +677,19 @@
*
* @param socket to add to the poller
*/
- public void add(Socket socket, int timeout, boolean wakeup) {
- /*int timeout = keepAliveTimeout;
+ public void add(Socket socket, int timeout, boolean resume, boolean wakeup) {
if (timeout < 0) {
+ timeout = keepAliveTimeout;
+ }
+ if (timeout < 0) {
timeout = soTimeout;
- }*/
+ }
boolean ok = false;
synchronized (this) {
// Add socket to the list. Newly added sockets will wait
// at most for pollTime before being polled
- if (addList.add(socket, timeout, wakeup)) {
+ if (addList.add(socket, timeout, (resume ? SocketInfo.RESUME : 0)
+ | (wakeup ? SocketInfo.WAKEUP : 0))) {
ok = true;
this.notify();
}
@@ -723,24 +725,6 @@
}
/**
- * Displays the list of sockets in the pollers.
- *
- public String toString() {
- StringBuffer buf = new StringBuffer();
- buf.append("Poller comet=[").append(comet).append("]");
- long[] res = new long[actualPollerSize * 2];
- for (int i = 0; i < pollers.length; i++) {
- int count = Poll.pollset(pollers[i], res);
- buf.append(" [ ");
- for (int j = 0; j < count; j++) {
- buf.append(desc[2*j+1]).append(" ");
- }
- buf.append("]");
- }
- return buf.toString();
- }*/
-
- /**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
@@ -789,12 +773,22 @@
if (info.wakeup()) {
// Resume event if socket is present in the poller
if (timeouts.remove(info.socket)) {
+ if (info.resume()) {
+ if (!processSocket(info.socket,
SocketStatus.OPEN_CALLBACK)) {
+ try { info.socket.close(); } catch
(IOException e) { }
+ }
+ } else {
+ timeouts.add(info.socket,
System.currentTimeMillis() + info.timeout);
+ }
+ }
+ } else {
+ if (info.resume()) {
if (!processSocket(info.socket,
SocketStatus.OPEN_CALLBACK)) {
try { info.socket.close(); } catch (IOException
e) { }
}
+ } else {
+ timeouts.add(info.socket, System.currentTimeMillis()
+ info.timeout);
}
- } else {
- timeouts.add(info.socket, System.currentTimeMillis() +
info.timeout);
}
info = localAddList.get();
}
@@ -932,9 +926,10 @@
// Process the request from this socket
if ((status != null) && (handler.event(socket, status) ==
Handler.SocketState.CLOSED)) {
+ // FIXME: If handler.event returns Handler.SocketState.OPEN, then
likely should process without socketOptions
// Close socket
try { socket.close(); } catch (IOException e) { }
- } else if ((status == null) && (!setSocketOptions(socket) ||
!handler.process(socket))) {
+ } else if ((status == null) && (!setSocketOptions(socket) ||
(handler.process(socket) == Handler.SocketState.CLOSED))) {
// Close socket
try { socket.close(); } catch (IOException e) { }
}
@@ -1014,6 +1009,14 @@
workers = new WorkerStack(maxThreads);
}
+ // Start poller thread
+ poller = new Poller();
+ poller.init();
+ Thread pollerThread = new Thread(poller, getName() + "-Poller");
+ pollerThread.setPriority(threadPriority);
+ pollerThread.setDaemon(true);
+ pollerThread.start();
+
// Start acceptor threads
for (int i = 0; i < acceptorThreadCount; i++) {
Thread acceptorThread = new Thread(new Acceptor(), getName() +
"-Acceptor-" + i);
@@ -1041,6 +1044,8 @@
if (running) {
running = false;
unlockAccept();
+ poller.destroy();
+ poller = null;
}
}