[infinispan-commits] Infinispan SVN: r1695 - in trunk/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl and 6 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Apr 15 19:22:49 EDT 2010
Author: mircea.markus
Date: 2010-04-15 19:22:48 -0400 (Thu, 15 Apr 2010)
New Revision: 1695
Added:
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java
Removed:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java
Modified:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
trunk/client/hotrod-client/src/main/resources/hotrod-client.properties
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
trunk/client/hotrod-client/src/test/resources/log4j.xml
Log:
better object pooling
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -17,8 +17,12 @@
import java.io.IOException;
import java.io.InputStream;
+import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.HashSet;
import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
/**
* // TODO: Document this
@@ -30,10 +34,16 @@
private static Log log = LogFactory.getLog(RemoteCacheManager.class);
+ public static final String HOTROD_CLIENT_PROPERTIES = "hotrod-client.properties";
+
+ public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
+
+ public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod-client.servers-default";
+
+
private Properties props;
private TransportFactory transportFactory;
private String hotrodMarshaller;
- private static final String HOTROD_CLIENT_PROPERTIES = "hotrod-client.properties";
private boolean started = false;
@@ -139,7 +149,7 @@
log.info("'transport-factory' factory not specified, using " + factory);
}
transportFactory = (TransportFactory) VHelper.newInstance(factory);
- transportFactory.init(props);
+ transportFactory.start(props, getStaticConfiguredServers(props));
hotrodMarshaller = props.getProperty("marshaller");
if (hotrodMarshaller == null) {
hotrodMarshaller = SerializationMarshaller.class.getName();
@@ -175,4 +185,36 @@
HotrodOperations hotrodOperations = new HotrodOperationsImpl(cacheName, transportFactory);
return new RemoteCacheImpl<K, V>(hotrodOperations, marshaller, cacheName);
}
+
+ private Set<InetSocketAddress> getStaticConfiguredServers(Properties props) {
+ Set<InetSocketAddress> serverAddresses = new HashSet<InetSocketAddress>();
+ String servers = props.getProperty(CONF_HOTROD_SERVERS);
+ if (servers == null) {
+ servers = System.getProperty(OVERRIDE_HOTROD_SERVERS);
+ if (servers != null) {
+ log.info("Overwriting default server properties (-D" + OVERRIDE_HOTROD_SERVERS + ") with " + servers);
+ } else {
+ servers = "127.0.0.1:11311";
+ }
+ log.info("'hotrod-servers' property not specified in config, using " + servers);
+ }
+ StringTokenizer tokenizer = new StringTokenizer(servers, ";");
+ while (tokenizer.hasMoreTokens()) {
+ String server = tokenizer.nextToken();
+ String[] serverDef = tokenizeServer(server);
+ String serverHost = serverDef[0];
+ int serverPort = Integer.parseInt(serverDef[1]);
+ serverAddresses.add(new InetSocketAddress(serverHost, serverPort));
+ }
+ if (serverAddresses.isEmpty()) {
+ throw new IllegalStateException("No hot-rod servers specified!");
+ }
+ return serverAddresses;
+ }
+
+ private String[] tokenizeServer(String server) {
+ StringTokenizer t = new StringTokenizer(server, ":");
+ return new String[]{t.nextToken(), t.nextToken()};
+ }
+
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,6 +1,9 @@
package org.infinispan.client.hotrod.impl;
+import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.Properties;
+import java.util.Set;
/**
* // TODO: Document this
@@ -12,15 +15,13 @@
public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
- public static final String CONF_TCP_CONNECTION_POOL = "tcp-connection-pool";
-
- public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod-client.servers-default";
-
public Transport getTransport();
public void releaseTransport(Transport transport);
- void init(Properties props);
+ void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers);
+ void updateServers(Collection<InetSocketAddress> newServers);
+
void destroy();
}
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,54 +0,0 @@
-package org.infinispan.client.hotrod.impl.transport;
-
-import org.infinispan.client.hotrod.impl.TransportFactory;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public abstract class AbstractTransportFactory implements TransportFactory {
-
- private static Log log = LogFactory.getLog(AbstractTransportFactory.class);
-
- protected Set<InetSocketAddress> serverAddresses = new HashSet<InetSocketAddress>();
-
- public void init(Properties props) {
- String servers = props.getProperty(CONF_HOTROD_SERVERS);
- if (servers == null) {
- servers = System.getProperty(OVERRIDE_HOTROD_SERVERS);
- if (servers != null) {
- log.info("Overwriting default server properties (-D" + OVERRIDE_HOTROD_SERVERS + ") with " + servers);
- } else {
- servers = "127.0.0.1:11311";
- }
- log.info("'hotrod-servers' property not specified in config, using " + servers);
- }
- StringTokenizer tokenizer = new StringTokenizer(servers, ";");
- while (tokenizer.hasMoreTokens()) {
- String server = tokenizer.nextToken();
- String[] serverDef = tokenizeServer(server);
- String serverHost = serverDef[0];
- int serverPort = Integer.parseInt(serverDef[1]);
- serverAddresses.add(new InetSocketAddress(serverHost, serverPort));
- }
- if (serverAddresses.isEmpty()) {
- throw new IllegalStateException("No hot-rod servers specified!");
- }
- }
-
- private String[] tokenizeServer(String server) {
- StringTokenizer t = new StringTokenizer(server, ":");
- return new String[]{t.nextToken(), t.nextToken()};
- }
-}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,11 +1,12 @@
package org.infinispan.client.hotrod.impl.transport.netty;
import org.infinispan.client.hotrod.impl.Transport;
-import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
+import org.infinispan.client.hotrod.impl.TransportFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.Properties;
/**
@@ -14,19 +15,25 @@
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
-public class NettyTransportFactory extends AbstractTransportFactory {
+public class NettyTransportFactory implements TransportFactory {
private static Log log = LogFactory.getLog(NettyTransportFactory.class);
private InetSocketAddress serverAddr;
+ private Collection<InetSocketAddress> serverAddresses;
@Override
- public void init(Properties props) {
- super.init(props);
- serverAddr = super.serverAddresses.iterator().next();
+ public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+ this.serverAddresses = staticConfiguredServers;
+ serverAddr = serverAddresses.iterator().next();
}
@Override
+ public void updateServers(Collection<InetSocketAddress> newServers) {
+ throw new IllegalStateException();
+ }
+
+ @Override
public Transport getTransport() {
log.info("Connecting to server on: " + serverAddr);
return new NettyTransport(serverAddr);
@@ -34,7 +41,7 @@
@Override
public void destroy() {
- // TODO: Customise this generated block
+ //nothing to do here as no pooling is available
}
@Override
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,107 +0,0 @@
-package org.infinispan.client.hotrod.impl.transport.tcp;
-
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
-import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.infinispan.client.hotrod.impl.transport.TransportException;
-import org.infinispan.client.hotrod.impl.transport.VHelper;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Collection;
-import java.util.Properties;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- *
- * todo - all methods but init and start can be called from multiple threads, add proper sync
- */
-public class DefaultTcpConnectionPool implements TcpConnectionPool {
-
- private static Log log = LogFactory.getLog(DefaultTcpConnectionPool.class);
-
- private GenericKeyedObjectPool connectionPool;
- private PropsKeyedObjectPoolFactory poolFactory;
- private RequestBalancingStrategy balancer;
- private Collection<InetSocketAddress> servers;
-
- @Override
- public void init(Properties props) {
- String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
- balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
- poolFactory = new PropsKeyedObjectPoolFactory(new TcpConnectionFactory(), props);
- }
-
- @Override
- public void start(Collection<InetSocketAddress> servers) {
- connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
- balancer.setServers(servers);
- this.servers = servers;
- }
-
- @Override
- public Socket getConnection() {
- InetSocketAddress server = balancer.nextServer();
- try {
- return (Socket) connectionPool.borrowObject(server);
- } catch (Exception e) {
- String message = "Could not fetch connection";
- log.error(message, e);
- throw new TransportException(message, e);
- }
- }
-
- @Override
- public void releaseConnection(Socket socket) {
- SocketAddress remoteAddress = socket.getRemoteSocketAddress();
- if (!servers.contains(remoteAddress)) throw new IllegalStateException(remoteAddress.toString());
- try {
- connectionPool.returnObject(remoteAddress, socket);
- } catch (Exception e) {
- log.warn("Could not release connection",e);
- }
- }
-
- @Override
- public void updateServers(Collection<InetSocketAddress> newServers) {
- if (newServers.containsAll(servers) && servers.containsAll(newServers)) {
- log.info("Same list of servers, not changing the pool");
- return;
- }
- for (InetSocketAddress server : newServers) {
- if (!servers.contains(server)) {
- log.info("New server added(" + server + "), adding to the pool.");
- try {
- connectionPool.addObject(server);
- } catch (Exception e) {
- log.warn("Failed adding server " + server, e);
- }
- }
- }
- for (InetSocketAddress server : servers) {
- if (!newServers.contains(server)) {
- log.info("Server not in cluster anymore(" + server + "), removing from the pool.");
- connectionPool.clear(server);
- }
- }
- servers.clear();
- servers.addAll(newServers);
- }
-
- @Override
- public void destroy() {
- connectionPool.clear();
- try {
- connectionPool.close();
- } catch (Exception e) {
- log.warn("Exception while shutting down the connection pool.", e);
- }
- }
-}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,15 +1,10 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
-import org.apache.commons.pool.BasePoolableObjectFactory;
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.SocketChannel;
/**
* // TODO: Document this
@@ -24,23 +19,18 @@
@Override
public Object makeObject(Object key) throws Exception {
InetSocketAddress serverAddress = (InetSocketAddress) key;
+ TcpTransport tcpTransport = new TcpTransport(serverAddress);
if (log.isTraceEnabled()) {
- log.trace("Creating connection to server: " + serverAddress);
+ log.trace("Created tcp transport: " + tcpTransport);
}
- try {
- SocketChannel socketChannel = SocketChannel.open(serverAddress);
- return socketChannel.socket();
- } catch (IOException e) {
- log.warn("Could not create connection to " + serverAddress, e);
- throw e;
- }
+ return tcpTransport;
}
@Override
public boolean validateObject(Object key, Object obj) {
- Socket socket = (Socket) obj;
+ TcpTransport transport = (TcpTransport) obj;
if (log.isTraceEnabled()) {
- log.trace("About to validate(ping) connection to server " + key + ". socket is " + socket);
+ log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
}
//todo implement
return true;
@@ -48,14 +38,7 @@
@Override
public void destroyObject(Object key, Object obj) throws Exception {
- Socket socket = (Socket) obj;
- if (log.isTraceEnabled()) {
- log.trace("About to destroy socket " + socket);
- }
- try {
- socket.close();
- } catch (IOException e) {
- log.warn("Issues closing the socket: " + socket, e);
- }
+ TcpTransport transport = (TcpTransport) obj;
+ transport.destroy();
}
}
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,28 +0,0 @@
-package org.infinispan.client.hotrod.impl.transport.tcp;
-
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.Properties;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public interface
- TcpConnectionPool {
-
- public void init(Properties props);
-
- public void start(Collection<InetSocketAddress> servers);
-
- public Socket getConnection();
-
- public void releaseConnection(Socket socket);
-
- public void updateServers(Collection<InetSocketAddress> newServers);
-
- public void destroy();
-}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -9,7 +9,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
/**
@@ -23,6 +22,7 @@
private static Log log = LogFactory.getLog(TcpTransport.class);
private Socket socket;
+ private InetSocketAddress serverAddress;
public void writeVInt(int vInt) {
try {
@@ -68,8 +68,16 @@
}
}
- public TcpTransport(Socket socket) {
- this.socket = socket;
+ public TcpTransport(InetSocketAddress serverAddress) {
+ this.serverAddress = serverAddress;
+ try {
+ SocketChannel socketChannel = SocketChannel.open(serverAddress);
+ socket = socketChannel.socket();
+ } catch (IOException e) {
+ String message = "Could not connect to server: " + serverAddress;
+ log.error(message, e);
+ throw new TransportException(message, e);
+ }
}
protected void writeBytes(byte[] toAppend) {
@@ -156,8 +164,47 @@
}
return result;
}
+
+ public InetSocketAddress getServerAddress() {
+ return serverAddress;
+ }
- public Socket getSocket() {
- return socket;
+ @Override
+ public String toString() {
+ return "TcpTransport{" +
+ "socket=" + socket +
+ ", serverAddress=" + serverAddress +
+ "} " + super.toString();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TcpTransport that = (TcpTransport) o;
+
+ if (serverAddress != null ? !serverAddress.equals(that.serverAddress) : that.serverAddress != null) return false;
+ if (socket != null ? !socket.equals(that.socket) : that.socket != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = socket != null ? socket.hashCode() : 0;
+ result = 31 * result + (serverAddress != null ? serverAddress.hashCode() : 0);
+ return result;
+ }
+
+ public void destroy() {
+ try {
+ socket.close();
+ if (log.isTraceEnabled()) {
+ log.trace("Successfully closed socket: " + socket);
+ }
+ } catch (IOException e) {
+ log.warn("Issues closing transport: " + this, e);
+ }
+ }
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,56 +1,101 @@
package org.infinispan.client.hotrod.impl.transport.tcp;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.infinispan.client.hotrod.impl.Transport;
import org.infinispan.client.hotrod.impl.TransportFactory;
-import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
+import org.infinispan.client.hotrod.impl.transport.TransportException;
import org.infinispan.client.hotrod.impl.transport.VHelper;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.net.InetSocketAddress;
+import java.util.Collection;
import java.util.Properties;
-import java.util.StringTokenizer;
+import java.util.Set;
/**
* // TODO: Document this
*
+ * todo - all methods but start can be called from multiple threads, add proper sync
+ *
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
-public class TcpTransportFactory extends AbstractTransportFactory {
+public class TcpTransportFactory implements TransportFactory {
private static Log log = LogFactory.getLog(TcpTransportFactory.class);
- private TcpConnectionPool connectionPool;
+ private GenericKeyedObjectPool connectionPool;
+ private PropsKeyedObjectPoolFactory poolFactory;
+ private RequestBalancingStrategy balancer;
+ private Collection<InetSocketAddress> servers;
@Override
- public void init(Properties props) {
- super.init(props);
- String tcpConnectionPool = props.getProperty(CONF_TCP_CONNECTION_POOL);
- if (tcpConnectionPool == null) {
- tcpConnectionPool = DefaultTcpConnectionPool.class.getName();
- log.trace("No tcp connection pools specified, using the default: " + tcpConnectionPool);
- }
- connectionPool = (TcpConnectionPool) VHelper.newInstance(tcpConnectionPool);
- connectionPool.init(props);
- connectionPool.start(serverAddresses);
+ public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+ servers = staticConfiguredServers;
+ String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
+ balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
+ poolFactory = new PropsKeyedObjectPoolFactory(new TcpConnectionFactory(), props);
+ connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
+ balancer.setServers(servers);
}
@Override
public void destroy() {
- if (connectionPool != null) {
- connectionPool.destroy();
+ connectionPool.clear();
+ try {
+ connectionPool.close();
+ } catch (Exception e) {
+ log.warn("Exception while shutting down the connection pool.", e);
}
}
@Override
public Transport getTransport() {
- return new TcpTransport(connectionPool.getConnection());
+ InetSocketAddress server = balancer.nextServer();
+ try {
+ return (Transport) connectionPool.borrowObject(server);
+ } catch (Exception e) {
+ String message = "Could not fetch transport";
+ log.error(message, e);
+ throw new TransportException(message, e);
+ }
}
@Override
public void releaseTransport(Transport transport) {
TcpTransport tcpTransport = (TcpTransport) transport;
- connectionPool.releaseConnection(tcpTransport.getSocket());
+ try {
+ connectionPool.returnObject(tcpTransport.getServerAddress(), tcpTransport);
+ } catch (Exception e) {
+ log.warn("Could not release connection: " + tcpTransport,e);
+ }
}
+
+ @Override
+ public void updateServers(Collection<InetSocketAddress> newServers) {
+ if (newServers.containsAll(servers) && servers.containsAll(newServers)) {
+ log.info("Same list of servers, not changing the pool");
+ return;
+ }
+ for (InetSocketAddress server : newServers) {
+ if (!servers.contains(server)) {
+ log.info("New server added(" + server + "), adding to the pool.");
+ try {
+ connectionPool.addObject(server);
+ } catch (Exception e) {
+ log.warn("Failed adding server " + server, e);
+ }
+ }
+ }
+ for (InetSocketAddress server : servers) {
+ if (!newServers.contains(server)) {
+ log.info("Server not in cluster anymore(" + server + "), removing from the pool.");
+ connectionPool.clear(server);
+ }
+ }
+ servers.clear();
+ servers.addAll(newServers);
+ }
+
}
Modified: trunk/client/hotrod-client/src/main/resources/hotrod-client.properties
===================================================================
--- trunk/client/hotrod-client/src/main/resources/hotrod-client.properties 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/resources/hotrod-client.properties 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,3 +1,70 @@
hotrod-servers=127.0.0.1:11311
transport-factory=org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory
-force-return-value=false
\ No newline at end of file
+force-return-value=false
+
+
+##below is connection pooling config
+
+#controls the maximum number of connections per server that are allocated (checked out to client threads, or idle in the pool) at one time.
+#When non-positive, there is no limit to the number of connections per server. When maxActive is reached, the
+#connection pool for that server is said to be exhausted.
+#The default setting for this parameter is 1.
+maxActive=1
+
+##sets a global limit on the number of persistent connections that can be in circulation within the combined set of servers.
+#When non-positive, there is no limit to the total number of persistent connections in circulation.
+#When maxTotal is exceeded, all connections pools are exhausted. The default setting for this parameter is -1 (no limit).
+maxTotal = -1
+
+#controls the maximum number of persistent connections, per server, at any time. When negative, there is no limit to the
+#number of connections that may be idle per server. The default setting for this parameter is 4.
+maxIdle = 4
+
+#Specifies what happens when asking for a connection from a server pool, and that pool is exhausted. Possible values:
+# - 0 - an exception will be thrown to the calling user
+# - 1 - the caller will block (invoke waits until a new or idle connection is available).
+# - 2 - a new persistent connection will be created and returned (essentially making maxActive meaningless.)
+# If a positive maxWait value is supplied, will block for at most that many milliseconds,
+# after which an exception will be thrown. If maxWait is non-positive, call will block indefinitely.
+#The default whenExhaustedAction setting is 1.
+whenExhaustedAction = 1
+
+#When testOnBorrow is set, the pool will attempt to validate each connection before it is returned, by sending a TCP packet to the server.
+#Connections that fail to validate will be dropped from the pool, and a different connection will be taken. The default setting for this parameter is false.
+testOnBorrow = false
+
+#When testOnReturn is set, the pool will attempt to validate each connection before it is returned to the pool by sending a TCP packet to the server.
+#Connections that fail to validate will be dropped from the pool. The default setting for this parameter is false.
+testOnReturn = false
+
+##Optionally, one may configure the pool to examine and possibly evict connections as they sit idle in the pool and to
+#ensure that a minimum number of idle connections is maintained for each server. This is performed by an "idle object eviction" thread,
+#which runs asynchronously. Caution should be used when configuring this optional feature. Eviction runs require an
+#exclusive synchronization lock on the pool, so if they run too frequently and / or incur excessive latency when creating,
+#destroying or validating connection instances, performance issues may result. The idle connection eviction thread may be
+#configured using the following attributes:
+
+#timeBetweenEvictionRunsMillis indicates how long the eviction thread should sleep before "runs" of examining idle connections. When non-positive,
+#no eviction thread will be launched. The default setting for this parameter is -1 (i.e., by default, idle connections eviction is disabled).
+timeBetweenEvictionRunsMillis=-1
+
+#minEvictableIdleTimeMillis specifies the minimum amount of time that a connection may sit idle in the pool before it is
+#eligible for eviction due to idle time. When non-positive, no connection will be dropped from the pool due to idle time alone.
+#This setting has no effect unless timeBetweenEvictionRunsMillis > 0. The default setting for this parameter is 30 minutes.
+minEvictableIdleTimeMillis=1800000
+
+#testWhileIdle indicates whether or not idle connections should be validated by sending a TCP packet to the server, during idle connection eviction runs.
+# Connections that fail to validate will be dropped from the pool. This setting has no effect unless timeBetweenEvictionRunsMillis > 0.
+# The default setting for this parameter is false.
+testWhileIdle = false
+
+#minIdle sets a target value for the minimum number of idle connections (per server) that should always be available.
+#If this parameter is set to a positive number and timeBetweenEvictionRunsMillis > 0, each time the idle connection eviction thread runs,
+#it will try to create enough idle instances so that there will be minIdle idle instances available for each server.
+# The default setting for this parameter is 0.
+minIdle = 0
+
+#lifo determines whether or not the pools return idle connections in last-in-first-out order. False means that the pools
+#behave as FIFO queues - connections are taken from idle connections pools in the order that they are returned.
+#The default setting for this parameter is true.
+lifo = true
Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -0,0 +1,17 @@
+package org.infinispan.client.hotrod;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class ClientConnectionPoolingTest {
+
+ /**
+ * What happens if a server goes down and after that, we try to create a connection to that server.
+ */
+ public void testServerGoesDown() {
+
+ }
+}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -53,9 +53,8 @@
@AfterClass(enabled = true)
public void testDestroyRemoteCacheFactory() {
- assert remoteCache.ping();
+ remoteCacheManager.stop();
hotrodServer.stop();
- assert !remoteCache.ping();
}
public void testPut() {
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -72,41 +72,10 @@
@AfterClass
public void testDestroyRemoteCacheFactory() {
- assert remoteCache.ping();
+ remoteCacheManager.stop();
+// assert remoteCache.ping();
hotrodServer.stop();
- assert !remoteCache.ping();
-// try {
-// remoteCache.get("aKey");
-// assert false;
-// } catch (ClientDisconnectedException e) {}
-// try {
-// remoteCache.clear();
-// assert false;
-// } catch (ClientDisconnectedException e) {}
-// try {
-// remoteCache.put("aKey", "aValue");
-// assert false;
-// } catch (ClientDisconnectedException e) {}
-// try {
-// remoteCache.putIfAbsent("aKey", "aValue");
-// assert false;
-// } catch (ClientDisconnectedException e) {}
-// try {
-// remoteCache.remove("aKey", 0);
-// assert false;
-// } catch (ClientDisconnectedException e) {}
-// try {
-// remoteCache.remove("aKey");
-// assert false;
-// } catch (ClientDisconnectedException e) {}
-// try {
-// remoteCache.replace("aKey", "aNewValue");
-// assert false;
-// } catch (ClientDisconnectedException e) {}
-// try {
-// remoteCache.replace("aKey", "aNewValue");
-// assert false;
-// } catch (ClientDisconnectedException e) {}
+// assert !remoteCache.ping();
}
public void testPut() {
@@ -125,7 +94,7 @@
assertCacheContains(cache, "aKey", "aValue");
assert remoteCache.get("aKey").equals("aValue");
-
+
assert null == remoteCache.remove("aKey");
assertCacheContains(cache, "aKey", null);
assert !remoteCache.containsKey("aKey");
@@ -172,8 +141,8 @@
public void testReplaceIfUnmodified() {
assert null == remoteCache.replace("aKey", "aValue");
-
+
remoteCache.put("aKey", "aValue");
VersionedValue valueBinary = remoteCache.getVersioned("aKey");
assert remoteCache.replace("aKey", "aNewValue", valueBinary.getVersion());
@@ -222,10 +191,6 @@
assert cache.isEmpty();
}
- public void testStats() {
- //todo implement
- }
-
private void assertCacheContains(Cache cache, String key, String value) {
SerializationMarshaller marshaller = new SerializationMarshaller();
byte[] keyBytes = marshaller.marshallObject(key);
@@ -238,9 +203,4 @@
assert Arrays.equals(valueBytes, cacheValue.data());
}
}
-
- private Object get(Cache cache, String s) {
-
- return new String((byte[])cache.get(s));
- }
}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -41,8 +41,8 @@
@AfterMethod
void tearDown() {
TestingUtil.killCacheManagers(cacheManager);
+ rcm.stop();
hotrodServer.stop();
- rcm.stop();
}
public void testAllStatsArePresent() {
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,6 +1,5 @@
package org.infinispan.client.hotrod;
-import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
import org.infinispan.manager.CacheManager;
import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.test.SingleCacheManagerTest;
@@ -28,18 +27,18 @@
protected CacheManager createCacheManager() throws Exception {
cacheManager = TestCacheManagerFactory.createLocalCacheManager();
hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
- prevValue = System.setProperty(AbstractTransportFactory.OVERRIDE_HOTROD_SERVERS, "localhost:" + hotrodServer.getPort());
+ prevValue = System.setProperty(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS, "localhost:" + hotrodServer.getPort());
return cacheManager;
}
@AfterTest(alwaysRun = true)
public void release() {
- if (hotrodServer != null) hotrodServer.stop();
if (cacheManager != null) cacheManager.stop();
+ if (hotrodServer != null) hotrodServer.stop();
if (prevValue != null) {
- System.setProperty(AbstractTransportFactory.OVERRIDE_HOTROD_SERVERS, prevValue);
+ System.setProperty(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS, prevValue);
} else {
- System.getProperties().remove(AbstractTransportFactory.OVERRIDE_HOTROD_SERVERS);
+ System.getProperties().remove(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS);
}
}
Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java 2010-04-15 23:22:48 UTC (rev 1695)
@@ -0,0 +1,49 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.util.Properties;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(testName = "client.hotrod.ServerShutdownTest", groups = "functional")
+public class ServerShutdownTest {
+
+ @Test(enabled = false)
+ public void testServerShutdownWithConnectedClient() {
+ CacheManager cacheManager = TestCacheManagerFactory.createLocalCacheManager();
+ HotRodServer hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+ RemoteCacheManager remoteCacheManager = new RemoteCacheManager("localhost", hotrodServer.getPort());
+ RemoteCache remoteCache = remoteCacheManager.getCache();
+
+ remoteCache.put("k","v");
+ assertEquals("v", remoteCache.get("k"));
+
+ hotrodServer.stop();
+ cacheManager.stop();
+ remoteCacheManager.stop();
+ }
+
+ public void testServerShutdownWithoutConnectedClient() {
+ CacheManager cacheManager = TestCacheManagerFactory.createLocalCacheManager();
+ HotRodServer hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+ RemoteCacheManager remoteCacheManager = new RemoteCacheManager("localhost", hotrodServer.getPort());
+ RemoteCache remoteCache = remoteCacheManager.getCache();
+
+ remoteCache.put("k","v");
+ assertEquals("v", remoteCache.get("k"));
+
+ remoteCacheManager.stop();
+ hotrodServer.stop();
+ cacheManager.stop();
+ }
+}
Modified: trunk/client/hotrod-client/src/test/resources/log4j.xml
===================================================================
--- trunk/client/hotrod-client/src/test/resources/log4j.xml 2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/resources/log4j.xml 2010-04-15 23:22:48 UTC (rev 1695)
@@ -56,10 +56,6 @@
<priority value="TRACE"/>
</category>
- <category name="org.infinispan">
- <priority value="INFO"/>
- </category>
-
<category name="com.mchange">
<priority value="TRACE"/>
</category>
More information about the infinispan-commits
mailing list