[infinispan-commits] Infinispan SVN: r1694 - in trunk/client/hotrod-client: src/main/java/org/infinispan/client/hotrod and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Apr 15 18:32:54 EDT 2010


Author: mircea.markus
Date: 2010-04-15 18:32:54 -0400 (Thu, 15 Apr 2010)
New Revision: 1694

Added:
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java
Modified:
   trunk/client/hotrod-client/pom.xml
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
   trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
Log:
added client connection pooling

Modified: trunk/client/hotrod-client/pom.xml
===================================================================
--- trunk/client/hotrod-client/pom.xml	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/pom.xml	2010-04-15 22:32:54 UTC (rev 1694)
@@ -55,5 +55,11 @@
          <version>${version.netty}</version>
       </dependency>
 
+      <dependency>
+         <groupId>commons-pool</groupId>
+         <artifactId>commons-pool</artifactId>
+         <version>1.5.4</version>
+      </dependency>
+
    </dependencies>
 </project>

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -7,11 +7,11 @@
 import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
 import org.infinispan.client.hotrod.impl.SerializationMarshaller;
 import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
 import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
 import org.infinispan.lifecycle.Lifecycle;
 import org.infinispan.manager.CacheContainer;
 import org.infinispan.manager.DefaultCacheManager;
-import org.infinispan.util.Util;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
@@ -138,7 +138,7 @@
          factory = TcpTransportFactory.class.getName();
          log.info("'transport-factory' factory not specified, using " + factory);
       }
-      transportFactory = (TransportFactory) newInstance(factory);
+      transportFactory = (TransportFactory) VHelper.newInstance(factory);
       transportFactory.init(props);
       hotrodMarshaller = props.getProperty("marshaller");
       if (hotrodMarshaller == null) {
@@ -170,16 +170,8 @@
       }
    }
 
-   private Object newInstance(String clazz) {
-      try {
-         return Util.getInstance(clazz);
-      } catch (Exception e) {
-         throw new HotRodClientException("Could not instantiate class: " + clazz, e);
-      }
-   }
-
    private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName) {
-      HotrodMarshaller marshaller = (HotrodMarshaller) newInstance(hotrodMarshaller);
+      HotrodMarshaller marshaller = (HotrodMarshaller) VHelper.newInstance(hotrodMarshaller);
       HotrodOperations hotrodOperations = new HotrodOperationsImpl(cacheName, transportFactory);
       return new RemoteCacheImpl<K, V>(hotrodOperations, marshaller, cacheName);
    }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -33,7 +33,7 @@
    }
 
    public byte[] get(byte[] key, Flag[] flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -49,7 +49,7 @@
    }
 
    public byte[] remove(byte[] key, Flag[] flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -64,7 +64,7 @@
    }
 
    public boolean containsKey(byte[] key, Flag... flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -79,7 +79,7 @@
    }
 
    public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
          if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -101,7 +101,7 @@
 
 
    public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
          if (status != NO_ERROR_STATUS) {
@@ -114,7 +114,7 @@
    }
 
    public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
          if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
@@ -127,7 +127,7 @@
    }
 
    public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
          if (status == NO_ERROR_STATUS) {
@@ -148,7 +148,7 @@
     * was sent, the response would be empty.
     */
    public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          // 1) write header
          long messageId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, flags);
@@ -169,7 +169,7 @@
     * Request: [header][key length][key][entry_version]
     */
    public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          // 1) write header
          long messageId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, flags);
@@ -187,7 +187,7 @@
    }
 
    public void clear(Flag... flags) {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          // 1) write header
          long messageId = writeHeader(transport, CLEAR_REQUEST, flags);
@@ -198,7 +198,7 @@
    }
 
    public Map<String, String> stats() {
-      Transport transport = getTransport();
+      Transport transport = transportFactory.getTransport();
       try {
          // 1) write header
          long messageId = writeHeader(transport, STATS_REQUEST);
@@ -217,15 +217,11 @@
       }
    }
 
