Author: alessio.soldano(a)jboss.com
Date: 2009-09-10 07:57:10 -0400 (Thu, 10 Sep 2009)
New Revision: 10673
Added:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/KeepAliveCache.java
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyHelper.java
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyTransportHandler.java
Removed:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/WSClientPipelineFactory.java
Modified:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyClient.java
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/WSResponseHandler.java
Log:
[JBWS-2753] Adding KeepAliveCache
Added:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/KeepAliveCache.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/KeepAliveCache.java
(rev 0)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/KeepAliveCache.java 2009-09-10
11:57:10 UTC (rev 10673)
@@ -0,0 +1,388 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.core.client.transport;
+
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Stack;
+
+import org.jboss.ws.Constants;
+
+/**
+ * The keep-alive cache used for keeping track of idle NettyTransportHandler
+ * instances. This is inspired from the KeepAliveCache used by HttpConnection.
+ *
+ * Idle handlers are grouped per protocol/host/port (see KeepAliveKey); a
+ * daemon timer thread, started on demand by put(), periodically ends the
+ * handlers that have been idle for longer than their group's lifetime.
+ * All public operations synchronize on the cache instance.
+ *
+ * @author alessio.soldano(a)jboss.com
+ * @since 08-Sep-2009
+ *
+ */
+public class KeepAliveCache implements Runnable
+{
+   private static final long serialVersionUID = 1L;
+
+   //the hashtable actually storing the objects
+   private Hashtable<KeepAliveKey, TransportHandlerVector> table = new Hashtable<KeepAliveKey, TransportHandlerVector>();
+
+   //max number of idle connections kept in the cache for a given protocol/host/port
+   private static final int DEFAULT_MAX_CONNECTIONS = 5;
+   private static int maxConnections = -1;
+   //approx lifetime (ms) of a NettyTransport before it is closed (unless a keep-alive timeout is specified)
+   private static final int LIFETIME = 5000;
+
+   private Thread keepAliveTimer = null;
+
+   /**
+    * Return the maximum number of idle connections kept per key. The value
+    * is read once from the Constants.HTTP_MAX_CONNECTIONS system property;
+    * a missing, malformed or non-positive value falls back to
+    * DEFAULT_MAX_CONNECTIONS.
+    *
+    * @return the (positive) maximum number of cached connections per key
+    */
+   static int getMaxConnections()
+   {
+      if (maxConnections == -1)
+      {
+         int configured;
+         try
+         {
+            configured = Integer.parseInt(getSystemProperty(Constants.HTTP_MAX_CONNECTIONS, String.valueOf(DEFAULT_MAX_CONNECTIONS)).trim());
+         }
+         catch (NumberFormatException nfe)
+         {
+            // a malformed property must not break every cache operation
+            configured = DEFAULT_MAX_CONNECTIONS;
+         }
+         maxConnections = (configured > 0) ? configured : DEFAULT_MAX_CONNECTIONS;
+      }
+      return maxConnections;
+   }
+
+   public KeepAliveCache()
+   {
+
+   }
+
+   /**
+    * Register this URL and NettyTransportHandler with the cache, starting
+    * the keep-alive timer thread if it is not currently running.
+    *
+    * @param url       The URL contains info about the host and port
+    * @param transport The NettyTransportHandler to be cached
+    */
+   public synchronized void put(final URL url, NettyTransportHandler transport)
+   {
+      boolean startThread = (keepAliveTimer == null) || !keepAliveTimer.isAlive();
+      if (startThread)
+      {
+         // NOTE(review): entries still in the table at this point are dropped
+         // without end() being called on their handlers - confirm whether
+         // they should be closed here instead of just being forgotten.
+         table.clear();
+         final KeepAliveCache cache = this;
+         AccessController.doPrivileged(new PrivilegedAction<Object>() {
+            public Object run()
+            {
+               // We want to create the JBossWS-Keep-Alive-Timer in the
+               // system thread group
+               ThreadGroup grp = Thread.currentThread().getThreadGroup();
+               ThreadGroup parent = null;
+               while ((parent = grp.getParent()) != null)
+               {
+                  grp = parent;
+               }
+
+               keepAliveTimer = new Thread(grp, cache, "JBossWS-Keep-Alive-Timer");
+               keepAliveTimer.setDaemon(true);
+               keepAliveTimer.setPriority(Thread.MAX_PRIORITY - 2);
+               keepAliveTimer.start();
+               return null;
+            }
+         });
+      }
+
+      KeepAliveKey key = new KeepAliveKey(url);
+      TransportHandlerVector v = table.get(key);
+
+      if (v == null)
+      {
+         // the first handler cached for a key decides the lifetime of the whole group
+         int keepAliveTimeout = transport.getKeepAliveTimeout();
+         v = new TransportHandlerVector(keepAliveTimeout > 0 ? keepAliveTimeout * 1000 : LIFETIME);
+         v.put(transport);
+         table.put(key, v);
+      }
+      else
+      {
+         v.put(transport);
+      }
+   }
+
+   /**
+    * Remove an obsolete NettyTransportHandler from its vector; the vector
+    * itself is dropped from the table once it becomes empty.
+    *
+    * @param h   the handler to remove (looked up through its current URL)
+    * @param obj unused
+    */
+   public synchronized void remove(NettyTransportHandler h, Object obj)
+   {
+      KeepAliveKey key = new KeepAliveKey(h.getUrl());
+      TransportHandlerVector v = table.get(key);
+      if (v != null)
+      {
+         // The vector stores KeepAliveEntry wrappers, so Vector.remove(Object)
+         // with a handler argument would never match anything.
+         v.removeHandler(h);
+         if (v.empty())
+         {
+            removeVector(key);
+         }
+      }
+   }
+
+   /* called by a clientVector thread when all its connections have timed out
+    * and that vector of connections should be removed.
+    */
+   synchronized void removeVector(KeepAliveKey k)
+   {
+      table.remove(k);
+   }
+
+   /**
+    * Check to see if this URL has a cached NettyTransportHandler
+    *
+    * @param url the target URL (only protocol, host and port are considered)
+    * @return a cached handler that has not timed out, or null if none is available
+    */
+   public synchronized NettyTransportHandler get(URL url)
+   {
+      KeepAliveKey key = new KeepAliveKey(url);
+      TransportHandlerVector v = table.get(key);
+      if (v == null)
+      { // nothing in cache yet
+         return null;
+      }
+      return v.get();
+   }
+
+   /**
+    * Timer loop: every LIFETIME ms ends the handlers that have been idle for
+    * longer than their vector's lifetime, and exits once the table is empty.
+    */
+   public void run()
+   {
+      do
+      {
+         try
+         {
+            Thread.sleep(LIFETIME);
+         }
+         catch (InterruptedException ignored)
+         {
+            // Deliberately ignored: an interrupt only shortens this nap, the
+            // loop still terminates as soon as the table is found empty.
+         }
+         synchronized (this)
+         {
+            /* Remove all unused NettyTransportHandler. Starting from the
+             * bottom of the stack (the least-recently used first).
+             */
+            long currentTime = System.currentTimeMillis();
+
+            ArrayList<KeepAliveKey> keysToRemove = new ArrayList<KeepAliveKey>();
+
+            for (KeepAliveKey key : table.keySet())
+            {
+               TransportHandlerVector v = table.get(key);
+               synchronized (v)
+               {
+                  int i;
+
+                  for (i = 0; i < v.size(); i++)
+                  {
+                     KeepAliveEntry e = v.elementAt(i);
+                     if ((currentTime - e.idleStartTime) > v.nap)
+                     {
+                        NettyTransportHandler h = e.hc;
+                        h.end();
+                     }
+                     else
+                     {
+                        // entries are pushed in idle-time order, so the rest is younger
+                        break;
+                     }
+                  }
+                  // drop the i expired entries that have just been ended
+                  v.subList(0, i).clear();
+
+                  if (v.size() == 0)
+                  {
+                     keysToRemove.add(key);
+                  }
+               }
+            }
+            // removal is deferred so the table is not modified while iterating its key set
+            for (KeepAliveKey key : keysToRemove)
+            {
+               removeVector(key);
+            }
+         }
+      }
+      while (table.size() > 0);
+
+      return;
+   }
+
+   private void writeObject(java.io.ObjectOutputStream stream) throws IOException
+   {
+      throw new NotSerializableException();
+   }
+
+   private void readObject(java.io.ObjectInputStream stream) throws IOException, ClassNotFoundException
+   {
+      throw new NotSerializableException();
+   }
+
+   /**
+    * A stack (FILO) for keeping NettyTransportHandler instances
+    */
+   class TransportHandlerVector extends Stack<KeepAliveEntry>
+   {
+      private static final long serialVersionUID = 1L;
+
+      // sleep time in milliseconds, before cache clear
+      int nap;
+
+      TransportHandlerVector(int nap)
+      {
+         this.nap = nap;
+      }
+
+      /**
+       * Pop handlers (most recently cached first) until one that has not
+       * timed out is found; timed out handlers are ended on the way.
+       *
+       * @return a still valid handler, or null if there is none
+       */
+      synchronized NettyTransportHandler get()
+      {
+         if (empty())
+         {
+            return null;
+         }
+         else
+         {
+            //Loop until we find a connection that has not timed out
+            NettyTransportHandler hc = null;
+            long currentTime = System.currentTimeMillis();
+            do
+            {
+               KeepAliveEntry e = pop();
+               if ((currentTime - e.idleStartTime) > nap)
+               {
+                  e.hc.end();
+               }
+               else
+               {
+                  hc = e.hc;
+               }
+            }
+            while ((hc == null) && (!empty()));
+            return hc;
+         }
+      }
+
+      /**
+       * Return a valid NettyTransportHandler back to the stack; the handler
+       * is ended instead when the stack already holds the maximum number of
+       * connections.
+       *
+       * @param h the handler to cache
+       */
+      synchronized void put(NettyTransportHandler h)
+      {
+         // ">=" (not ">"): otherwise max + 1 connections would be retained
+         if (size() >= KeepAliveCache.getMaxConnections())
+         {
+            h.end();
+         }
+         else
+         {
+            push(new KeepAliveEntry(h, System.currentTimeMillis()));
+         }
+      }
+
+      /**
+       * Remove the entry wrapping the given handler instance, if any.
+       *
+       * @param h the handler whose entry is to be removed
+       * @return true if an entry has been removed
+       */
+      synchronized boolean removeHandler(NettyTransportHandler h)
+      {
+         for (int i = 0; i < size(); i++)
+         {
+            if (elementAt(i).hc == h)
+            {
+               removeElementAt(i);
+               return true;
+            }
+         }
+         return false;
+      }
+
+      private void writeObject(java.io.ObjectOutputStream stream) throws IOException
+      {
+         throw new NotSerializableException();
+      }
+
+      private void readObject(java.io.ObjectInputStream stream) throws IOException, ClassNotFoundException
+      {
+         throw new NotSerializableException();
+      }
+   }
+
+   /**
+    * This is used for the keys of the cache, basically redefine equals
+    * and hashCode to use URL's protocol/host/port only (also preventing
+    * network traffic required by URL's equals/hashCode).
+    *
+    */
+   class KeepAliveKey
+   {
+      private String protocol = null;
+      private String host = null;
+      private int port = 0;
+
+      /**
+       * Constructor
+       *
+       * @param url the URL containing the protocol, host and port information
+       */
+      public KeepAliveKey(URL url)
+      {
+         this.protocol = url.getProtocol();
+         this.host = url.getHost();
+         this.port = url.getPort();
+      }
+
+      /**
+       * Determine whether or not two objects of this type are equal
+       */
+      @Override
+      public boolean equals(Object obj)
+      {
+         if ((obj instanceof KeepAliveKey) == false)
+            return false;
+         KeepAliveKey kae = (KeepAliveKey)obj;
+         return host.equals(kae.host) && (port == kae.port) && protocol.equals(kae.protocol);
+      }
+
+      /**
+       * The hashCode() for this object is the string hashCode() of
+       * concatenation of the protocol, host name and port.
+       */
+      @Override
+      public int hashCode()
+      {
+         String str = protocol + host + port;
+         return str.hashCode();
+      }
+   }
+
+   /**
+    * A cached handler together with the time (ms) at which it became idle.
+    */
+   class KeepAliveEntry
+   {
+      NettyTransportHandler hc;
+      long idleStartTime;
+
+      KeepAliveEntry(NettyTransportHandler hc, long idleStartTime)
+      {
+         this.hc = hc;
+         this.idleStartTime = idleStartTime;
+      }
+   }
+
+   /**
+    * Read a system property, going through a privileged action when a
+    * security manager is installed.
+    *
+    * @param name         the property name
+    * @param defaultValue the value returned when the property is not set
+    * @return the property value or the provided default
+    */
+   static String getSystemProperty(final String name, final String defaultValue)
+   {
+      SecurityManager sm = System.getSecurityManager();
+      if (sm == null)
+      {
+         return System.getProperty(name, defaultValue);
+      }
+      else
+      {
+         PrivilegedAction<String> action = new PrivilegedAction<String>() {
+            public String run()
+            {
+               return System.getProperty(name, defaultValue);
+            }
+         };
+         return AccessController.doPrivileged(action);
+      }
+   }
+
+}
Property changes on:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/KeepAliveCache.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyClient.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyClient.java 2009-09-10
11:54:16 UTC (rev 10672)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyClient.java 2009-09-10
11:57:10 UTC (rev 10673)
@@ -79,6 +79,7 @@
{
public static final String RESPONSE_CODE = "ResponseCode";
public static final String RESPONSE_CODE_MESSAGE = "ResponseCodeMessage";
+ public static final String PROTOCOL = "Protocol";
private static Logger log = Logger.getLogger(NettyClient.class);
private Marshaller marshaller;
@@ -87,9 +88,7 @@
private static final int DEFAULT_CHUNK_SIZE = 1024;
//We always use chunked transfer encoding unless explicitly disabled by user
private Integer chunkSize = new Integer(DEFAULT_CHUNK_SIZE);
- private Executor bossExecutor;
- private Executor workerExecutor;
-
+
/**
* Construct a Netty client with the provided marshaller/unmarshaller.
*
@@ -100,27 +99,9 @@
{
this.marshaller = marshaller;
this.unmarshaller = unmarshaller;
- this.bossExecutor = Executors.newCachedThreadPool();
- this.workerExecutor = Executors.newCachedThreadPool();
}
/**
- * Construct a Netty client with the provided marshaller/unmarshaller and executors.
- *
- * @param marshaller
- * @param unmarshaller
- * @param bossExecutor
- * @param workerExecutor
- */
- public NettyClient(Marshaller marshaller, UnMarshaller unmarshaller, Executor
bossExecutor, Executor workerExecutor)
- {
- this.marshaller = marshaller;
- this.unmarshaller = unmarshaller;
- this.bossExecutor = bossExecutor;
- this.workerExecutor = workerExecutor;
- }
-
- /**
* Performs the invocation; a HTTP GET is performed when the reqMessage is null,
otherwise a HTTP POST is performed.
*
* @param reqMessage The request message
@@ -145,44 +126,21 @@
throw new RuntimeException("Invalid address: " + targetAddress, e);
}
- ChannelFactory factory = new NioClientSocketChannelFactory(bossExecutor,
workerExecutor);
-
- ClientBootstrap bootstrap = new ClientBootstrap(factory);
- WSClientPipelineFactory channelPipelineFactory = new WSClientPipelineFactory();
- WSResponseHandler responseHandler = null;
- if (waitForResponse)
- {
- responseHandler = new WSResponseHandler(unmarshaller);
- channelPipelineFactory.setResponseHandler(responseHandler);
- }
- if ("https".equalsIgnoreCase(target.getProtocol()))
- {
- SSLContextFactory sslContextFactory = new SSLContextFactory(callProps);
- SSLEngine engine = sslContextFactory.getSSLContext().createSSLEngine();
- engine.setUseClientMode(true);
- channelPipelineFactory.setSslHandler(new SslHandler(engine));
- }
-
- bootstrap.setPipelineFactory(channelPipelineFactory);
-
+ NettyTransportHandler transport = NettyTransportHandler.getInstance(target,
NettyHelper.getChannelPipelineFactory(getSSLHandler(target, callProps)));
Channel channel = null;
+ Map<String, Object> resHeaders = null;
try
{
setActualTimeout(callProps);
- //Start the connection attempt
- ChannelFuture future = bootstrap.connect(getSocketAddress(target));
-
- //Wait until the connection attempt succeeds or fails
- awaitUninterruptibly(future, timeout);
- if (!future.isSuccess())
+ channel = transport.getChannel(timeout);
+
+ WSResponseHandler responseHandler = null;
+ if (waitForResponse)
{
- IOException io = new IOException("Could not connect to " +
target.getHost());
- io.initCause(future.getCause());
- factory.releaseExternalResources();
- throw io;
+ responseHandler = new WSResponseHandler(unmarshaller);
+ NettyHelper.setResponseHandler(channel, responseHandler);
}
- channel = future.getChannel();
-
+
//Send the HTTP request
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, reqMessage !=
null ? HttpMethod.POST : HttpMethod.GET, targetAddress);
request.addHeader(HttpHeaders.Names.HOST, target.getHost());
@@ -212,7 +170,6 @@
}
Object resMessage = null;
- Map<String, Object> resHeaders = null;
//Get the response
Future<Result> futureResult = responseHandler.getFutureResult();
Result result = timeout == null ? futureResult.get() : futureResult.get(timeout,
TimeUnit.MILLISECONDS);
@@ -253,30 +210,23 @@
{
if (channel != null)
{
- channel.close();
+ NettyHelper.clearResponseHandler(channel);
}
- //Shut down executor threads to exit
- factory.releaseExternalResources();
+ transport.finished(resHeaders);
}
}
-
- private InetSocketAddress getSocketAddress(URL target)
+
+ private static SslHandler getSSLHandler(URL target, Map<String, Object>
callProps) throws IOException
{
- int port = target.getPort();
- if (port < 0)
+ SslHandler handler = null;
+ if ("https".equalsIgnoreCase(target.getProtocol()))
{
- //use default port
- String protocol = target.getProtocol();
- if ("http".equalsIgnoreCase(protocol))
- {
- port = 80;
- }
- else if ("https".equalsIgnoreCase(protocol))
- {
- port = 443;
- }
+ SSLContextFactory sslContextFactory = new SSLContextFactory(callProps);
+ SSLEngine engine = sslContextFactory.getSSLContext().createSSLEngine();
+ engine.setUseClientMode(true);
+ handler = new SslHandler(engine);
}
- return new InetSocketAddress(target.getHost(), port);
+ return handler;
}
private ChannelFuture writeRequest(Channel channel, HttpRequest request, Object
reqMessage) throws IOException
@@ -314,29 +264,6 @@
}
/**
- * Utility method for awaiting with or without timeout (timeout == null or <=0
implies not timeout)
- *
- * @param future
- * @param timeout
- * @throws WSTimeoutException
- */
- private static void awaitUninterruptibly(ChannelFuture future, Long timeout) throws
WSTimeoutException
- {
- if (timeout != null && timeout.longValue() > 0)
- {
- boolean bool = future.awaitUninterruptibly(timeout);
- if (!bool)
- {
- throw new WSTimeoutException("Timeout after: " + timeout +
"ms", timeout);
- }
- }
- else
- {
- future.awaitUninterruptibly();
- }
- }
-
- /**
* Set the actual chunk size according to the endpoint config overwrite and/or
configured features.
*
* @param message
@@ -446,27 +373,7 @@
}
/**
- * Set the Netty boss executor
*
- * @param bossExecutor
- */
- public void setBossExecutor(Executor bossExecutor)
- {
- this.bossExecutor = bossExecutor;
- }
-
- /**
- * Set the Netty worker executor
- *
- * @param workerExecutor
- */
- public void setWorkerExecutor(Executor workerExecutor)
- {
- this.workerExecutor = workerExecutor;
- }
-
- /**
- *
* @return The current chunk size
*/
public Integer getChunkSize()
Added:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyHelper.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyHelper.java
(rev 0)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyHelper.java 2009-09-10
11:57:10 UTC (rev 10673)
@@ -0,0 +1,147 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.core.client.transport;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.ws.core.WSTimeoutException;
+
+/**
+ * Helper for dealing with Netty channels: builds the basic client HTTP
+ * pipeline, installs/removes the per-invocation response handler and
+ * provides a few small utilities shared by the client transport classes.
+ *
+ * @author alessio.soldano(a)jboss.com
+ * @since 08-Sep-2009
+ *
+ */
+public class NettyHelper
+{
+   /** Name under which the response handler is registered in the channel pipeline */
+   public static final String RESPONSE_HANDLER_NAME = "handler";
+
+   /**
+    * @return a pipeline factory for plain (non SSL) HTTP connections
+    */
+   public static ChannelPipelineFactory getChannelPipelineFactory()
+   {
+      return getChannelPipelineFactory(null);
+   }
+
+   /**
+    * @param sslHandler the SSL handler to put first in the pipeline, or null for plain HTTP
+    * @return a pipeline factory creating the basic client HTTP pipeline
+    */
+   public static ChannelPipelineFactory getChannelPipelineFactory(SslHandler sslHandler)
+   {
+      BasicPipelineFactory factory = new BasicPipelineFactory();
+      if (sslHandler != null)
+      {
+         factory.setSslHandler(sslHandler);
+      }
+      return factory;
+   }
+
+   /**
+    * Install the response handler at the end of the channel's pipeline,
+    * replacing a handler left over under the same name (if any).
+    *
+    * @param channel         the channel whose pipeline is to be modified
+    * @param responseHandler the handler to install
+    */
+   public static void setResponseHandler(Channel channel, WSResponseHandler responseHandler)
+   {
+      ChannelPipeline pipeline = channel.getPipeline();
+      // addLast() rejects duplicate names; channels are reused by the keep-alive
+      // cache, so make sure a stale handler never gets in the way
+      if (pipeline.get(RESPONSE_HANDLER_NAME) != null)
+      {
+         pipeline.remove(RESPONSE_HANDLER_NAME);
+      }
+      pipeline.addLast(RESPONSE_HANDLER_NAME, responseHandler);
+   }
+
+   /**
+    * Remove the response handler from the channel's pipeline; this is a
+    * no-op when no response handler is currently installed.
+    *
+    * @param channel the channel whose pipeline is to be modified
+    */
+   public static void clearResponseHandler(Channel channel)
+   {
+      ChannelPipeline pipeline = channel.getPipeline();
+      // remove(name) throws NoSuchElementException for unknown names; this is
+      // called from cleanup code even when no handler has been set, and a
+      // failure there would hide the original error and skip further cleanup
+      if (pipeline.get(RESPONSE_HANDLER_NAME) != null)
+      {
+         pipeline.remove(RESPONSE_HANDLER_NAME);
+      }
+   }
+
+   /**
+    * Utility method for awaiting with or without timeout (timeout == null or <=0 implies not timeout)
+    *
+    * @param future  the future to wait for
+    * @param timeout the timeout in milliseconds, null or <=0 for waiting indefinitely
+    * @throws WSTimeoutException if the future did not complete within the timeout
+    */
+   public static void awaitUninterruptibly(ChannelFuture future, Long timeout) throws WSTimeoutException
+   {
+      if (timeout != null && timeout.longValue() > 0)
+      {
+         boolean bool = future.awaitUninterruptibly(timeout);
+         if (!bool)
+         {
+            throw new WSTimeoutException("Timeout after: " + timeout + "ms", timeout);
+         }
+      }
+      else
+      {
+         future.awaitUninterruptibly();
+      }
+   }
+
+   /**
+    * Pipeline factory creating the basic client pipeline:
+    * [ssl,] decoder, aggregator, encoder.
+    */
+   private static class BasicPipelineFactory implements ChannelPipelineFactory
+   {
+      private static final int MAX_CONTENT_SIZE = 1073741824;
+      private ChannelHandler sslHandler;
+
+      public ChannelPipeline getPipeline() throws Exception
+      {
+         // Create a default pipeline implementation.
+         ChannelPipeline pipeline = pipeline();
+
+         if (sslHandler != null)
+         {
+            pipeline.addLast("ssl", sslHandler);
+         }
+         pipeline.addLast("decoder", new HttpResponseDecoder());
+         // Aggregate chunked responses so that downstream handlers do not have to handle HttpChunks.
+         pipeline.addLast("aggregator", new HttpChunkAggregator(MAX_CONTENT_SIZE));
+         pipeline.addLast("encoder", new HttpRequestEncoder());
+         return pipeline;
+      }
+
+      public ChannelHandler getSslHandler()
+      {
+         return sslHandler;
+      }
+
+      public void setSslHandler(ChannelHandler sslHandler)
+      {
+         this.sslHandler = sslHandler;
+      }
+   }
+
+   /**
+    * Get the value of a header as a string; when the header value is a
+    * collection, its first element is used.
+    *
+    * @param headers the headers map, may be null
+    * @param name    the header name
+    * @return the (first) header value as a string, or null when the map is
+    *         null, the header is missing, or its value is an empty collection
+    *         or a collection whose first element is null
+    */
+   public static String getFirstHeaderAsString(Map<String, Object> headers, String name)
+   {
+      if (headers == null) return null;
+      Object obj = headers.get(name);
+      if (obj == null) return null;
+      if (obj instanceof Collection<?>)
+      {
+         Collection<?> values = (Collection<?>)obj;
+         // iterator().next() on an empty collection would throw NoSuchElementException
+         if (values.isEmpty()) return null;
+         Object value = values.iterator().next();
+         return (value != null) ? value.toString() : null;
+      }
+      else
+      {
+         return obj.toString();
+      }
+   }
+}
Property changes on:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyHelper.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyTransportHandler.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyTransportHandler.java
(rev 0)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyTransportHandler.java 2009-09-10
11:57:10 UTC (rev 10673)
@@ -0,0 +1,329 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.core.client.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.security.AccessController;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.Executors;
+
+import org.jboss.logging.Logger;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.ws.Constants;
+
+/**
+ * This handles the Netty channels, allowing for a
+ * keep-alive system.
+ *
+ * Each instance owns one connection attempt (started in the constructor)
+ * and the channel factory backing it. Instances are obtained through
+ * getInstance(..), which first looks into the static idle cache, and are
+ * given back through finished(..), which either returns them to the cache
+ * or closes them.
+ *
+ * @author alessio.soldano(a)jboss.com
+ * @since 08-Sep-2009
+ *
+ */
+public class NettyTransportHandler
+{
+   private static Logger log = Logger.getLogger(NettyTransportHandler.class);
+   private static final int DEFAULT_KEEP_ALIVE_CONS = 5;
+   //keep-alive timeout (seconds) assumed when the Keep-Alive header does not provide one
+   private static final int DEFAULT_KEEP_ALIVE_TIMEOUT = 5;
+
+   private URL url;
+   private ChannelFuture connectFuture;
+   private Channel channel;
+   private ChannelFactory factory;
+   //the idle cache
+   private static KeepAliveCache cache = new KeepAliveCache();
+   private static boolean keepAliveProp = true;
+
+   //whether this is currently in the idle cache
+   private boolean inCache;
+   //whether this is a keep-alive-connection
+   private volatile boolean keepingAlive = true;
+   //the number of keep-alive connections left
+   private int keepAliveConnections = DEFAULT_KEEP_ALIVE_CONS;
+   //the keep alive timeout in seconds
+   private int keepAliveTimeout;
+
+   static
+   {
+      // keep-alive is enabled unless the system property explicitly says otherwise
+      String keepAlive = AccessController.doPrivileged(new java.security.PrivilegedAction<String>() {
+         public String run()
+         {
+            return System.getProperty(Constants.HTTP_KEEP_ALIVE);
+         }
+      });
+      if (keepAlive != null)
+      {
+         keepAliveProp = Boolean.valueOf(keepAlive).booleanValue();
+      }
+      else
+      {
+         keepAliveProp = true;
+      }
+   }
+
+   /**
+    * Create the channel factory and start (without waiting for) the
+    * connection attempt to the given url.
+    */
+   private NettyTransportHandler(URL url, ChannelPipelineFactory pipelineFactory)
+   {
+      this.url = url;
+
+      factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+      ClientBootstrap bootstrap = new ClientBootstrap(factory);
+      bootstrap.setPipelineFactory(pipelineFactory);
+
+      //Start the connection attempt
+      bootstrap.setOption("tcpNoDelay", true);
+      connectFuture = bootstrap.connect(getSocketAddress(url));
+   }
+
+   /**
+    * Same as getInstance(url, true, pipelineFactory).
+    */
+   public static NettyTransportHandler getInstance(URL url, ChannelPipelineFactory pipelineFactory)
+   {
+      return getInstance(url, true, pipelineFactory);
+   }
+
+   /**
+    * Get a transport handler for the given url, reusing an idle one from the
+    * keep-alive cache when allowed and available.
+    *
+    * @param url             the target url
+    * @param useCache        whether the idle cache can be used
+    * @param pipelineFactory the pipeline factory used when a new connection is required
+    * @return a cached or newly created transport handler
+    */
+   public static NettyTransportHandler getInstance(URL url, boolean useCache, ChannelPipelineFactory pipelineFactory)
+   {
+      NettyTransportHandler ret = null;
+      if (useCache && keepAliveProp)
+      {
+         ret = cache.get(url);
+         if (ret != null)
+         {
+            synchronized (ret)
+            {
+               ret.inCache = false;
+            }
+         }
+      }
+      if (ret == null)
+      {
+         ret = new NettyTransportHandler(url, pipelineFactory);
+      }
+      else
+      {
+         // no new connection is opened for a cached handler, hence the
+         // explicit connect permission check
+         SecurityManager security = System.getSecurityManager();
+         if (security != null)
+         {
+            security.checkConnect(url.getHost(), url.getPort());
+         }
+         ret.url = url;
+      }
+      return ret;
+   }
+
+   /**
+    * Get the channel for using the http connection
+    *
+    * @return A Netty channel
+    * @throws IOException
+    */
+   public Channel getChannel() throws IOException
+   {
+      return getChannel(null);
+   }
+
+   /**
+    * Get the channel for using the http connection; the first call waits
+    * for the connection attempt started at construction time to complete.
+    *
+    * @param timeout the connection timeout in ms (see NettyHelper.awaitUninterruptibly)
+    * @return A Netty channel
+    * @throws IOException if the connection attempt failed
+    */
+   public Channel getChannel(Long timeout) throws IOException
+   {
+      if (channel == null && connectFuture != null) //first call after connection attempt
+      {
+         NettyHelper.awaitUninterruptibly(connectFuture, timeout);
+         if (!connectFuture.isSuccess())
+         {
+            IOException io = new IOException("Could not connect to " + url.getHost());
+            io.initCause(connectFuture.getCause());
+            throw io;
+         }
+         channel = connectFuture.getChannel();
+      }
+      return channel;
+   }
+
+   /**
+    * Signal an invocation has been done using the current connection,
+    * but we're going on using it.
+    *
+    * @param headers The received message's headers
+    */
+   public void goOn(Map<String, Object> headers)
+   {
+      checkKeepAliveHeaders(headers);
+   }
+
+   /**
+    * Signal end of processing for the current connection. The transport
+    * handler either goes to the idle cache or is closed depending on
+    * the number of keep alive connections left (and this being a
+    * keep-alive connection or not, of course).
+    *
+    * The handler is always closed when the connection has never been
+    * established or when no response headers are available: in the latter
+    * case the state of the exchange is unknown (failure, timeout, response
+    * not read) and a later invocation reusing the connection could receive
+    * data belonging to this one.
+    *
+    * @param headers The received message's headers, null if no response has been received
+    *
+    */
+   public void finished(Map<String, Object> headers)
+   {
+      checkKeepAliveHeaders(headers);
+      keepAliveConnections--;
+      boolean reusable = (headers != null) && (channel != null);
+      if (reusable && keepAliveConnections > 0 && isKeepingAlive())
+      {
+         /* This connection is keepingAlive && still valid.
+          * Return it to the cache.
+          */
+         putInKeepAliveCache();
+      }
+      else
+      {
+         end();
+      }
+   }
+
+   /**
+    * Update the keep-alive status according to the received message's headers.
+    * Nothing is changed when headers is null; any error while parsing the
+    * headers disables keep-alive for this connection.
+    *
+    * @param headers The received message's headers
+    */
+   protected void checkKeepAliveHeaders(Map<String, Object> headers)
+   {
+      if (headers == null) return;
+      keepAliveConnections = -1;
+      keepAliveTimeout = 0;
+      try
+      {
+         String connectionHeader = NettyHelper.getFirstHeaderAsString(headers, HttpHeaders.Names.CONNECTION);
+         if (connectionHeader != null && connectionHeader.equalsIgnoreCase(HttpHeaders.Values.KEEP_ALIVE))
+         {
+            //support for old servers (out of spec but quite used)
+            String keepAliveHeader = NettyHelper.getFirstHeaderAsString(headers, "Keep-Alive");
+            if (keepAliveHeader != null)
+            {
+               // Defaults are set once, before parsing: setting them for each
+               // token would overwrite a value parsed from a previous token
+               // (e.g. the timeout in "timeout=15, max=100").
+               keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
+               keepAliveConnections = DEFAULT_KEEP_ALIVE_CONS;
+               StringTokenizer st = new StringTokenizer(keepAliveHeader, ", ", false);
+               while (st.hasMoreTokens())
+               {
+                  String s = st.nextToken();
+                  if (s.startsWith("timeout="))
+                  {
+                     keepAliveTimeout = Integer.parseInt(s.substring(8));
+                  }
+                  if (s.startsWith("max="))
+                  {
+                     keepAliveConnections = Integer.parseInt(s.substring(4));
+                  }
+               }
+            }
+         }
+         else if (HttpVersion.HTTP_1_1.toString().equals(NettyHelper.getFirstHeaderAsString(headers, NettyClient.PROTOCOL)))
+         {
+            //Consider the only valid value for Connection in responses is "close"
+            keepAliveConnections = (connectionHeader == null) ? DEFAULT_KEEP_ALIVE_CONS : 1;
+         }
+
+         if (keepAliveConnections <= 1)
+            keepingAlive = false;
+      }
+      catch (Exception ex)
+      {
+         log.error("Error while parsing headers for configuring keep-alive, closing connection. ", ex);
+         keepAliveConnections = -1;
+         keepingAlive = false;
+      }
+   }
+
+   /**
+    * Put this handler in the idle cache, unless it is already there.
+    */
+   protected synchronized void putInKeepAliveCache()
+   {
+      if (inCache)
+      {
+         return;
+      }
+      inCache = true;
+      cache.put(url, this);
+   }
+
+   /**
+    * Build the socket address for the target, using the default http (80)
+    * or https (443) port when the url does not specify one.
+    */
+   private InetSocketAddress getSocketAddress(URL target)
+   {
+      int port = target.getPort();
+      if (port < 0)
+      {
+         //use default port
+         String protocol = target.getProtocol();
+         if ("http".equalsIgnoreCase(protocol))
+         {
+            port = 80;
+         }
+         else if ("https".equalsIgnoreCase(protocol))
+         {
+            port = 443;
+         }
+      }
+      return new InetSocketAddress(target.getHost(), port);
+   }
+
+   /**
+    * End the transport handler, closing the underlying connection (or the
+    * pending connection attempt, if the channel has never been obtained)
+    * and releasing the resources held by the channel factory.
+    *
+    */
+   public void end()
+   {
+      keepingAlive = false;
+      try
+      {
+         // channel is only set by a successful getChannel(..) call; fall back
+         // to the channel of the connection attempt so that a failed or still
+         // pending connect does not cause a NullPointerException here
+         Channel toClose = channel;
+         if (toClose == null && connectFuture != null)
+         {
+            toClose = connectFuture.getChannel();
+         }
+         if (toClose != null)
+         {
+            toClose.close();
+         }
+      }
+      finally
+      {
+         // always release the executors created for this handler
+         factory.releaseExternalResources();
+      }
+   }
+
+   public boolean getHttpKeepAliveSet()
+   {
+      return keepAliveProp;
+   }
+
+   protected boolean isKeepingAlive()
+   {
+      return getHttpKeepAliveSet() && keepingAlive;
+   }
+
+   /**
+    * @return the keep-alive timeout in seconds, 0 if none has been set
+    */
+   public int getKeepAliveTimeout()
+   {
+      return keepAliveTimeout;
+   }
+
+   public URL getUrl()
+   {
+      return url;
+   }
+
+   public void setKeepAliveConnections(int keepAliveConnections)
+   {
+      this.keepAliveConnections = keepAliveConnections;
+   }
+
+}
Property changes on:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/NettyTransportHandler.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/WSClientPipelineFactory.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/WSClientPipelineFactory.java 2009-09-10
11:54:16 UTC (rev 10672)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/WSClientPipelineFactory.java 2009-09-10
11:57:10 UTC (rev 10673)
@@ -1,86 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2006, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.ws.core.client.transport;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
-
-/**
- * Channel pipeline factory for the WS client transport.
- * Each call to getPipeline() assembles a pipeline made of an optional SSL
- * handler, an HTTP response decoder, a chunk aggregator, an HTTP request
- * encoder and an optional response handler, in that order.
- *
- * @author alessio.soldano(a)jboss.com
- * @since 24-Jun-2009
- *
- */
-public class WSClientPipelineFactory implements ChannelPipelineFactory
-{
- private static final int MAX_CONTENT_SIZE = 1073741824; // 2^30 bytes (1 GiB), passed to the chunk aggregator
- private ChannelHandler responseHandler; // optional, appended last as "handler" when set
- private ChannelHandler sslHandler; // optional, inserted first as "ssl" when set
-
- public ChannelPipeline getPipeline() throws Exception
- {
- // Start from an empty default pipeline (static import of Channels.pipeline()).
- ChannelPipeline pipeline = pipeline();
-
- if (sslHandler != null)
- {
- pipeline.addLast("ssl", sslHandler);
- }
- pipeline.addLast("decoder", new HttpResponseDecoder());
- // The aggregator is active: remove the following line only if HttpChunks are to be handled individually.
- pipeline.addLast("aggregator", new
HttpChunkAggregator(MAX_CONTENT_SIZE));
- pipeline.addLast("encoder", new HttpRequestEncoder());
- if (responseHandler != null)
- {
- pipeline.addLast("handler", responseHandler);
- }
- return pipeline;
- }
-
- public ChannelHandler getResponseHandler()
- {
- return responseHandler;
- }
-
- public void setResponseHandler(ChannelHandler responseHandler)
- {
- this.responseHandler = responseHandler;
- }
-
- public ChannelHandler getSslHandler()
- {
- return sslHandler;
- }
-
- public void setSslHandler(ChannelHandler sslHandler)
- {
- this.sslHandler = sslHandler;
- }
-
-
-}
Modified:
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/WSResponseHandler.java
===================================================================
---
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/WSResponseHandler.java 2009-09-10
11:54:16 UTC (rev 10672)
+++
stack/native/trunk/modules/core/src/main/java/org/jboss/ws/core/client/transport/WSResponseHandler.java 2009-09-10
11:57:10 UTC (rev 10673)
@@ -76,6 +76,7 @@
HttpResponse response = (HttpResponse)e.getMessage();
Map<String, Object> responseHeaders = result.getResponseHeaders();
+ responseHeaders.put(NettyClient.PROTOCOL, response.getProtocolVersion());
responseHeaders.put(NettyClient.RESPONSE_CODE, response.getStatus().getCode());
responseHeaders.put(NettyClient.RESPONSE_CODE_MESSAGE,
response.getStatus().getReasonPhrase());
for (String headerName : response.getHeaderNames())