[infinispan-commits] Infinispan SVN: r1695 - in trunk/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl and 6 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Apr 15 19:22:49 EDT 2010


Author: mircea.markus
Date: 2010-04-15 19:22:48 -0400 (Thu, 15 Apr 2010)
New Revision: 1695

Added:
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java
Removed:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java
Modified:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
   trunk/client/hotrod-client/src/main/resources/hotrod-client.properties
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java
   trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
   trunk/client/hotrod-client/src/test/resources/log4j.xml
Log:
better object pooling

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -17,8 +17,12 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.net.URL;
+import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 /**
  * // TODO: Document this
@@ -30,10 +34,16 @@
 
    private static Log log = LogFactory.getLog(RemoteCacheManager.class);
 
+   public static final String HOTROD_CLIENT_PROPERTIES = "hotrod-client.properties";
+
+   public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
+
+   public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod-client.servers-default";
+
+
    private Properties props;
    private TransportFactory transportFactory;
    private String hotrodMarshaller;
-   private static final String HOTROD_CLIENT_PROPERTIES = "hotrod-client.properties";
    private boolean started = false;
 
 
@@ -139,7 +149,7 @@
          log.info("'transport-factory' factory not specified, using " + factory);
       }
       transportFactory = (TransportFactory) VHelper.newInstance(factory);
-      transportFactory.init(props);
+      transportFactory.start(props, getStaticConfiguredServers(props));
       hotrodMarshaller = props.getProperty("marshaller");
       if (hotrodMarshaller == null) {
          hotrodMarshaller = SerializationMarshaller.class.getName();
@@ -175,4 +185,36 @@
       HotrodOperations hotrodOperations = new HotrodOperationsImpl(cacheName, transportFactory);
       return new RemoteCacheImpl<K, V>(hotrodOperations, marshaller, cacheName);
    }
+
+   private Set<InetSocketAddress> getStaticConfiguredServers(Properties props) {
+      Set<InetSocketAddress> serverAddresses = new HashSet<InetSocketAddress>();
+      String servers = props.getProperty(CONF_HOTROD_SERVERS);
+      if (servers == null) {
+         servers = System.getProperty(OVERRIDE_HOTROD_SERVERS);
+         if (servers != null) {
+            log.info("Overwriting default server properties (-D" + OVERRIDE_HOTROD_SERVERS + ") with " + servers);
+         } else {
+            servers = "127.0.0.1:11311";
+         }
+         log.info("'hotrod-servers' property not specified in config, using " + servers);
+      }
+      StringTokenizer tokenizer = new StringTokenizer(servers, ";");
+      while (tokenizer.hasMoreTokens()) {
+         String server = tokenizer.nextToken();
+         String[] serverDef = tokenizeServer(server);
+         String serverHost = serverDef[0];
+         int serverPort = Integer.parseInt(serverDef[1]);
+         serverAddresses.add(new InetSocketAddress(serverHost, serverPort));
+      }
+      if (serverAddresses.isEmpty()) {
+         throw new IllegalStateException("No hot-rod servers specified!");
+      }
+      return serverAddresses;
+   }
+
+   private String[] tokenizeServer(String server) {
+      StringTokenizer t = new StringTokenizer(server, ":");
+      return new String[]{t.nextToken(), t.nextToken()};
+   }  
+
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,6 +1,9 @@
 package org.infinispan.client.hotrod.impl;
 
+import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * // TODO: Document this
@@ -12,15 +15,13 @@
 
    public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
 
-   public static final String CONF_TCP_CONNECTION_POOL = "tcp-connection-pool";
-
-   public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod-client.servers-default";
-
    public Transport getTransport();
 
    public void releaseTransport(Transport transport);
 
-   void init(Properties props);
+   void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers);
 
+   void updateServers(Collection<InetSocketAddress> newServers);
+
    void destroy();
 }

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,54 +0,0 @@
-package org.infinispan.client.hotrod.impl.transport;
-
-import org.infinispan.client.hotrod.impl.TransportFactory;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public abstract class AbstractTransportFactory implements TransportFactory {
-
-   private static Log log = LogFactory.getLog(AbstractTransportFactory.class);
-
-   protected Set<InetSocketAddress> serverAddresses = new HashSet<InetSocketAddress>();
-
-   public void init(Properties props) {
-      String servers = props.getProperty(CONF_HOTROD_SERVERS);
-      if (servers == null) {
-         servers = System.getProperty(OVERRIDE_HOTROD_SERVERS);
-         if (servers != null) {
-            log.info("Overwriting default server properties (-D" + OVERRIDE_HOTROD_SERVERS + ") with " + servers);
-         } else {
-            servers = "127.0.0.1:11311";
-         }
-         log.info("'hotrod-servers' property not specified in config, using " + servers);
-      }
-      StringTokenizer tokenizer = new StringTokenizer(servers, ";");
-      while (tokenizer.hasMoreTokens()) {
-         String server = tokenizer.nextToken();
-         String[] serverDef = tokenizeServer(server);
-         String serverHost = serverDef[0];
-         int serverPort = Integer.parseInt(serverDef[1]);
-         serverAddresses.add(new InetSocketAddress(serverHost, serverPort));
-      }
-      if (serverAddresses.isEmpty()) {
-         throw new IllegalStateException("No hot-rod servers specified!");
-      }
-   }
-
-   private String[] tokenizeServer(String server) {
-      StringTokenizer t = new StringTokenizer(server, ":");
-      return new String[]{t.nextToken(), t.nextToken()};
-   }
-}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,11 +1,12 @@
 package org.infinispan.client.hotrod.impl.transport.netty;
 
 import org.infinispan.client.hotrod.impl.Transport;
-import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
+import org.infinispan.client.hotrod.impl.TransportFactory;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Properties;
 
 /**
@@ -14,19 +15,25 @@
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
-public class NettyTransportFactory extends AbstractTransportFactory {
+public class NettyTransportFactory implements TransportFactory {
 
    private static Log log = LogFactory.getLog(NettyTransportFactory.class);
 
    private InetSocketAddress serverAddr;
+   private Collection<InetSocketAddress> serverAddresses;
 
    @Override
-   public void init(Properties props) {
-      super.init(props);
-      serverAddr = super.serverAddresses.iterator().next();
+   public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+      this.serverAddresses = staticConfiguredServers;
+      serverAddr = serverAddresses.iterator().next();
    }
 
    @Override
+   public void updateServers(Collection<InetSocketAddress> newServers) {
+      throw new IllegalStateException();
+   }
+
+   @Override
    public Transport getTransport() {
       log.info("Connecting to server on: " + serverAddr);
       return new NettyTransport(serverAddr);
@@ -34,7 +41,7 @@
 
    @Override
    public void destroy() {
-      // TODO: Customise this generated block
+      //nothing to do here as no pooling is available
    }
 
    @Override

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,107 +0,0 @@
-package org.infinispan.client.hotrod.impl.transport.tcp;
-
-import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
-import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.infinispan.client.hotrod.impl.transport.TransportException;
-import org.infinispan.client.hotrod.impl.transport.VHelper;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.util.Collection;
-import java.util.Properties;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- *
- * todo - all methods but init and start can be called from multiple threads, add proper sync
- */
-public class DefaultTcpConnectionPool implements TcpConnectionPool {
-
-   private static Log log = LogFactory.getLog(DefaultTcpConnectionPool.class);
-
-   private GenericKeyedObjectPool connectionPool;
-   private PropsKeyedObjectPoolFactory poolFactory;
-   private RequestBalancingStrategy balancer;
-   private Collection<InetSocketAddress> servers;
-
-   @Override
-   public void init(Properties props) {
-      String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
-      balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
-      poolFactory = new PropsKeyedObjectPoolFactory(new TcpConnectionFactory(), props);
-   }
-
-   @Override
-   public void start(Collection<InetSocketAddress> servers) {
-      connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
-      balancer.setServers(servers);
-      this.servers = servers;
-   }
-
-   @Override
-   public Socket getConnection() {
-      InetSocketAddress server = balancer.nextServer();
-      try {
-         return (Socket) connectionPool.borrowObject(server);
-      } catch (Exception e) {
-         String message = "Could not fetch connection";
-         log.error(message, e);
-         throw new TransportException(message, e);
-      }
-   }
-
-   @Override
-   public void releaseConnection(Socket socket) {
-      SocketAddress remoteAddress = socket.getRemoteSocketAddress();
-      if (!servers.contains(remoteAddress)) throw new IllegalStateException(remoteAddress.toString());
-      try {
-         connectionPool.returnObject(remoteAddress, socket);
-      } catch (Exception e) {
-         log.warn("Could not release connection",e);
-      }
-   }
-
-   @Override
-   public void updateServers(Collection<InetSocketAddress> newServers) {
-      if (newServers.containsAll(servers) && servers.containsAll(newServers)) {
-         log.info("Same list of servers, not changing the pool");
-         return;
-      }
-      for (InetSocketAddress server : newServers) {
-         if (!servers.contains(server)) {
-            log.info("New server added(" + server + "), adding to the pool.");
-            try {
-               connectionPool.addObject(server);
-            } catch (Exception e) {
-               log.warn("Failed adding server " + server, e);
-            }
-         }
-      }
-      for (InetSocketAddress server : servers) {
-         if (!newServers.contains(server)) {
-            log.info("Server not in cluster anymore(" + server + "), removing from the pool.");
-            connectionPool.clear(server);
-         }
-      }
-      servers.clear();
-      servers.addAll(newServers);
-   }
-
-   @Override
-   public void destroy() {
-      connectionPool.clear();
-      try {
-         connectionPool.close();
-      } catch (Exception e) {
-         log.warn("Exception while shutting down the connection pool.", e);
-      }
-   }
-}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,15 +1,10 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
 import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
