Author: remy.maucherat(a)jboss.com
Date: 2009-03-17 13:02:07 -0400 (Tue, 17 Mar 2009)
New Revision: 960
Modified:
trunk/java/org/apache/coyote/ProtocolHandler.java
trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
trunk/java/org/apache/coyote/ajp/AjpProtocol.java
trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
trunk/java/org/apache/coyote/http11/Http11Protocol.java
trunk/java/org/apache/coyote/memory/MemoryProtocolHandler.java
trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
Log:
- Update so that all protocols have async capabilities, with one of them having additional
IO driven events.
Modified: trunk/java/org/apache/coyote/ProtocolHandler.java
===================================================================
--- trunk/java/org/apache/coyote/ProtocolHandler.java 2009-03-17 13:26:02 UTC (rev 959)
+++ trunk/java/org/apache/coyote/ProtocolHandler.java 2009-03-17 17:02:07 UTC (rev 960)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package org.apache.coyote;
@@ -53,7 +58,10 @@
public Adapter getAdapter();
- /**
+ public boolean hasIoEvents();
+
+
+ /**
* Init the protocol.
*/
public void init()
Modified: trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java 2009-03-17 13:26:02 UTC (rev
959)
+++ trunk/java/org/apache/coyote/ajp/AjpAprProtocol.java 2009-03-17 17:02:07 UTC (rev
960)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package org.apache.coyote.ajp;
@@ -149,6 +154,11 @@
}
+ public boolean hasIoEvents() {
+ return false;
+ }
+
+
/** Start the protocol
*/
public void init() throws Exception {
@@ -382,7 +392,7 @@
this.proto = proto;
}
- // FIXME: Support for this could be added in AJP as well
+ // FIXME: Support async
public SocketState event(long socket, SocketStatus status) {
return SocketState.CLOSED;
}
Modified: trunk/java/org/apache/coyote/ajp/AjpProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/ajp/AjpProtocol.java 2009-03-17 13:26:02 UTC (rev 959)
+++ trunk/java/org/apache/coyote/ajp/AjpProtocol.java 2009-03-17 17:02:07 UTC (rev 960)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package org.apache.coyote.ajp;
@@ -39,6 +44,7 @@
import org.apache.coyote.RequestInfo;
import org.apache.tomcat.util.modeler.Registry;
import org.apache.tomcat.util.net.JIoEndpoint;
+import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.JIoEndpoint.Handler;
import org.apache.tomcat.util.res.StringManager;
@@ -149,6 +155,11 @@
}
+ public boolean hasIoEvents() {
+ return false;
+ }
+
+
/** Start the protocol
*/
public void init() throws Exception {
@@ -369,6 +380,11 @@
this.proto = proto;
}
+ // FIXME: Support async
+ public SocketState event(Socket socket, SocketStatus status) {
+ return SocketState.CLOSED;
+ }
+
public boolean process(Socket socket) {
AjpProcessor processor = recycledProcessors.poll();
try {
Modified: trunk/java/org/apache/coyote/http11/Http11AprProtocol.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2009-03-17 13:26:02 UTC
(rev 959)
+++ trunk/java/org/apache/coyote/http11/Http11AprProtocol.java 2009-03-17 17:02:07 UTC
(rev 960)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package org.apache.coyote.http11;
@@ -96,6 +101,11 @@
public Adapter getAdapter() { return adapter; }
+ public boolean hasIoEvents() {
+ return true;
+ }
+
+
/** Start the protocol
*/
public void init() throws Exception {
Modified: trunk/java/org/apache/coyote/http11/Http11Protocol.java
===================================================================
--- trunk/java/org/apache/coyote/http11/Http11Protocol.java 2009-03-17 13:26:02 UTC (rev
959)
+++ trunk/java/org/apache/coyote/http11/Http11Protocol.java 2009-03-17 17:02:07 UTC (rev
960)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package org.apache.coyote.http11;
@@ -40,6 +45,7 @@
import org.apache.tomcat.util.net.JIoEndpoint;
import org.apache.tomcat.util.net.SSLImplementation;
import org.apache.tomcat.util.net.ServerSocketFactory;
+import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.JIoEndpoint.Handler;
import org.apache.tomcat.util.res.StringManager;
@@ -141,6 +147,11 @@
public Adapter getAdapter() { return adapter; }
+ public boolean hasIoEvents() {
+ return false;
+ }
+
+
public void init() throws Exception {
endpoint.setName(getName());
endpoint.setHandler(cHandler);
@@ -578,6 +589,11 @@
this.proto = proto;
}
+ // FIXME: Support async
+ public SocketState event(Socket socket, SocketStatus status) {
+ return SocketState.CLOSED;
+ }
+
public boolean process(Socket socket) {
Http11Processor processor = recycledProcessors.poll();
try {
Modified: trunk/java/org/apache/coyote/memory/MemoryProtocolHandler.java
===================================================================
--- trunk/java/org/apache/coyote/memory/MemoryProtocolHandler.java 2009-03-17 13:26:02 UTC
(rev 959)
+++ trunk/java/org/apache/coyote/memory/MemoryProtocolHandler.java 2009-03-17 17:02:07 UTC
(rev 960)
@@ -71,6 +71,11 @@
}
+ public boolean hasIoEvents() {
+ return false;
+ }
+
+
// ------------------------------------------------ ProtocolHandler Methods
Modified: trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2009-03-17 13:26:02 UTC (rev
959)
+++ trunk/java/org/apache/tomcat/util/net/AprEndpoint.java 2009-03-17 17:02:07 UTC (rev
960)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package
org.apache.tomcat.util.net;
@@ -307,14 +312,6 @@
/**
- * Allow comet request handling.
- */
- protected boolean useComet = true;
- public void setUseComet(boolean useComet) { this.useComet = useComet; }
- public boolean getUseComet() { return useComet; }
-
-
- /**
* The socket poller.
*/
protected Poller poller = null;
@@ -342,18 +339,6 @@
/**
- * Dummy maxSpareThreads property.
- */
- public int getMaxSpareThreads() { return 0; }
-
-
- /**
- * Dummy minSpareThreads property.
- */
- public int getMinSpareThreads() { return 0; }
-
-
- /**
* The server address.
*/
protected long serverAddress = 0;
Modified: trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java
===================================================================
--- trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java 2009-03-17 13:26:02 UTC (rev
959)
+++ trunk/java/org/apache/tomcat/util/net/JIoEndpoint.java 2009-03-17 17:02:07 UTC (rev
960)
@@ -1,18 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * JBoss, Home of Professional Open Source
+ * Copyright 2009, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
*
- *
http://www.apache.org/licenses/LICENSE-2.0
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
*/
package
org.apache.tomcat.util.net;
@@ -26,7 +31,6 @@
import org.apache.tomcat.util.res.StringManager;
import org.jboss.logging.Logger;
-import org.jboss.logging.Logger;
/**
* Handle incoming TCP connections.
@@ -165,6 +169,22 @@
/**
+ * Size of the socket poller.
+ */
+ protected int pollerSize = 32 * 1024;
+ public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; }
+ public int getPollerSize() { return pollerSize; }
+
+
+ /**
+ * Keep-Alive timeout.
+ */
+ protected int keepAliveTimeout = -1;
+ public int getKeepAliveTimeout() { return keepAliveTimeout; }
+ public void setKeepAliveTimeout(int keepAliveTimeout) { this.keepAliveTimeout =
keepAliveTimeout; }
+
+
+ /**
* Server socket port.
*/
protected int port;
@@ -274,7 +294,11 @@
* thread local fields.
*/
public interface Handler {
+ public enum SocketState {
+ OPEN, CLOSED, LONG
+ }
public boolean process(Socket socket);
+ public SocketState event(Socket socket, SocketStatus status);
}
@@ -312,11 +336,7 @@
// Hand this socket off to an appropriate processor
if (!processSocket(socket)) {
// Close socket right away
- try {
- socket.close();
- } catch (IOException e) {
- // Ignore
- }
+ try { socket.close(); } catch (IOException e) { }
}
}catch ( IOException x ) {
if ( running )
log.error(sm.getString("endpoint.accept.fail"), x);
@@ -333,6 +353,180 @@
}
+ // ------------------------------------------------- SocketInfo Inner Class
+
+
+ /**
+ * Socket list class, used to avoid using a possibly large amount of objects
+ * with very little actual use.
+ */
+ public static class SocketInfo {
+ public Socket socket;
+ public int timeout;
+ public boolean wakeup;
+ public boolean wakeup() {
+ return wakeup;
+ }
+ public static boolean merge(boolean flag1, boolean flag2) {
+ return (flag1 || flag2);
+ }
+ /*public static final int READ = 1;
+ public static final int WRITE = 2;
+ public static final int RESUME = 4;
+ public static final int WAKEUP = 8;*/
+ //public int flags;
+ /*public boolean read() {
+ return (flags & READ) == READ;
+ }
+ public boolean write() {
+ return (flags & WRITE) == WRITE;
+ }
+ public boolean resume() {
+ return (flags & RESUME) == RESUME;
+ }
+ public boolean wakeup() {
+ return (flags & WAKEUP) == WAKEUP;
+ }
+ public static int merge(int flag1, int flag2) {
+ return ((flag1 & READ) | (flag2 & READ))
+ | ((flag1 & WRITE) | (flag2 & WRITE))
+ | ((flag1 & RESUME) | (flag2 & RESUME))
+ | ((flag1 & WAKEUP) & (flag2 & WAKEUP));
+ }*/
+ }
+
+
+ // --------------------------------------------- SocketTimeouts Inner Class
+
+
+ /**
+ * Socket list class, used to avoid using a possibly large amount of objects
+ * with very little actual use.
+ */
+ public class SocketTimeouts {
+ protected int size;
+
+ protected Socket[] sockets;
+ protected long[] timeouts;
+ protected int pos = 0;
+
+ public SocketTimeouts(int size) {
+ this.size = 0;
+ sockets = new Socket[size];
+ timeouts = new long[size];
+ }
+
+ public void add(Socket socket, long timeout) {
+ sockets[size] = socket;
+ timeouts[size] = timeout;
+ size++;
+ }
+
+ public boolean remove(Socket socket) {
+ for (int i = 0; i < size; i++) {
+ if (sockets[i] == socket) {
+ sockets[i] = sockets[size - 1];
+ timeouts[i] = timeouts[size - 1];
+ size--;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public Socket check(long date) {
+ while (pos < size) {
+ if (date >= timeouts[pos]) {
+ Socket result = sockets[pos];
+ sockets[pos] = sockets[size - 1];
+ timeouts[pos] = timeouts[size - 1];
+ size--;
+ return result;
+ }
+ pos++;
+ }
+ pos = 0;
+ return null;
+ }
+
+ }
+
+
+ // ------------------------------------------------- SocketList Inner Class
+
+
+ /**
+ * Socket list class, used to avoid using a possibly large amount of objects
+ * with very little actual use.
+ */
+ public class SocketList {
+ protected int size;
+ protected int pos;
+
+ protected Socket[] sockets;
+ protected int[] timeouts;
+ protected boolean[] wakeups;
+
+ protected SocketInfo info = new SocketInfo();
+
+ public SocketList(int size) {
+ this.size = 0;
+ pos = 0;
+ sockets = new Socket[size];
+ timeouts = new int[size];
+ wakeups = new boolean[size];
+ }
+
+ public int size() {
+ return this.size;
+ }
+
+ public SocketInfo get() {
+ if (pos == size) {
+ return null;
+ } else {
+ info.socket = sockets[pos];
+ info.timeout = timeouts[pos];
+ info.wakeup = wakeups[pos];
+ pos++;
+ return info;
+ }
+ }
+
+ public void clear() {
+ size = 0;
+ pos = 0;
+ }
+
+ public boolean add(Socket socket, int timeout, boolean wakeup) {
+ if (size == sockets.length) {
+ return false;
+ } else {
+ for (int i = 0; i < size; i++) {
+ if (sockets[i] == socket) {
+ wakeups[i] = SocketInfo.merge(wakeups[i], wakeup);
+ return true;
+ }
+ }
+ sockets[size] = socket;
+ timeouts[size] = timeout;
+ wakeups[size] = wakeup;
+ size++;
+ return true;
+ }
+ }
+
+ public void duplicate(SocketList copy) {
+ copy.size = size;
+ copy.pos = pos;
+ System.arraycopy(sockets, 0, copy.sockets, 0, size);
+ System.arraycopy(timeouts, 0, copy.timeouts, 0, size);
+ System.arraycopy(wakeups, 0, copy.wakeups, 0, size);
+ }
+
+ }
+
+
// ------------------------------------------- SocketProcessor Inner Class
@@ -353,10 +547,7 @@
// Process the request from this socket
if (!setSocketOptions(socket) || !handler.process(socket)) {
// Close socket
- try {
- socket.close();
- } catch (IOException e) {
- }
+ try { socket.close(); } catch (IOException e) { }
}
// Finish up this request
@@ -367,6 +558,281 @@
}
+ // --------------------------------------- SocketEventProcessor Inner Class
+
+
+ /**
+ * This class is the equivalent of the Worker, but will simply use in an
+ * external Executor thread pool.
+ */
+ protected class SocketEventProcessor implements Runnable {
+
+ protected Socket socket = null;
+ protected SocketStatus status = null;
+
+ public SocketEventProcessor(Socket socket, SocketStatus status) {
+ this.socket = socket;
+ this.status = status;
+ }
+
+ public void run() {
+
+ // Process the request from this socket
+ if (handler.event(socket, status) == Handler.SocketState.CLOSED) {
+ // Close socket
+ try { socket.close(); } catch (IOException e) { }
+ }
+ socket = null;
+
+ }
+
+ }
+
+
+ // ----------------------------------------------------- Poller Inner Class
+
+
+ /**
+ * Poller class.
+ */
+ public class Poller implements Runnable {
+
+ /**
+ * List of sockets to be added to the poller.
+ */
+ protected SocketList addList = null;
+
+ /**
+ * List of sockets to be added to the poller.
+ */
+ protected SocketList localAddList = null;
+
+ /**
+ * Structure used for storing timeouts.
+ */
+ protected SocketTimeouts timeouts = null;
+
+
+ /**
+ * Last run of maintain. Maintain will run usually every 5s.
+ */
+ protected long lastMaintain = System.currentTimeMillis();
+
+
+ /**
+ * Amount of connections inside this poller.
+ */
+ protected int connectionCount = 0;
+ public int getConnectionCount() { return connectionCount; }
+
+ public Poller() {
+ }
+
+ /**
+ * Create the poller. The java.io poller only deals with timeouts.
+ */
+ protected void init() {
+
+ timeouts = new SocketTimeouts(pollerSize);
+
+ connectionCount = 0;
+ addList = new SocketList(pollerSize);
+ localAddList = new SocketList(pollerSize);
+
+ }
+
+ /**
+ * Destroy the poller.
+ */
+ protected void destroy() {
+ // Wait for pollerTime before doing anything, so that the poller threads
+ // exit, otherwise parallel destruction of sockets which are still
+ // in the poller can cause problems
+ try {
+ synchronized (this) {
+ this.wait(2);
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ // Close all sockets in the add queue
+ SocketInfo info = addList.get();
+ while (info != null) {
+ if (!processSocket(info.socket, SocketStatus.STOP)) {
+ try { info.socket.close(); } catch (IOException e) { }
+ }
+ info = addList.get();
+ }
+ addList.clear();
+ // Close all sockets still in the poller
+ // FIXME: close all waiting for timeout
+
+ connectionCount = 0;
+ }
+
+ /**
+ * Add specified socket and associated pool to the poller. The socket will
+ * be added to a temporary array, and polled first after a maximum amount
+ * of time equal to pollTime (in most cases, latency will be much lower,
+ * however).
+ *
+ * @param socket to add to the poller
+ */
+ public void add(Socket socket, int timeout, boolean wakeup) {
+ /*int timeout = keepAliveTimeout;
+ if (timeout < 0) {
+ timeout = soTimeout;
+ }*/
+ boolean ok = false;
+ synchronized (this) {
+ // Add socket to the list. Newly added sockets will wait
+ // at most for pollTime before being polled
+ if (addList.add(socket, timeout, wakeup)) {
+ ok = true;
+ this.notify();
+ }
+ }
+ if (!ok) {
+ // Can't do anything: close the socket right away
+ if (!processSocket(socket, SocketStatus.ERROR)) {
+ try { socket.close(); } catch (IOException e) { }
+ }
+ }
+ }
+
+ /**
+ * Timeout checks.
+ */
+ protected void maintain() {
+
+ long date = System.currentTimeMillis();
+ // Maintain runs at most once every 5s, although it will likely get called
more
+ if ((date - lastMaintain) < 5000L) {
+ return;
+ } else {
+ lastMaintain = date;
+ }
+ Socket socket = timeouts.check(date);
+ while (socket != null) {
+ if (!processSocket(socket, SocketStatus.TIMEOUT)) {
+ try { socket.close(); } catch (IOException e) { }
+ }
+ socket = timeouts.check(date);
+ }
+
+ }
+
+ /**
+ * Displays the list of sockets in the pollers.
+ *
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("Poller comet=[").append(comet).append("]");
+ long[] res = new long[actualPollerSize * 2];
+ for (int i = 0; i < pollers.length; i++) {
+ int count = Poll.pollset(pollers[i], res);
+ buf.append(" [ ");
+ for (int j = 0; j < count; j++) {
+ buf.append(desc[2*j+1]).append(" ");
+ }
+ buf.append("]");
+ }
+ return buf.toString();
+ }*/
+
+ /**
+ * The background thread that listens for incoming TCP/IP connections and
+ * hands them off to an appropriate processor.
+ */
+ public void run() {
+
+ int maintain = 0;
+ // Loop until we receive a shutdown command
+ while (running) {
+
+ // Loop if endpoint is paused
+ while (paused) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ // Check timeouts for suspended connections if the poller is empty
+ while (connectionCount < 1 && addList.size() < 1) {
+ // Reset maintain time.
+ try {
+ if (soTimeout > 0 && running) {
+ maintain();
+ }
+ synchronized (this) {
+ this.wait(10000);
+ }
+ } catch (InterruptedException e) {
+ // Ignore
+ } catch (Throwable t) {
+ log.error(sm.getString("endpoint.maintain.error"), t);
+ }
+ }
+
+ try {
+
+ // Add sockets which are waiting to the poller
+ if (addList.size() > 0) {
+ synchronized (this) {
+ // Duplicate to another list, so that the syncing is minimal
+ addList.duplicate(localAddList);
+ addList.clear();
+ }
+ SocketInfo info = localAddList.get();
+ while (info != null) {
+ if (info.wakeup()) {
+ // Resume event if socket is present in the poller
+ if (timeouts.remove(info.socket)) {
+ if (!processSocket(info.socket,
SocketStatus.OPEN_CALLBACK)) {
+ try { info.socket.close(); } catch (IOException
e) { }
+ }
+ }
+ } else {
+ timeouts.add(info.socket, System.currentTimeMillis() +
info.timeout);
+ }
+ info = localAddList.get();
+ }
+ }
+
+ try {
+ Thread.sleep(2);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+
+ // Process socket timeouts
+ if (soTimeout > 0 && maintain++ > 1000 &&
running) {
+ // This works and uses only one timeout mechanism for everything,
but the
+ // non Comet poller might be a bit faster by using the old
maintain.
+ maintain = 0;
+ maintain();
+ }
+
+ } catch (Throwable t) {
+ if (maintain == 0) {
+ log.error(sm.getString("endpoint.maintain.error"), t);
+ } else {
+ log.error(sm.getString("endpoint.poll.error"), t);
+ }
+ }
+
+ }
+
+ synchronized (this) {
+ this.notifyAll();
+ }
+
+ }
+
+ }
+
+
// ----------------------------------------------------- Worker Inner Class
@@ -375,6 +841,7 @@
protected Thread thread = null;
protected boolean available = false;
protected Socket socket = null;
+ protected SocketStatus status = null;
/**
@@ -386,7 +853,7 @@
*
* @param socket TCP socket to process
*/
- synchronized void assign(Socket socket) {
+ protected synchronized void assign(Socket socket) {
// Wait for the Processor to get the previous Socket
while (available) {
@@ -398,12 +865,32 @@
// Store the newly available Socket and notify our thread
this.socket = socket;
+ this.status = null;
available = true;
notifyAll();
}
+ protected synchronized void assign(Socket socket, SocketStatus status) {
+
+ // Wait for the Processor to get the previous Socket
+ while (available) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ // Store the newly available Socket and notify our thread
+ this.socket = socket;
+ this.status = status;
+ available = true;
+ notifyAll();
+
+ }
+
+
/**
* Await a newly assigned Socket from our Connector, or
<code>null</code>
* if we are supposed to shut down.
@@ -444,16 +931,15 @@
continue;
// Process the request from this socket
- if (!setSocketOptions(socket) || !handler.process(socket)) {
+ if ((status != null) && (handler.event(socket, status) ==
Handler.SocketState.CLOSED)) {
// Close socket
- try {
- socket.close();
- } catch (IOException e) {
- }
+ try { socket.close(); } catch (IOException e) { }
+ } else if ((status == null) && (!setSocketOptions(socket) ||
!handler.process(socket))) {
+ // Close socket
+ try { socket.close(); } catch (IOException e) { }
}
// Finish up this request
- socket = null;
recycleWorkerThread(this);
}
@@ -748,6 +1234,26 @@
}
+ /**
+ * Process given socket for an event.
+ */
+ protected boolean processSocket(Socket socket, SocketStatus status) {
+ try {
+ if (executor == null) {
+ getWorkerThread().assign(socket, status);
+ } else {
+ executor.execute(new SocketEventProcessor(socket, status));
+ }
+ } catch (Throwable t) {
+ // This means we got an OOM or similar creating a thread, or that
+ // the pool and its queue are full
+ log.error(sm.getString("endpoint.process.fail"), t);
+ return false;
+ }
+ return true;
+ }
+
+
// ------------------------------------------------- WorkerStack Inner Class