Author: jfrederic.clere(a)jboss.com
Date: 2009-10-02 09:46:00 -0400 (Fri, 02 Oct 2009)
New Revision: 1182
Added:
trunk/java/org/apache/catalina/ha/
trunk/java/org/apache/catalina/ha/backend/
trunk/java/org/apache/catalina/ha/backend/CollectedInfo.java
trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java
trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java
trunk/java/org/apache/catalina/ha/backend/Proxy.java
trunk/java/org/apache/catalina/ha/backend/Sender.java
trunk/java/org/apache/catalina/ha/backend/TcpSender.java
Modified:
trunk/build.xml
Log:
Port the HeartbeatListener from Tomcat trunk.
Modified: trunk/build.xml
===================================================================
--- trunk/build.xml 2009-09-30 10:30:15 UTC (rev 1181)
+++ trunk/build.xml 2009-10-02 13:46:00 UTC (rev 1182)
@@ -221,7 +221,6 @@
<!-- Modules -->
<exclude name="org/apache/catalina/ant/**" />
<exclude name="org/apache/catalina/cluster/**" />
- <exclude name="org/apache/catalina/ha/**" />
<exclude name="org/apache/catalina/tribes/**" />
<exclude name="org/apache/catalina/launcher/**" />
<exclude name="org/apache/catalina/storeconfig/**" />
@@ -584,6 +583,7 @@
<fileset dir="${tomcat.classes}">
<!-- Temp EE class -->
<include name="org/apache/catalina/**" />
+ <include name="org/apache/catalina/ha/backend/**" />
<exclude name="org/apache/catalina/startup/catalina.properties"
/>
<include name="org/apache/naming/**" />
<include name="org/apache/comet/**" />
Added: trunk/java/org/apache/catalina/ha/backend/CollectedInfo.java
===================================================================
--- trunk/java/org/apache/catalina/ha/backend/CollectedInfo.java
(rev 0)
+++ trunk/java/org/apache/catalina/ha/backend/CollectedInfo.java 2009-10-02 13:46:00 UTC
(rev 1182)
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+/* for MBean to read ready and busy */
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.ObjectInstance;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.tomcat.util.modeler.Registry;
+
+/*
+ * Listener to provider informations to mod_heartbeat.c
+ * *msg_format = "v=%u&ready=%u&busy=%u"; (message to send).
+ * send the muticast merssage using the format...
+ * what about the bind(IP. port) only IP makes sense (for the moment).
+ * BTW:v = version :-)
+ */
+public class CollectedInfo {
+
+ /* Collect info via JMX */
+ protected MBeanServer mBeanServer = null;
+ protected ObjectName objName = null;
+
+ int ready;
+ int busy;
+
+ public CollectedInfo(String host, int port) throws Exception {
+ init(host, port);
+ }
+ public void init(String host, int port) throws Exception {
+ String sport = Integer.toString(port);
+ mBeanServer = Registry.getRegistry(null, null).getMBeanServer();
+ String onStr = "*:type=ThreadPool,*";
+ ObjectName objectName = new ObjectName(onStr);
+ Set set = mBeanServer.queryMBeans(objectName, null);
+ Iterator<ObjectInstance> iterator = set.iterator();
+ while (iterator.hasNext()) {
+ ObjectInstance oi = iterator.next();
+ objName = oi.getObjectName();
+ String name = objName.getKeyProperty("name");
+ /* Name are:
+ * http-8080
+ * jk-10.33.144.3-8009
+ * jk-jfcpc%2F10.33.144.3-8009
+ */
+ if (port==0 && host==null)
+ break; /* Take the first one */
+ String [] elenames = name.split("-");
+ if (elenames[elenames.length-1].compareTo(sport) != 0)
+ continue; /* port doesn't match */
+ if (host==null)
+ break; /* Only port done */
+ String [] shosts = elenames[1].split("%2F");
+ if (shosts[0].compareTo(host) == 0)
+ break; /* Done port and host are the expected ones */
+ }
+ if (objName == null)
+ throw(new Exception("Can't find connector for " + host +
":" + sport));
+
+ }
+
+ public void refresh() throws Exception {
+ if (mBeanServer == null || objName == null) {
+ throw(new Exception("Not initialized!!!"));
+ }
+ Integer imax = (Integer) mBeanServer.getAttribute(objName,
"maxThreads");
+
+ // the currentThreadCount could be 0 before the threads are created...
+ // Integer iready = (Integer) mBeanServer.getAttribute(objName,
"currentThreadCount");
+
+ Integer ibusy = (Integer) mBeanServer.getAttribute(objName,
"currentThreadsBusy");
+
+ busy = ibusy.intValue();
+ ready = imax.intValue() - ibusy;
+ }
+}
Added: trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java
===================================================================
--- trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java
(rev 0)
+++ trunk/java/org/apache/catalina/ha/backend/HeartbeatListener.java 2009-10-02 13:46:00
UTC (rev 1182)
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+import org.apache.catalina.ContainerEvent;
+import org.apache.catalina.ContainerListener;
+import org.apache.catalina.Lifecycle;
+import org.apache.catalina.LifecycleEvent;
+import org.apache.catalina.LifecycleListener;
+
+import org.jboss.logging.Logger;
+
+/*
+ * Listener to provider informations to mod_heartbeat.c
+ * *msg_format = "v=%u&ready=%u&busy=%u"; (message to send).
+ * send the muticast merssage using the format...
+ * what about the bind(IP. port) only IP makes sense (for the moment).
+ * BTW:v = version :-)
+ */
+public class HeartbeatListener
+ implements LifecycleListener, ContainerListener {
+
+ private static Logger log = Logger.getLogger(HeartbeatListener.class);
+
+ /* To allow to select the connector */
+ int port = 0;
+ String host = null;
+ public void setHost(String host) { this.host = host; }
+ public void setPort(int port) { this.port = port; }
+
+ /* for multicasting stuff */
+ String ip = "224.0.1.105"; /* Multicast IP */
+ int multiport = 23364; /* Multicast Port */
+ int ttl = 16;
+
+ public void setGroup(String ip) { this.ip = ip; }
+ public String getGroup() { return ip; }
+ public void setMultiport(int multiport) { this.multiport = multiport; }
+ public int getMultiport() { return multiport; }
+ public void setTtl(int ttl) { this.ttl = ttl; }
+ public int getTtl() { return ttl; }
+
+ /**
+ * Proxy list, format "address:port,address:port".
+ */
+ protected String proxyList = null;
+ public String getProxyList() { return proxyList; }
+ public void setProxyList(String proxyList) { this.proxyList = proxyList; }
+
+ /**
+ * URL prefix.
+ */
+ protected String proxyURL = "/HeartbeatListener";
+ public String getProxyURL() { return proxyURL; }
+ public void setProxyURL(String proxyURL) { this.proxyURL = proxyURL; }
+
+ private CollectedInfo coll = null;
+
+ private Sender sender = null;
+
+ public void containerEvent(ContainerEvent event) {
+ }
+
+ public void lifecycleEvent(LifecycleEvent event) {
+
+ if (Lifecycle.PERIODIC_EVENT.equals(event.getType())) {
+ if (sender == null) {
+ if (proxyList == null)
+ sender = new MultiCastSender();
+ else
+ sender = new TcpSender();
+
+ try {
+ sender.init(this);
+ } catch (Exception ex) {
+ log.error("Unable to initialize Sender: " + ex);
+ sender = null;
+ return;
+ }
+ }
+
+ /* Read busy and ready */
+ if (coll == null) {
+ try {
+ coll = new CollectedInfo(host, port);
+ } catch (Exception ex) {
+ log.error("Unable to initialize info collection: " + ex);
+ coll = null;
+ return;
+ }
+ }
+ try {
+ coll.refresh();
+ } catch (Exception ex) {
+ log.error("Unable to collect load information: " + ex);
+ coll = null;
+ return;
+ }
+ String output = new String();
+ output = "v=1&ready=" + coll.ready + "&busy=" +
coll.busy + "&port=" + port;
+ try {
+ sender.send(output);
+ } catch (Exception ex) {
+ log.error("Unable to send colllected load information: " +
ex);
+ }
+ }
+ }
+
+}
Added: trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java
===================================================================
--- trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java
(rev 0)
+++ trunk/java/org/apache/catalina/ha/backend/MultiCastSender.java 2009-10-02 13:46:00 UTC
(rev 1182)
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+import org.jboss.logging.Logger;
+
+import java.net.MulticastSocket;
+import java.net.InetAddress;
+import java.net.DatagramPacket;
+import java.io.UnsupportedEncodingException;
+
+/*
+ * Sender to proxies using multicast socket.
+ */
+public class MultiCastSender
+ implements Sender {
+
+ private static Logger log = Logger.getLogger(HeartbeatListener.class);
+
+ HeartbeatListener config = null;
+
+ /* for multicasting stuff */
+ MulticastSocket s = null;
+ InetAddress group = null;
+
+ public void init(HeartbeatListener config) throws Exception {
+ this.config = config;
+ }
+
+ public int send(String mess) throws Exception {
+ if (s == null) {
+ try {
+ group = InetAddress.getByName(config.getGroup());
+ s = new MulticastSocket(config.getMultiport());
+ s.setTimeToLive(config.getTtl());
+ s.joinGroup(group);
+ } catch (Exception ex) {
+ log.error("Unable to use multicast: " + ex);
+ s = null;
+ return -1;
+ }
+ }
+
+ byte[] buf;
+ try {
+ buf = mess.getBytes("US-ASCII");
+ } catch (UnsupportedEncodingException ex) {
+ buf = mess.getBytes();
+ }
+ DatagramPacket data = new DatagramPacket(buf, buf.length, group,
config.getMultiport());
+ try {
+ s.send(data);
+ } catch (Exception ex) {
+ log.error("Unable to send colllected load information: " + ex);
+ s.close();
+ s = null;
+ return -1;
+ }
+ return 0;
+ }
+
+}
Added: trunk/java/org/apache/catalina/ha/backend/Proxy.java
===================================================================
--- trunk/java/org/apache/catalina/ha/backend/Proxy.java (rev 0)
+++ trunk/java/org/apache/catalina/ha/backend/Proxy.java 2009-10-02 13:46:00 UTC (rev
1182)
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+import java.net.InetAddress;
+
+/*
+ * This class represents a front-end httpd server.
+ *
+ */
+public class Proxy {
+
+ protected enum State { OK, ERROR, DOWN };
+
+ public InetAddress address = null;
+ public int port = 80;
+ public State state = State.OK;
+}
Added: trunk/java/org/apache/catalina/ha/backend/Sender.java
===================================================================
--- trunk/java/org/apache/catalina/ha/backend/Sender.java (rev 0)
+++ trunk/java/org/apache/catalina/ha/backend/Sender.java 2009-10-02 13:46:00 UTC (rev
1182)
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+/*
+ * Interface to send data to proxies
+ *
+ */
+public interface Sender {
+
+ /**
+ * Set the configuration parameters
+ */
+ public void init(HeartbeatListener config) throws Exception;
+
+ /**
+ * Send the message to the proxies
+ */
+ public int send(String mess) throws Exception;
+}
Added: trunk/java/org/apache/catalina/ha/backend/TcpSender.java
===================================================================
--- trunk/java/org/apache/catalina/ha/backend/TcpSender.java (rev
0)
+++ trunk/java/org/apache/catalina/ha/backend/TcpSender.java 2009-10-02 13:46:00 UTC (rev
1182)
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.catalina.ha.backend;
+
+import org.jboss.logging.Logger;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.StringTokenizer;
+
+/*
+ * Sender to proxies using multicast socket.
+ */
+public class TcpSender
+ implements Sender {
+
+ private static Logger log = Logger.getLogger(HeartbeatListener.class);
+
+ HeartbeatListener config = null;
+
+ /**
+ * Proxies.
+ */
+ protected Proxy[] proxies = null;
+
+
+ /**
+ * Active connections.
+ */
+
+ protected Socket[] connections = null;
+ protected BufferedReader[] connectionReaders = null;
+ protected BufferedWriter[] connectionWriters = null;
+
+
+ public void init(HeartbeatListener config) throws Exception {
+ this.config = config;
+ StringTokenizer tok = new StringTokenizer(config.getProxyList(), ",");
+ proxies = new Proxy[tok.countTokens()];
+ int i = 0;
+ while (tok.hasMoreTokens()) {
+ String token = tok.nextToken().trim();
+ int pos = token.indexOf(':');
+ if (pos <=0)
+ throw new Exception("bad ProxyList");
+ proxies[i] = new Proxy();
+ proxies[i].port = Integer.parseInt(token.substring(pos + 1));
+ try {
+ proxies[i].address = InetAddress.getByName(token.substring(0, pos));
+ } catch (Exception e) {
+ throw new Exception("bad ProxyList");
+ }
+ i++;
+ }
+ connections = new Socket[proxies.length];
+ connectionReaders = new BufferedReader[proxies.length];
+ connectionWriters = new BufferedWriter[proxies.length];
+
+ }
+
+ public int send(String mess) throws Exception {
+ if (connections == null) {
+ log.error("Not initialized");
+ return -1;
+ }
+ String requestLine = "POST " + config.getProxyURL() + "
HTTP/1.0";
+
+ for (int i = 0; i < connections.length; i++) {
+ if (connections[i] == null) {
+ try {
+ connections[i] = new Socket(proxies[i].address, proxies[i].port);
+ connectionReaders[i] = new BufferedReader(new
InputStreamReader(connections[i].getInputStream()));
+ connectionWriters[i] = new BufferedWriter(new
OutputStreamWriter(connections[i].getOutputStream()));
+ } catch (Exception ex) {
+ log.error("Unable to connect to proxy: " + ex);
+ close(i);
+ }
+ }
+ if (connections[i] == null)
+ continue; // try next proxy in the list
+ BufferedWriter writer = connectionWriters[i];
+ try {
+ writer.write(requestLine);
+ writer.write("\r\n");
+ writer.write("Content-Length: " + mess.length() +
"\r\n");
+ writer.write("User-Agent: HeartbeatListener/1.0\r\n");
+ writer.write("Connection: Keep-Alive\r\n");
+ writer.write("\r\n");
+ writer.write(mess);
+ writer.write("\r\n");
+ writer.flush();
+ } catch (Exception ex) {
+ log.error("Unable to send collected load information to proxy:
" + ex);
+ close(i);
+ }
+ if (connections[i] == null)
+ continue; // try next proxy in the list
+
+ /* Read httpd answer */
+ String responseStatus = connectionReaders[i].readLine();
+ if (responseStatus == null) {
+ log.error("Unable to read response from proxy");
+ close(i);
+ continue;
+ } else {
+ responseStatus = responseStatus.substring(responseStatus.indexOf('
') + 1, responseStatus.indexOf(' ', responseStatus.indexOf(' ') +
1));
+ int status = Integer.parseInt(responseStatus);
+ if (status != 200) {
+ log.error("Status is " + status);
+ close(i);
+ continue;
+ }
+
+ // read all the headers.
+ String header = connectionReaders[i].readLine();
+ int contentLength = 0;
+ while (!"".equals(header)) {
+ int colon = header.indexOf(':');
+ String headerName = header.substring(0, colon).trim();
+ String headerValue = header.substring(colon + 1).trim();
+ if ("content-length".equalsIgnoreCase(headerName)) {
+ contentLength = Integer.parseInt(headerValue);
+ }
+ header = connectionReaders[i].readLine();
+ }
+ if (contentLength > 0) {
+ char[] buf = new char[512];
+ while (contentLength > 0) {
+ int thisTime = (contentLength > buf.length) ? buf.length :
contentLength;
+ int n = connectionReaders[i].read(buf, 0, thisTime);
+ if (n <= 0) {
+ log.error("Read content failed");
+ close(i);
+ break;
+ } else {
+ contentLength -= n;
+ }
+ }
+ }
+ }
+
+ }
+
+ return 0;
+ }
+
+ /**
+ * Close connection.
+ */
+ protected void close(int i) {
+ try {
+ if (connectionReaders[i] != null) {
+ connectionReaders[i].close();
+ }
+ } catch (IOException e) {
+ }
+ connectionReaders[i] = null;
+ try {
+ if (connectionWriters[i] != null) {
+ connectionWriters[i].close();
+ }
+ } catch (IOException e) {
+ }
+ connectionWriters[i] = null;
+ try {
+ if (connections[i] != null) {
+ connections[i].close();
+ }
+ } catch (IOException e) {
+ }
+ connections[i] = null;
+ }
+}