-   public Transport getTransport() {
-      return transportFactory.getTransport();
-   }
-
    @Override
    public boolean ping() {
       Transport transport = null;
       try {
-         transport = getTransport();
+         transport = transportFactory.getTransport();
          // 1) write header
          long messageId = writeHeader(transport, PING_REQUEST);
          short respStatus = readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE);
@@ -264,7 +260,6 @@
    /*
     * Magic	| MessageId	| Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
     */
-
    private long writeHeader(Transport transport, short operationCode, Flag... flags) {
       transport.writeByte(REQUEST_MAGIC);
       long messageId = MSG_ID.incrementAndGet();
@@ -319,7 +314,7 @@
          log.trace("Received operation code is: " + receivedOpCode);
       }
       short status = transport.readByte();
-      transport.readByte(); //todo - this will be changed once we support smarter than basic clients
+      transport.readByte(); //todo - this is the topology change status, and it will be changed once we support smarter than basic clients
       checkForErrorsInResponseStatus(status, messageId, transport);
       return status;
    }
@@ -382,7 +377,7 @@
 
    private void releaseTransport(Transport transport) {
       if (transport != null)
-         transport.release();
+        transportFactory.releaseTransport(transport);
    }
 
    private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -12,7 +12,7 @@
 
    public void writeByte(short toWrite);
 
-   public void writeVInt(int length);
+   public void writeVInt(int vint);
 
    public void writeVLong(long l);
 

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -12,10 +12,14 @@
 
    public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
 
+   public static final String CONF_TCP_CONNECTION_POOL = "tcp-connection-pool";
+
    public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod-client.servers-default";
 
    public Transport getTransport();
 