-import org.apache.commons.pool.BasePoolableObjectFactory;
-import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.channels.SocketChannel;
 
 /**
  * // TODO: Document this
@@ -24,23 +19,18 @@
    @Override
    public Object makeObject(Object key) throws Exception {
       InetSocketAddress serverAddress = (InetSocketAddress) key;
+      TcpTransport tcpTransport = new TcpTransport(serverAddress);
       if (log.isTraceEnabled()) {
-         log.trace("Creating connection to server: " + serverAddress);
+         log.trace("Created tcp transport: " + tcpTransport);
       }
-      try {
-         SocketChannel socketChannel = SocketChannel.open(serverAddress);
-         return socketChannel.socket();
-      } catch (IOException e) {
-         log.warn("Could not create connection to " + serverAddress, e);
-         throw e;
-      }
+      return tcpTransport;
    }
 
    @Override
    public boolean validateObject(Object key, Object obj) {
-      Socket socket = (Socket) obj;
+      TcpTransport transport = (TcpTransport) obj;
       if (log.isTraceEnabled()) {
-         log.trace("About to validate(ping) connection to server " + key + ". socket is " + socket);
+         log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
       }
       //todo implement
       return true;
@@ -48,14 +38,7 @@
 
    @Override
    public void destroyObject(Object key, Object obj) throws Exception {
-      Socket socket = (Socket) obj;
-      if (log.isTraceEnabled()) {
-         log.trace("About to destroy socket " + socket);
-      }
-      try {
-         socket.close();
-      } catch (IOException e) {
-         log.warn("Issues closing the socket: " + socket, e);
-      }
+      TcpTransport transport = (TcpTransport) obj;
+      transport.destroy();
    }
 }

Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,28 +0,0 @@
-package org.infinispan.client.hotrod.impl.transport.tcp;
-
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collection;
-import java.util.Properties;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public interface
-      TcpConnectionPool {
-
-   public void init(Properties props);
-
-   public void start(Collection<InetSocketAddress> servers);
-
-   public Socket getConnection();
-
-   public void releaseConnection(Socket socket);
-
-   public void updateServers(Collection<InetSocketAddress> newServers);
-
-   public void destroy();
-}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -9,7 +9,6 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.nio.channels.SocketChannel;
 
 /**
@@ -23,6 +22,7 @@
    private static Log log = LogFactory.getLog(TcpTransport.class);
 
    private Socket socket;
+   private InetSocketAddress serverAddress;
 
    public void writeVInt(int vInt) {
       try {
@@ -68,8 +68,16 @@
       }
    }
 
-   public TcpTransport(Socket socket) {
-      this.socket = socket;
+   public TcpTransport(InetSocketAddress serverAddress) {
+      this.serverAddress = serverAddress;
+      try {
+         SocketChannel socketChannel = SocketChannel.open(serverAddress);
+         socket = socketChannel.socket();
+      } catch (IOException e) {
+         String message = "Could not connect to server: " + serverAddress;
+         log.error(message, e);
+         throw new TransportException(message, e);
+      }
    }
 
    protected void writeBytes(byte[] toAppend) {
@@ -156,8 +164,47 @@
       }
       return result;
    }
+   
+   public InetSocketAddress getServerAddress() {
+     return serverAddress;
+   }
 
-   public Socket getSocket() {
-      return socket;
+   @Override
+   public String toString() {
+      return "TcpTransport{" +
+            "socket=" + socket +
+            ", serverAddress=" + serverAddress +
+            "} " + super.toString();
    }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TcpTransport that = (TcpTransport) o;
+
+      if (serverAddress != null ? !serverAddress.equals(that.serverAddress) : that.serverAddress != null) return false;
+      if (socket != null ? !socket.equals(that.socket) : that.socket != null) return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      int result = socket != null ? socket.hashCode() : 0;
+      result = 31 * result + (serverAddress != null ? serverAddress.hashCode() : 0);
+      return result;
+   }
+
+   public void destroy() {
+      try {
+         socket.close();
+         if (log.isTraceEnabled()) {
+            log.trace("Successfully closed socket: " + socket);
+         }
+      } catch (IOException e) {
+         log.warn("Issues closing transport: " + this, e);
+      }
+   }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,56 +1,101 @@
 package org.infinispan.client.hotrod.impl.transport.tcp;
 
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.infinispan.client.hotrod.impl.Transport;
 import org.infinispan.client.hotrod.impl.TransportFactory;
-import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
+import org.infinispan.client.hotrod.impl.transport.TransportException;
 import org.infinispan.client.hotrod.impl.transport.VHelper;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
 import java.net.InetSocketAddress;
+import java.util.Collection;
 import java.util.Properties;
-import java.util.StringTokenizer;
+import java.util.Set;
 
 /**
  * // TODO: Document this
  *
+ * todo - all methods but start can be called from multiple threads, add proper sync
+ *
  * @author Mircea.Markus at jboss.com
  * @since 4.1
  */
