[infinispan-commits] Infinispan SVN: r1694 - in trunk/client/hotrod-client: src/main/java/org/infinispan/client/hotrod and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Apr 15 18:32:54 EDT 2010
Author: mircea.markus
Date: 2010-04-15 18:32:54 -0400 (Thu, 15 Apr 2010)
New Revision: 1694
Added:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java
Modified:
trunk/client/hotrod-client/pom.xml
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
Log:
added client connection pooling
Modified: trunk/client/hotrod-client/pom.xml
===================================================================
--- trunk/client/hotrod-client/pom.xml 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/pom.xml 2010-04-15 22:32:54 UTC (rev 1694)
@@ -55,5 +55,11 @@
<version>${version.netty}</version>
</dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ <version>1.5.4</version>
+ </dependency>
+
</dependencies>
</project>
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -7,11 +7,11 @@
import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
import org.infinispan.client.hotrod.impl.SerializationMarshaller;
import org.infinispan.client.hotrod.impl.TransportFactory;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
import org.infinispan.lifecycle.Lifecycle;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.DefaultCacheManager;
-import org.infinispan.util.Util;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -138,7 +138,7 @@
factory = TcpTransportFactory.class.getName();
log.info("'transport-factory' factory not specified, using " + factory);
}
- transportFactory = (TransportFactory) newInstance(factory);
+ transportFactory = (TransportFactory) VHelper.newInstance(factory);
transportFactory.init(props);
hotrodMarshaller = props.getProperty("marshaller");
if (hotrodMarshaller == null) {
@@ -170,16 +170,8 @@
}
}
- private Object newInstance(String clazz) {
- try {
- return Util.getInstance(clazz);
- } catch (Exception e) {
- throw new HotRodClientException("Could not instantiate class: " + clazz, e);
- }
- }
-
private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName) {
- HotrodMarshaller marshaller = (HotrodMarshaller) newInstance(hotrodMarshaller);
+ HotrodMarshaller marshaller = (HotrodMarshaller) VHelper.newInstance(hotrodMarshaller);
HotrodOperations hotrodOperations = new HotrodOperationsImpl(cacheName, transportFactory);
return new RemoteCacheImpl<K, V>(hotrodOperations, marshaller, cacheName);
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -33,7 +33,7 @@
}
public byte[] get(byte[] key, Flag[] flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -49,7 +49,7 @@
}
public byte[] remove(byte[] key, Flag[] flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -64,7 +64,7 @@
}
public boolean containsKey(byte[] key, Flag... flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -79,7 +79,7 @@
}
public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -101,7 +101,7 @@
public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
if (status != NO_ERROR_STATUS) {
@@ -114,7 +114,7 @@
}
public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
@@ -127,7 +127,7 @@
}
public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
if (status == NO_ERROR_STATUS) {
@@ -148,7 +148,7 @@
* was sent, the response would be empty.
*/
public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
// 1) write header
long messageId = writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, flags);
@@ -169,7 +169,7 @@
* Request: [header][key length][key][entry_version]
*/
public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
// 1) write header
long messageId = writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, flags);
@@ -187,7 +187,7 @@
}
public void clear(Flag... flags) {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
// 1) write header
long messageId = writeHeader(transport, CLEAR_REQUEST, flags);
@@ -198,7 +198,7 @@
}
public Map<String, String> stats() {
- Transport transport = getTransport();
+ Transport transport = transportFactory.getTransport();
try {
// 1) write header
long messageId = writeHeader(transport, STATS_REQUEST);
@@ -217,15 +217,11 @@
}
}
- public Transport getTransport() {
- return transportFactory.getTransport();
- }
-
@Override
public boolean ping() {
Transport transport = null;
try {
- transport = getTransport();
+ transport = transportFactory.getTransport();
// 1) write header
long messageId = writeHeader(transport, PING_REQUEST);
short respStatus = readHeaderAndValidate(transport, messageId, HotrodConstants.PING_RESPONSE);
@@ -264,7 +260,6 @@
/*
* Magic | MessageId | Version | Opcode | CacheNameLength | CacheName | Flags | Client Intelligence | Topology Id
*/
-
private long writeHeader(Transport transport, short operationCode, Flag... flags) {
transport.writeByte(REQUEST_MAGIC);
long messageId = MSG_ID.incrementAndGet();
@@ -319,7 +314,7 @@
log.trace("Received operation code is: " + receivedOpCode);
}
short status = transport.readByte();
- transport.readByte(); //todo - this will be changed once we support smarter than basic clients
+ transport.readByte(); //todo - this is the topology change status, and it will be changed once we support smarter than basic clients
checkForErrorsInResponseStatus(status, messageId, transport);
return status;
}
@@ -382,7 +377,7 @@
private void releaseTransport(Transport transport) {
if (transport != null)
- transport.release();
+ transportFactory.releaseTransport(transport);
}
private VersionedOperationResponse returnVersionedOperationResponse(Transport transport, long messageId, byte response, Flag[] flags) {
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -12,7 +12,7 @@
public void writeByte(short toWrite);
- public void writeVInt(int length);
+ public void writeVInt(int vint);
public void writeVLong(long l);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/TransportFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -12,10 +12,14 @@
public static final String CONF_HOTROD_SERVERS = "hotrod-servers";
+ public static final String CONF_TCP_CONNECTION_POOL = "tcp-connection-pool";
+
public static final String OVERRIDE_HOTROD_SERVERS = "infinispan.hotrod-client.servers-default";
public Transport getTransport();
+ public void releaseTransport(Transport transport);
+
void init(Properties props);
void destroy();
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransportFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -4,7 +4,11 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
import java.util.Properties;
+import java.util.Set;
import java.util.StringTokenizer;
/**
@@ -17,8 +21,7 @@
private static Log log = LogFactory.getLog(AbstractTransportFactory.class);
- protected String serverHost;
- protected int serverPort;
+ protected Set<InetSocketAddress> serverAddresses = new HashSet<InetSocketAddress>();
public void init(Properties props) {
String servers = props.getProperty(CONF_HOTROD_SERVERS);
@@ -32,10 +35,16 @@
log.info("'hotrod-servers' property not specified in config, using " + servers);
}
StringTokenizer tokenizer = new StringTokenizer(servers, ";");
- String server = tokenizer.nextToken();
- String[] serverDef = tokenizeServer(server);
- serverHost = serverDef[0];
- serverPort = Integer.parseInt(serverDef[1]);
+ while (tokenizer.hasMoreTokens()) {
+ String server = tokenizer.nextToken();
+ String[] serverDef = tokenizeServer(server);
+ String serverHost = serverDef[0];
+ int serverPort = Integer.parseInt(serverDef[1]);
+ serverAddresses.add(new InetSocketAddress(serverHost, serverPort));
+ }
+ if (serverAddresses.isEmpty()) {
+ throw new IllegalStateException("No hot-rod servers specified!");
+ }
}
private String[] tokenizeServer(String server) {
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/VHelper.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -1,6 +1,8 @@
package org.infinispan.client.hotrod.impl.transport;
+import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.io.UnsignedNumeric;
+import org.infinispan.util.Util;
import java.io.IOException;
import java.io.InputStream;
@@ -45,4 +47,12 @@
throw new TransportException(e);
}
}
+
+ public static Object newInstance(String clazz) {
+ try {
+ return Util.getInstance(clazz);
+ } catch (Exception e) {
+ throw new HotRodClientException("Could not instantiate class: " + clazz, e);
+ }
+ }
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -23,7 +23,7 @@
@Override
public void init(Properties props) {
super.init(props);
- serverAddr = new InetSocketAddress(serverHost, serverPort);
+ serverAddr = super.serverAddresses.iterator().next();
}
@Override
@@ -36,4 +36,9 @@
public void destroy() {
// TODO: Customise this generated block
}
+
+ @Override
+ public void releaseTransport(Transport transport) {
+ transport.release();
+ }
}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/DefaultTcpConnectionPool.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,107 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
+import org.apache.commons.pool.impl.GenericObjectPoolFactory;
+import org.infinispan.client.hotrod.impl.transport.TransportException;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ *
+ * todo - all methods but init and start can be called from multiple threads, add proper sync
+ */
+public class DefaultTcpConnectionPool implements TcpConnectionPool {
+
+ private static Log log = LogFactory.getLog(DefaultTcpConnectionPool.class);
+
+ private GenericKeyedObjectPool connectionPool;
+ private PropsKeyedObjectPoolFactory poolFactory;
+ private RequestBalancingStrategy balancer;
+ private Collection<InetSocketAddress> servers;
+
+ @Override
+ public void init(Properties props) {
+ String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
+ balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
+ poolFactory = new PropsKeyedObjectPoolFactory(new TcpConnectionFactory(), props);
+ }
+
+ @Override
+ public void start(Collection<InetSocketAddress> servers) {
+ connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
+ balancer.setServers(servers);
+ this.servers = servers;
+ }
+
+ @Override
+ public Socket getConnection() {
+ InetSocketAddress server = balancer.nextServer();
+ try {
+ return (Socket) connectionPool.borrowObject(server);
+ } catch (Exception e) {
+ String message = "Could not fetch connection";
+ log.error(message, e);
+ throw new TransportException(message, e);
+ }
+ }
+
+ @Override
+ public void releaseConnection(Socket socket) {
+ SocketAddress remoteAddress = socket.getRemoteSocketAddress();
+ if (!servers.contains(remoteAddress)) throw new IllegalStateException(remoteAddress.toString());
+ try {
+ connectionPool.returnObject(remoteAddress, socket);
+ } catch (Exception e) {
+ log.warn("Could not release connection",e);
+ }
+ }
+
+ @Override
+ public void updateServers(Collection<InetSocketAddress> newServers) {
+ if (newServers.containsAll(servers) && servers.containsAll(newServers)) {
+ log.info("Same list of servers, not changing the pool");
+ return;
+ }
+ for (InetSocketAddress server : newServers) {
+ if (!servers.contains(server)) {
+ log.info("New server added(" + server + "), adding to the pool.");
+ try {
+ connectionPool.addObject(server);
+ } catch (Exception e) {
+ log.warn("Failed adding server " + server, e);
+ }
+ }
+ }
+ for (InetSocketAddress server : servers) {
+ if (!newServers.contains(server)) {
+ log.info("Server not in cluster anymore(" + server + "), removing from the pool.");
+ connectionPool.clear(server);
+ }
+ }
+ servers.clear();
+ servers.addAll(newServers);
+ }
+
+ @Override
+ public void destroy() {
+ connectionPool.clear();
+ try {
+ connectionPool.close();
+ } catch (Exception e) {
+ log.warn("Exception while shutting down the connection pool.", e);
+ }
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/PropsKeyedObjectPoolFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,70 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool.impl.GenericKeyedObjectPoolFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class PropsKeyedObjectPoolFactory extends GenericKeyedObjectPoolFactory {
+
+
+ private static Log log = LogFactory.getLog(PropsKeyedObjectPoolFactory.class);
+
+ public PropsKeyedObjectPoolFactory(KeyedPoolableObjectFactory factory, Properties props) {
+ super(factory);
+ _maxActive = intProp(props, "maxActive", 1);
+ _maxTotal = intProp(props, "maxTotal", -1);
+ _maxIdle = intProp(props, "maxIdle", 4);
+ int value = GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK;
+ _whenExhaustedAction = (byte) intProp(props, "whenExhaustedAction", value);
+ _testOnBorrow = booleanProp(props, "testOnBorrow", false);
+ _testOnReturn = booleanProp(props, "testOnReturn", false);
+ _timeBetweenEvictionRunsMillis = intProp(props, "timeBetweenEvictionRunsMillis", -1);
+ _minEvictableIdleTimeMillis = longProp(props, "minEvictableIdleTimeMillis", 1800000L);
+ _testWhileIdle = booleanProp(props, "testWhileIdle", false);
+ _minIdle = intProp(props, "minIdle", 0);
+ _lifo = booleanProp(props, "lifo", true);
+ }
+
+ private int intProp(Properties p, String name, int defaultValue) {
+ return (Integer) getValue(p, name, defaultValue);
+ }
+
+ private boolean booleanProp(Properties p, String name, Boolean defaultValue) {
+ return (Boolean) getValue(p, name, defaultValue);
+ }
+
+ private long longProp(Properties p, String name, long defaultValue) {
+ return (Long) getValue(p, name, defaultValue);
+ }
+
+ public Object getValue(Properties p, String name, Object defaultValue) {
+ Object propValue = p.get(name);
+ if (propValue == null) {
+ if (log.isTraceEnabled()) {
+ log.trace(name + " property not specified, using default value(" + defaultValue + ")");
+ }
+ return defaultValue;
+ } else {
+ log.trace(name + " = " + propValue);
+ if (defaultValue instanceof Integer) {
+ return Integer.parseInt(propValue.toString());
+ } else if (defaultValue instanceof Boolean) {
+ return Boolean.parseBoolean(propValue.toString());
+ } else if (defaultValue instanceof Long) {
+ return Long.parseLong(propValue.toString());
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RequestBalancingStrategy.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,17 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface RequestBalancingStrategy {
+
+ void setServers(Collection<InetSocketAddress> servers);
+
+ InetSocketAddress nextServer();
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,30 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * // TODO: Document this
+ *
+ * todo - this can be called from several threads, synchronize!
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class RoundRobinBalancingStrategy implements RequestBalancingStrategy {
+
+ private InetSocketAddress[] servers;
+ private AtomicInteger index = new AtomicInteger();
+
+ @Override
+ public void setServers(Collection<InetSocketAddress> servers) {
+ this.servers = servers.toArray(new InetSocketAddress[servers.size()]);
+ }
+
+ @Override
+ public InetSocketAddress nextServer() {
+ int pos = index.incrementAndGet() % servers.length;
+ return servers[pos];
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,61 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class TcpConnectionFactory extends BaseKeyedPoolableObjectFactory {
+
+ private static Log log = LogFactory.getLog(TcpConnectionFactory.class);
+
+ @Override
+ public Object makeObject(Object key) throws Exception {
+ InetSocketAddress serverAddress = (InetSocketAddress) key;
+ if (log.isTraceEnabled()) {
+ log.trace("Creating connection to server: " + serverAddress);
+ }
+ try {
+ SocketChannel socketChannel = SocketChannel.open(serverAddress);
+ return socketChannel.socket();
+ } catch (IOException e) {
+ log.warn("Could not create connection to " + serverAddress, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean validateObject(Object key, Object obj) {
+ Socket socket = (Socket) obj;
+ if (log.isTraceEnabled()) {
+ log.trace("About to validate(ping) connection to server " + key + ". socket is " + socket);
+ }
+ //todo implement
+ return true;
+ }
+
+ @Override
+ public void destroyObject(Object key, Object obj) throws Exception {
+ Socket socket = (Socket) obj;
+ if (log.isTraceEnabled()) {
+ log.trace("About to destroy socket " + socket);
+ }
+ try {
+ socket.close();
+ } catch (IOException e) {
+ log.warn("Issues closing the socket: " + socket, e);
+ }
+ }
+}
Added: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionPool.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -0,0 +1,28 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Properties;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public interface
+ TcpConnectionPool {
+
+ public void init(Properties props);
+
+ public void start(Collection<InetSocketAddress> servers);
+
+ public Socket getConnection();
+
+ public void releaseConnection(Socket socket);
+
+ public void updateServers(Collection<InetSocketAddress> newServers);
+
+ public void destroy();
+}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransport.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -22,13 +22,14 @@
private static Log log = LogFactory.getLog(TcpTransport.class);
- private String host;
- private int port;
private Socket socket;
- public void writeVInt(int length) {
+ public void writeVInt(int vInt) {
try {
- VHelper.writeVInt(length, socket.getOutputStream());
+ VHelper.writeVInt(vInt, socket.getOutputStream());
+ if (log.isTraceEnabled())
+ log.trace("VInt wrote " + vInt);
+
} catch (IOException e) {
throw new TransportException(e);
}
@@ -37,6 +38,9 @@
public void writeVLong(long l) {
try {
VHelper.writeVLong(l, socket.getOutputStream());
+ if (log.isTraceEnabled())
+ log.trace("VLong wrote " + l);
+
} catch (IOException e) {
throw new TransportException(e);
}
@@ -44,7 +48,10 @@
public long readVLong() {
try {
- return VHelper.readVLong(socket.getInputStream());
+ long result = VHelper.readVLong(socket.getInputStream());
+ if (log.isTraceEnabled())
+ log.trace("VLong read " + result);
+ return result;
} catch (IOException e) {
throw new TransportException(e);
}
@@ -52,29 +59,24 @@
public int readVInt() {
try {
- return VHelper.readVInt(socket.getInputStream());
+ int result = VHelper.readVInt(socket.getInputStream());
+ if (log.isTraceEnabled())
+ log.trace("VInt read " + result);
+ return result;
} catch (IOException e) {
throw new TransportException(e);
}
}
- public TcpTransport(String host, int port) {
- this.host = host;
- this.port = port;
+ public TcpTransport(Socket socket) {
+ this.socket = socket;
}
- public void connect() {
- try {
- SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(host, port));
- socket = socketChannel.socket();
- } catch (IOException e) {
- throw new TransportException("Problems establishing initial connection", e);
- }
- }
-
protected void writeBytes(byte[] toAppend) {
try {
socket.getOutputStream().write(toAppend);
+ if (log.isTraceEnabled())
+ log.trace("Wrote " + toAppend.length + " bytes");
} catch (IOException e) {
throw new TransportException("Problems writing data to stream", e);
}
@@ -84,6 +86,9 @@
public void writeByte(short toWrite) {
try {
socket.getOutputStream().write(toWrite);
+ if (log.isTraceEnabled())
+ log.trace("Wrote byte " + toWrite);
+
} catch (IOException e) {
throw new TransportException("Problems writing data to stream", e);
}
@@ -92,6 +97,9 @@
public void flush() {
try {
socket.getOutputStream().flush();
+ if (log.isTraceEnabled())
+ log.trace("Flushed socket: " + socket);
+
} catch (IOException e) {
throw new TransportException(e);
}
@@ -143,6 +151,13 @@
if (offset > result.length) throw new IllegalStateException("Assertion!");
}
} while (!done);
+ if (log.isTraceEnabled()) {
+ log.trace("Successfully read array with size: " + size);
+ }
return result;
}
+
+ public Socket getSocket() {
+ return socket;
+ }
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-04-15 14:48:24 UTC (rev 1693)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-04-15 22:32:54 UTC (rev 1694)
@@ -3,9 +3,11 @@
import org.infinispan.client.hotrod.impl.Transport;
import org.infinispan.client.hotrod.impl.TransportFactory;
import org.infinispan.client.hotrod.impl.transport.AbstractTransportFactory;
+import org.infinispan.client.hotrod.impl.transport.VHelper;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import java.net.InetSocketAddress;
import java.util.Properties;
import java.util.StringTokenizer;
@@ -19,16 +21,36 @@
private static Log log = LogFactory.getLog(TcpTransportFactory.class);
+ private TcpConnectionPool connectionPool;
+
@Override
+ public void init(Properties props) {
+ super.init(props);
+ String tcpConnectionPool = props.getProperty(CONF_TCP_CONNECTION_POOL);
+ if (tcpConnectionPool == null) {
+ tcpConnectionPool = DefaultTcpConnectionPool.class.getName();
+ log.trace("No tcp connection pools specified, using the default: " + tcpConnectionPool);
+ }
+ connectionPool = (TcpConnectionPool) VHelper.newInstance(tcpConnectionPool);
+ connectionPool.init(props);
+ connectionPool.start(serverAddresses);
+ }
+
+ @Override
public void destroy() {
- // TODO: Customise this generated block
+ if (connectionPool != null) {
+ connectionPool.destroy();
+ }
}
@Override
public Transport getTransport() {
- log.info("Connecting to server on: " + serverHost + ":" + serverPort);
- TcpTransport transport = new TcpTransport(serverHost, serverPort);
- transport.connect();
- return transport;
+ return new TcpTransport(connectionPool.getConnection());
}
+
+ @Override
+ public void releaseTransport(Transport transport) {
+ TcpTransport tcpTransport = (TcpTransport) transport;
+ connectionPool.releaseConnection(tcpTransport.getSocket());
+ }
}
More information about the infinispan-commits mailing list