+   public void releaseTransport(Transport transport);
+
    void init(Properties props);
 
    void destroy();

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -4,7 +4,11 @@
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.StringTokenizer;
 
 /**
@@ -17,8 +21,7 @@
 
    private static Log log = LogFactory.getLog(AbstractTransportFactory.class);
 
-   protected String serverHost;
-   protected int serverPort;
+   protected Set<InetSocketAddress> serverAddresses = new HashSet<InetSocketAddress>();
 
    public void init(Properties props) {
       String servers = props.getProperty(CONF_HOTROD_SERVERS);
@@ -32,10 +35,16 @@
          log.info("'hotrod-servers' property not specified in config, using " + servers);
       }
       StringTokenizer tokenizer = new StringTokenizer(servers, ";");
-      String server = tokenizer.nextToken();
-      String[] serverDef = tokenizeServer(server);
-      serverHost = serverDef[0];
-      serverPort = Integer.parseInt(serverDef[1]);
+      while (tokenizer.hasMoreTokens()) {
+         String server = tokenizer.nextToken();
+         String[] serverDef = tokenizeServer(server);
+         String serverHost = serverDef[0];
+         int serverPort = Integer.parseInt(serverDef[1]);
+         serverAddresses.add(new InetSocketAddress(serverHost, serverPort));
+      }
+      if (serverAddresses.isEmpty()) {
+         throw new IllegalStateException("No hot-rod servers specified!");
+      }
    }
 
    private String[] tokenizeServer(String server) {

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -1,6 +1,8 @@
 package org.infinispan.client.hotrod.impl.transport;
 
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
 import org.infinispan.io.UnsignedNumeric;
+import org.infinispan.util.Util;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -45,4 +47,12 @@
          throw new TransportException(e);
       }
    }
+
+   public static Object newInstance(String clazz) {
+      try {
+         return Util.getInstance(clazz);
+      } catch (Exception e) {
+         throw new HotRodClientException("Could not instantiate class: " + clazz, e);
+      }
+   }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -23,7 +23,7 @@
    @Override
    public void init(Properties props) {
       super.init(props);
-      serverAddr = new InetSocketAddress(serverHost, serverPort);
+      serverAddr = super.serverAddresses.iterator().next();
    }
 
    @Override
@@ -36,4 +36,9 @@
    public void destroy() {
       // TODO: Customise this generated block
    }
+
+   @Override
+   public void releaseTransport(Transport transport) {
+      transport.release();
+   }
 }

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,107 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericObjectPoolFactory;
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ *
+ * todo - all methods but init and start can be called from multiple threads, add proper sync
+ */
+public class DefaultTcpConnectionPool implements TcpConnectionPool {
+
+   private static Log log = LogFactory.getLog(DefaultTcpConnectionPool.class);
+
+   private GenericKeyedObjectPool connectionPool;
+   private PropsKeyedObjectPoolFactory poolFactory;
+   private RequestBalancingStrategy balancer;
+   private Collection<InetSocketAddress> servers;
+
+   @Override
+   public void init(Properties props) {
+      String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
+      balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
+      poolFactory = new PropsKeyedObjectPoolFactory(new TcpConnectionFactory(), props);
+   }
+
+   @Override
+   public void start(Collection<InetSocketAddress> servers) {
+      connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
+      balancer.setServers(servers);
+      this.servers = servers;
+   }
+
+   @Override
+   public Socket getConnection() {
+      InetSocketAddress server = balancer.nextServer();
+      try {
+         return (Socket) connectionPool.borrowObject(server);
+      } catch (Exception e) {
+         String message = "Could not fetch connection";
+         log.error(message, e);
+         throw new TransportException(message, e);
+      }
+   }
+
+   @Override
+   public void releaseConnection(Socket socket) {
+      SocketAddress remoteAddress = socket.getRemoteSocketAddress();
+      if (!servers.contains(remoteAddress)) throw new IllegalStateException(remoteAddress.toString());
+      try {
+         connectionPool.returnObject(remoteAddress, socket);
+      } catch (Exception e) {
+         log.warn("Could not release connection",e);
+      }
+   }
+
+   @Override
+   public void updateServers(Collection<InetSocketAddress> newServers) {
+      if (newServers.containsAll(servers) && servers.containsAll(newServers)) {
+         log.info("Same list of servers, not changing the pool");
+         return;
+      }
+      for (InetSocketAddress server : newServers) {
+         if (!servers.contains(server)) {
+            log.info("New server added(" + server + "), adding to the pool.");
+            try {
+               connectionPool.addObject(server);
+            } catch (Exception e) {
+               log.warn("Failed adding server " + server, e);
+            }
+         }
+      }
+      for (InetSocketAddress server : servers) {
+         if (!newServers.contains(server)) {
+            log.info("Server not in cluster anymore(" + server + "), removing from the pool.");
+            connectionPool.clear(server);
+         }
+      }
+      servers.clear();
+      servers.addAll(newServers);
+   }
+
+   @Override
+   public void destroy() {
+      connectionPool.clear();
+      try {
+         connectionPool.close();
+      } catch (Exception e) {
+         log.warn("Exception while shutting down the connection pool.", e);
+      }
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,70 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class PropsKeyedObjectPoolFactory extends GenericKeyedObjectPoolFactory {
+
+
+   private static Log log = LogFactory.getLog(PropsKeyedObjectPoolFactory.class);
+
+   public PropsKeyedObjectPoolFactory(KeyedPoolableObjectFactory factory, Properties props) {
+      super(factory);
+      _maxActive = intProp(props, "maxActive", 1);
+      _maxTotal = intProp(props, "maxTotal", -1);
+      _maxIdle = intProp(props, "maxIdle", 4);
+      int value = GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK;
+      _whenExhaustedAction = (byte) intProp(props, "whenExhaustedAction", value);
+      _testOnBorrow = booleanProp(props, "testOnBorrow", false);
+      _testOnReturn = booleanProp(props, "testOnReturn", false);
+      _timeBetweenEvictionRunsMillis = intProp(props, "timeBetweenEvictionRunsMillis", -1);
+      _minEvictableIdleTimeMillis = longProp(props, "minEvictableIdleTimeMillis", 1800000L);
+      _testWhileIdle = booleanProp(props, "testWhileIdle", false);
+      _minIdle = intProp(props, "minIdle", 0);
+      _lifo = booleanProp(props, "lifo", true);
+   }
+
+   private int intProp(Properties p, String name, int defaultValue) {
+      return (Integer) getValue(p, name, defaultValue);
+   }
+
+   private boolean booleanProp(Properties p, String name, Boolean defaultValue) {
+      return (Boolean) getValue(p, name, defaultValue);
+   }
+
+   private long longProp(Properties p, String name, long defaultValue) {
+      return (Long) getValue(p, name, defaultValue);
+   }
+
+   public Object getValue(Properties p, String name, Object defaultValue) {
+      Object propValue = p.get(name);
+      if (propValue == null) {
+         if (log.isTraceEnabled()) {
+            log.trace(name + " property not specified, using default value(" + defaultValue + ")");
+         }
+         return defaultValue;
+      } else {
+         log.trace(name + " = " + propValue);
+         if (defaultValue instanceof Integer) {
+            return Integer.parseInt(propValue.toString());
+         } else if (defaultValue instanceof Boolean) {
+            return Boolean.parseBoolean(propValue.toString());
+         } else if (defaultValue instanceof Long) {
+            return Long.parseLong(propValue.toString());
+         } else {
+            throw new IllegalStateException();
+         }
+      }
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,17 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface RequestBalancingStrategy {
+
+   void setServers(Collection<InetSocketAddress> servers);
+
+   InetSocketAddress nextServer();
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,30 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * // TODO: Document this
+ *
+ * todo - this can be called from several threads, synchronize!
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class RoundRobinBalancingStrategy implements RequestBalancingStrategy {
+
+   private InetSocketAddress[] servers;
+   private AtomicInteger index = new AtomicInteger();
+
+   @Override
+   public void setServers(Collection<InetSocketAddress> servers) {
+      this.servers = servers.toArray(new InetSocketAddress[servers.size()]);
+   }
+
+   @Override
+   public InetSocketAddress nextServer() {
+      int pos = index.incrementAndGet() % servers.length;
+      return servers[pos];
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,61 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class TcpConnectionFactory extends BaseKeyedPoolableObjectFactory {
+
+   private static Log log = LogFactory.getLog(TcpConnectionFactory.class);
+
+   @Override
+   public Object makeObject(Object key) throws Exception {
+      InetSocketAddress serverAddress = (InetSocketAddress) key;
+      if (log.isTraceEnabled()) {
+         log.trace("Creating connection to server: " + serverAddress);
+      }
+      try {
+         SocketChannel socketChannel = SocketChannel.open(serverAddress);
+         return socketChannel.socket();
+      } catch (IOException e) {
+         log.warn("Could not create connection to " + serverAddress, e);
+         throw e;
+      }
+   }
+
+   @Override
+   public boolean validateObject(Object key, Object obj) {
+      Socket socket = (Socket) obj;
+      if (log.isTraceEnabled()) {
+         log.trace("About to validate(ping) connection to server " + key + ". socket is " + socket);
+      }
+      //todo implement
+      return true;
+   }
+
+   @Override
+   public void destroyObject(Object key, Object obj) throws Exception {
+      Socket socket = (Socket) obj;
+      if (log.isTraceEnabled()) {
+         log.trace("About to destroy socket " + socket);
+      }
+      try {
+         socket.close();
+      } catch (IOException e) {
+         log.warn("Issues closing the socket: " + socket, e);
+      }
+   }
+}

Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java	                        (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,28 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface
+      TcpConnectionPool {
+
+   public void init(Properties props);
+
+   public void start(Collection<InetSocketAddress> servers);
+
+   public Socket getConnection();
+
+   public void releaseConnection(Socket socket);
+
+   public void updateServers(Collection<InetSocketAddress> newServers);
+
+   public void destroy();
+}

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -22,13 +22,14 @@
 
    private static Log log = LogFactory.getLog(TcpTransport.class);
 
-   private String host;
-   private int port;
    private Socket socket;
 
-   public void writeVInt(int length) {
+   public void writeVInt(int vInt) {
       try {
-         VHelper.writeVInt(length, socket.getOutputStream());
+         VHelper.writeVInt(vInt, socket.getOutputStream());
+         if (log.isTraceEnabled())
+            log.trace("VInt wrote " + vInt);
+         
       } catch (IOException e) {
          throw new TransportException(e);
       }
@@ -37,6 +38,9 @@
    public void writeVLong(long l) {
       try {
          VHelper.writeVLong(l, socket.getOutputStream());
+         if (log.isTraceEnabled())
+            log.trace("VLong wrote " + l);        
+
       } catch (IOException e) {
          throw new TransportException(e);
       }
@@ -44,7 +48,10 @@
 
    public long readVLong() {
       try {
-         return VHelper.readVLong(socket.getInputStream());
+         long result = VHelper.readVLong(socket.getInputStream());
+         if (log.isTraceEnabled())
+            log.trace("VLong read " + result);
+         return result;
       } catch (IOException e) {
          throw new TransportException(e);
       }
@@ -52,29 +59,24 @@
 
    public int readVInt() {
       try {
-         return VHelper.readVInt(socket.getInputStream());
+         int result = VHelper.readVInt(socket.getInputStream());
+         if (log.isTraceEnabled())
+            log.trace("VInt read " + result);
+         return result;
       } catch (IOException e) {
          throw new TransportException(e);
       }
    }
 
-   public TcpTransport(String host, int port) {
-      this.host = host;
-      this.port = port;
+   public TcpTransport(Socket socket) {
+      this.socket = socket;
    }
 
-   public void connect() {
-      try {
-         SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(host, port));
-         socket = socketChannel.socket();
-      } catch (IOException e) {
-         throw new TransportException("Problems establishing initial connection", e);
-      }
-   }
-
    protected void writeBytes(byte[] toAppend) {
       try {
          socket.getOutputStream().write(toAppend);
+         if (log.isTraceEnabled())
+            log.trace("Wrote " + toAppend.length + " bytes");
       } catch (IOException e) {
          throw new TransportException("Problems writing data to stream", e);
       }
@@ -84,6 +86,9 @@
    public void writeByte(short toWrite) {
       try {
          socket.getOutputStream().write(toWrite);
+         if (log.isTraceEnabled())
+            log.trace("Wrote byte " + toWrite);
+
       } catch (IOException e) {
          throw new TransportException("Problems writing data to stream", e);
       }
@@ -92,6 +97,9 @@
    public void flush() {
       try {
          socket.getOutputStream().flush();
+         if (log.isTraceEnabled())
+            log.trace("Flushed socket: " + socket);
+
       } catch (IOException e) {
          throw new TransportException(e);
       }
@@ -143,6 +151,13 @@
             if (offset > result.length) throw new IllegalStateException("Assertion!");
          }
       } while (!done);
+      if (log.isTraceEnabled()) {
+         log.trace("Successfully read array with size: " + size);
+      }
       return result;
    }
+
+   public Socket getSocket() {
+      return socket;
+   }
 }

Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-04-15 22:32:54 UTC (rev 1694)
@@ -3,9 +3,11 @@
 import org.infinispan.client.hotrod.impl.Transport;
 import org.infinispan.client.hotrod.impl.TransportFactory;
 import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
+import java.net.InetSocketAddress;
 import java.util.Properties;
 import java.util.StringTokenizer;
 
@@ -19,16 +21,36 @@
 
    private static Log log = LogFactory.getLog(TcpTransportFactory.class);
 
+   private TcpConnectionPool connectionPool;
+
    @Override
+   public void init(Properties props) {
+      super.init(props);
+      String tcpConnectionPool = props.getProperty(CONF_TCP_CONNECTION_POOL);
+      if (tcpConnectionPool == null) {
+         tcpConnectionPool = DefaultTcpConnectionPool.class.getName();
+         log.trace("No tcp connection pools specified, using the default: " + tcpConnectionPool);
+      }
+      connectionPool = (TcpConnectionPool) VHelper.newInstance(tcpConnectionPool);
+      connectionPool.init(props);
+      connectionPool.start(serverAddresses);
+   }
+
+   @Override
    public void destroy() {
-      // TODO: Customise this generated block
+      if (connectionPool != null) {
+         connectionPool.destroy();
+      }
    }
 
    @Override
    public Transport getTransport() {
-      log.info("Connecting to server on: " + serverHost + ":" + serverPort);
-      TcpTransport transport = new TcpTransport(serverHost, serverPort);
-      transport.connect();
-      return transport;
+      return new TcpTransport(connectionPool.getConnection());
    }
+
+   @Override
+   public void releaseTransport(Transport transport) {
+      TcpTransport tcpTransport = (TcpTransport) transport;
+      connectionPool.releaseConnection(tcpTransport.getSocket());
+   }
 }



More information about the infinispan-commits mailing list