-public class TcpTransportFactory extends AbstractTransportFactory {
+public class TcpTransportFactory implements TransportFactory {
 
    private static Log log = LogFactory.getLog(TcpTransportFactory.class);
 
-   private TcpConnectionPool connectionPool;
+   private GenericKeyedObjectPool connectionPool;
+   private PropsKeyedObjectPoolFactory poolFactory;
+   private RequestBalancingStrategy balancer;
+   private Collection<InetSocketAddress> servers;
 
    @Override
-   public void init(Properties props) {
-      super.init(props);
-      String tcpConnectionPool = props.getProperty(CONF_TCP_CONNECTION_POOL);
-      if (tcpConnectionPool == null) {
-         tcpConnectionPool = DefaultTcpConnectionPool.class.getName();
-         log.trace("No tcp connection pools specified, using the default: " + tcpConnectionPool);
-      }
-      connectionPool = (TcpConnectionPool) VHelper.newInstance(tcpConnectionPool);
-      connectionPool.init(props);
-      connectionPool.start(serverAddresses);
+   public void start(Properties props, Collection<InetSocketAddress> staticConfiguredServers) {
+      servers = staticConfiguredServers;
+      String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
+      balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
+      poolFactory = new PropsKeyedObjectPoolFactory(new TcpConnectionFactory(), props);
+      connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
+      balancer.setServers(servers);
    }
 
    @Override
    public void destroy() {
-      if (connectionPool != null) {
-         connectionPool.destroy();
+      connectionPool.clear();
+      try {
+         connectionPool.close();
+      } catch (Exception e) {
+         log.warn("Exception while shutting down the connection pool.", e);
       }
    }
 
    @Override
    public Transport getTransport() {
-      return new TcpTransport(connectionPool.getConnection());
+      InetSocketAddress server = balancer.nextServer();
+      try {
+         return (Transport) connectionPool.borrowObject(server);
+      } catch (Exception e) {
+         String message = "Could not fetch transport";
+         log.error(message, e);
+         throw new TransportException(message, e);
+      }
    }
 
    @Override
    public void releaseTransport(Transport transport) {
       TcpTransport tcpTransport = (TcpTransport) transport;
-      connectionPool.releaseConnection(tcpTransport.getSocket());
+      try {
+         connectionPool.returnObject(tcpTransport.getServerAddress(), tcpTransport);
+      } catch (Exception e) {
+         log.warn("Could not release connection: " + tcpTransport,e);
+      }
    }
+
+   @Override
+   public void updateServers(Collection<InetSocketAddress> newServers) {
+      if (newServers.containsAll(servers) && servers.containsAll(newServers)) {
+         log.info("Same list of servers, not changing the pool");
+         return;
+      }
+      for (InetSocketAddress server : newServers) {
+         if (!servers.contains(server)) {
+            log.info("New server added(" + server + "), adding to the pool.");
+            try {
+               connectionPool.addObject(server);
+            } catch (Exception e) {
+               log.warn("Failed adding server " + server, e);
+            }
+         }
+      }
+      for (InetSocketAddress server : servers) {
+         if (!newServers.contains(server)) {
+            log.info("Server not in cluster anymore(" + server + "), removing from the pool.");
+            connectionPool.clear(server);
+         }
+      }
+      servers.clear();
+      servers.addAll(newServers);
+   }
+
 }

Modified: trunk/client/hotrod-client/src/main/resources/hotrod-client.properties
===================================================================
--- trunk/client/hotrod-client/src/main/resources/hotrod-client.properties	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/main/resources/hotrod-client.properties	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,3 +1,70 @@
 hotrod-servers=127.0.0.1:11311
 transport-factory=org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory
-force-return-value=false
\ No newline at end of file
+force-return-value=false
+
+
+##below is connection pooling config
+
+#controls the maximum number of connections per server that are allocated (checked out to client threads, or idle in the pool) at one time.
+#When non-positive, there is no limit to the number of connections per server. When maxActive is reached, the 
+#connection pool for that server is said to be exhausted.
+#The default setting for this parameter is 1.
+maxActive=1
+
+##sets a global limit on the number of persistent connections that can be in circulation within the combined set of servers.
+#When non-positive, there is no limit to the total number of persistent connections in circulation.
+#When maxTotal is exceeded, all connections pools are exhausted. The default setting for this parameter is -1 (no limit).
+maxTotal = -1
+
+#controls the maximum number of idle persistent connections, per server, at any time. When negative, there is no limit to the
+#number of connections that may be idle per server. The default setting for this parameter is 4.
+maxIdle = 4
+
+#Specifies what happens when asking for a connection from a server pool, and that pool is exhausted. Possible values:
+# - 0 - an exception will be thrown to the calling user
+# - 1 - the caller will block (invoke waits until a new or idle connection is available).
+# - 2 - a new persistent connection will be created and returned (essentially making maxActive meaningless.)
+#                        When blocking (action 1): if a positive maxWait value is supplied, will block for at most that many milliseconds,
+#                         after which an exception will be thrown. If maxWait is non-positive, call will block indefinitely.
+#The default whenExhaustedAction setting is 1.
+whenExhaustedAction = 1
+
+#When testOnBorrow is set, the pool will attempt to validate each connection before it is returned, by sending a TCP packet to the server.
+#Connections that fail to validate will be dropped from the pool, and a different connection will be taken. The default setting for this parameter is false.
+testOnBorrow = false
+
+#When testOnReturn is set, the pool will attempt to validate each connection before it is returned to the pool by sending a TCP packet to the server.
+#Connections that fail to validate will be dropped from the pool. The default setting for this parameter is false.
+testOnReturn = false
+
+##Optionally, one may configure the pool to examine and possibly evict connections as they sit idle in the pool and to
+#ensure that a minimum number of idle connections is maintained for each server. This is performed by an "idle object eviction" thread,
+#which runs asynchronously. Caution should be used when configuring this optional feature. Eviction runs require an
+#exclusive synchronization lock on the pool, so if they run too frequently and / or incur excessive latency when creating,
+#destroying or validating connection instances, performance issues may result. The idle connection eviction thread may be
+#configured using the following attributes:
+
+#timeBetweenEvictionRunsMillis indicates how long the eviction thread should sleep before "runs" of examining idle connections. When non-positive,
+#no eviction thread will be launched. The default setting for this parameter is -1 (i.e., by default, idle connections eviction is disabled).
+timeBetweenEvictionRunsMillis=-1
+
+#minEvictableIdleTimeMillis specifies the minimum amount of time that a connection may sit idle in the pool before it is
+#eligible for eviction due to idle time. When non-positive, no connection will be dropped from the pool due to idle time alone.
+#This setting has no effect unless timeBetweenEvictionRunsMillis > 0. The default setting for this parameter is 30 minutes.
+minEvictableIdleTimeMillis=1800000
+
+#testWhileIdle indicates whether or not idle connections should be validated by sending a TCP packet to the server, during idle connection eviction runs.
+# Connections that fail to validate will be dropped from the pool. This setting has no effect unless timeBetweenEvictionRunsMillis > 0.
+# The default setting for this parameter is false.
+testWhileIdle = false
+
+#minIdle sets a target value for the minimum number of idle connections (per server) that should always be available.
+#If this parameter is set to a positive number and timeBetweenEvictionRunsMillis > 0, each time the idle connection eviction thread runs,
+#it will try to create enough idle instances so that there will be minIdle idle instances available for each server.
+# The default setting for this parameter is 0.
+minIdle = 0
+
+#lifo determines whether or not the pools return idle connection in last-in-first-out order. False means that the pools
+#behave as FIFO queues - connections are taken from idle connections pools in the order that they are returned.
+#The default setting for this parameter is true.
+lifo = true

Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -0,0 +1,17 @@
+package org.infinispan.client.hotrod;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class ClientConnectionPoolingTest {
+
+   /**
+    * What happens if a server goes down and after that, we try to create a connection to that server.
+    */
+   public void testServerGoesDown() {
+
+   }
+}

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -53,9 +53,8 @@
 
    @AfterClass(enabled = true)
    public void testDestroyRemoteCacheFactory() {
-      assert remoteCache.ping();
+      remoteCacheManager.stop();
       hotrodServer.stop();
-      assert !remoteCache.ping();
    }
 
    public void testPut() {

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -72,41 +72,10 @@
 
    @AfterClass 
    public void testDestroyRemoteCacheFactory() {
-      assert remoteCache.ping();
+      remoteCacheManager.stop();
+//      assert remoteCache.ping();
       hotrodServer.stop();
-      assert !remoteCache.ping();
-//      try {
-//         remoteCache.get("aKey");
-//         assert false;
-//      } catch (ClientDisconnectedException e) {}
-//      try {
-//         remoteCache.clear();
-//         assert false;
-//      } catch (ClientDisconnectedException e) {}
-//      try {
-//         remoteCache.put("aKey", "aValue");
-//         assert false;
-//      } catch (ClientDisconnectedException e) {}
-//      try {
-//         remoteCache.putIfAbsent("aKey", "aValue");
-//         assert false;
-//      } catch (ClientDisconnectedException e) {}
-//      try {
-//         remoteCache.remove("aKey", 0);
-//         assert false;
-//      } catch (ClientDisconnectedException e) {}
-//      try {
-//         remoteCache.remove("aKey");
-//         assert false;
-//      } catch (ClientDisconnectedException e) {}
-//      try {
-//         remoteCache.replace("aKey", "aNewValue");
-//         assert false;
-//      } catch (ClientDisconnectedException e) {}
-//      try {
-//         remoteCache.replace("aKey", "aNewValue");
-//         assert false;
-//      } catch (ClientDisconnectedException e) {}
+//      assert !remoteCache.ping();
    }
 
    public void testPut() {
@@ -125,7 +94,7 @@
       assertCacheContains(cache, "aKey", "aValue");
 
       assert remoteCache.get("aKey").equals("aValue");
-      
+
       assert null == remoteCache.remove("aKey");
       assertCacheContains(cache, "aKey", null);
       assert !remoteCache.containsKey("aKey");
@@ -172,8 +141,8 @@
 
    public void testReplaceIfUnmodified() {
       assert null == remoteCache.replace("aKey", "aValue");
-      
 
+
       remoteCache.put("aKey", "aValue");
       VersionedValue valueBinary = remoteCache.getVersioned("aKey");
       assert remoteCache.replace("aKey", "aNewValue", valueBinary.getVersion());
@@ -222,10 +191,6 @@
       assert cache.isEmpty();
    }
 
-   public void testStats() {
-      //todo implement
-   }
-
    private void assertCacheContains(Cache cache, String key, String value) {
       SerializationMarshaller marshaller = new SerializationMarshaller();
       byte[] keyBytes = marshaller.marshallObject(key);
@@ -238,9 +203,4 @@
          assert Arrays.equals(valueBytes, cacheValue.data());
       }
    }
-
-   private Object get(Cache cache, String s) {
-
-      return new String((byte[])cache.get(s));
-   }
 }

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -41,8 +41,8 @@
    @AfterMethod
    void tearDown() {
       TestingUtil.killCacheManagers(cacheManager);
+      rcm.stop();
       hotrodServer.stop();
-      rcm.stop();
    }
 
    public void testAllStatsArePresent() {

Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -1,6 +1,5 @@
 package org.infinispan.client.hotrod;
 
-import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
 import org.infinispan.manager.CacheManager;
 import org.infinispan.server.hotrod.HotRodServer;
 import org.infinispan.test.SingleCacheManagerTest;
@@ -28,18 +27,18 @@
    protected CacheManager createCacheManager() throws Exception {
       cacheManager = TestCacheManagerFactory.createLocalCacheManager();
       hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
-      prevValue = System.setProperty(AbstractTransportFactory.OVERRIDE_HOTROD_SERVERS, "localhost:" + hotrodServer.getPort());
+      prevValue = System.setProperty(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS, "localhost:" + hotrodServer.getPort());
       return cacheManager;
    }
 
    @AfterTest(alwaysRun = true)
    public void release() {
-      if (hotrodServer != null) hotrodServer.stop();
       if (cacheManager != null) cacheManager.stop();
+      if (hotrodServer != null) hotrodServer.stop();
       if (prevValue != null) {
-         System.setProperty(AbstractTransportFactory.OVERRIDE_HOTROD_SERVERS, prevValue);
+         System.setProperty(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS, prevValue);
       } else {
-         System.getProperties().remove(AbstractTransportFactory.OVERRIDE_HOTROD_SERVERS);
+         System.getProperties().remove(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS);
       }
    }
 

Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java	2010-04-15 23:22:48 UTC (rev 1695)
@@ -0,0 +1,49 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.util.Properties;
+
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(testName = "client.hotrod.ServerShutdownTest", groups = "functional")
+public class ServerShutdownTest {
+
+   @Test(enabled = false)
+   public void testServerShutdownWithConnectedClient() {
+      CacheManager cacheManager = TestCacheManagerFactory.createLocalCacheManager();
+      HotRodServer hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+      RemoteCacheManager remoteCacheManager = new RemoteCacheManager("localhost", hotrodServer.getPort());
+      RemoteCache remoteCache = remoteCacheManager.getCache();
+
+      remoteCache.put("k","v");
+      assertEquals("v", remoteCache.get("k"));
+
+      hotrodServer.stop();
+      cacheManager.stop();
+      remoteCacheManager.stop();
+   }
+
+   public void testServerShutdownWithoutConnectedClient() {
+      CacheManager cacheManager = TestCacheManagerFactory.createLocalCacheManager();
+      HotRodServer hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+      RemoteCacheManager remoteCacheManager = new RemoteCacheManager("localhost", hotrodServer.getPort());
+      RemoteCache remoteCache = remoteCacheManager.getCache();
+
+      remoteCache.put("k","v");
+      assertEquals("v", remoteCache.get("k"));
+
+      remoteCacheManager.stop();
+      hotrodServer.stop();
+      cacheManager.stop();
+   }
+}

Modified: trunk/client/hotrod-client/src/test/resources/log4j.xml
===================================================================
--- trunk/client/hotrod-client/src/test/resources/log4j.xml	2010-04-15 22:32:54 UTC (rev 1694)
+++ trunk/client/hotrod-client/src/test/resources/log4j.xml	2010-04-15 23:22:48 UTC (rev 1695)
@@ -56,10 +56,6 @@
       <priority value="TRACE"/>
    </category>
 
-   <category name="org.infinispan">
-      <priority value="INFO"/>
-   </category>
-
    <category name="com.mchange">
       <priority value="TRACE"/>
    </category>



More information about the infinispan-commits mailing list