[infinispan-commits] Infinispan SVN: r1703 - in trunk/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/transport/tcp and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Apr 16 18:32:45 EDT 2010
Author: mircea.markus
Date: 2010-04-16 18:32:44 -0400 (Fri, 16 Apr 2010)
New Revision: 1703
Added:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TestHelper.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
Removed:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStarter.java
Modified:
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
trunk/client/hotrod-client/src/main/resources/hotrod-client.properties
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java
trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java
trunk/client/hotrod-client/src/test/resources/log4j.xml
Log:
smarte client
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/AbstractTransport.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -30,11 +30,12 @@
@Override
public long readLong() {
+ //todo - optimize this not to create the longBytes on every call, but reuse it/cache it as class is NOT thread safe
byte[] longBytes = readByteArray(8);
long result = 0;
- for (int i = 0; i < 8; i++) {
+ for (byte longByte : longBytes) {
result <<= 8;
- result ^= (long) longBytes[i] & 0xFF;
+ result ^= (long) longByte & 0xFF;
}
return result;
}
@@ -48,6 +49,17 @@
writeBytes(b);
}
+ @Override
+ public int readUnsignedShort() {
+ byte[] shortBytes = readByteArray(2);
+ int result = 0;
+ for (byte longByte : shortBytes) {
+ result <<= 8;
+ result ^= (long) longByte & 0xFF;
+ }
+ return result;
+ }
+
public void writeArray(byte[] toAppend) {
writeVInt(toAppend.length);
writeBytes(toAppend);
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/HotrodOperationsImpl.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -8,8 +8,12 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -24,8 +28,9 @@
private final byte[] cacheNameBytes;
private static final AtomicLong MSG_ID = new AtomicLong();
+ private static final AtomicInteger TOPOLOGY_ID = new AtomicInteger();
private TransportFactory transportFactory;
- private byte clientIntelligence;
+ private byte clientIntelligence = CLIENT_INTELLIGENCE_TOPOLOGY_AWARE;
public HotrodOperationsImpl(String cacheName, TransportFactory transportFactory) {
cacheNameBytes = cacheName.getBytes(); //todo add charset here
@@ -276,7 +281,7 @@
}
transport.writeVInt(flagInt);
transport.writeByte(clientIntelligence);
- transport.writeVInt(0);//this will be changed once smarter clients are supported
+ transport.writeVInt(TOPOLOGY_ID.get());
if (log.isTraceEnabled()) {
log.trace("wrote header for message " + messageId + ". Operation code: " + operationCode + ". Flags: " + Integer.toHexString(flagInt));
}
@@ -314,8 +319,23 @@
log.trace("Received operation code is: " + receivedOpCode);
}
short status = transport.readByte();
- transport.readByte(); //todo - this is the topology change status, and it will be changed once we support smarter than basic clients
checkForErrorsInResponseStatus(status, messageId, transport);
+ short topologyChangeByte = transport.readByte();
+ if (topologyChangeByte == 1) {
+ int newTopology = transport.readVInt();
+ TOPOLOGY_ID.set(newTopology);
+ int clusterSize = transport.readVInt();
+ List<InetSocketAddress> hotRodServers = new ArrayList<InetSocketAddress>(clusterSize);
+ for (int i = 0; i < clusterSize; i++) {
+ String host = transport.readString();
+ int port = transport.readUnsignedShort();
+ hotRodServers.add(new InetSocketAddress(host, port));
+ }
+ if (log.isInfoEnabled()) {
+ log.info("Received topology change response. New cluster size = " + clusterSize +", new topology id = " + newTopology + ", new topology " + hotRodServers);
+ }
+ transportFactory.updateServers(hotRodServers);
+ }
return status;
}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/Transport.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -1,11 +1,14 @@
package org.infinispan.client.hotrod.impl;
+import net.jcip.annotations.NotThreadSafe;
+
/**
* // TODO: Document this
*
* @author mmarkus
* @since 4.1
*/
+ at NotThreadSafe
public interface Transport {
public void writeArray(byte[] toAppend);
@@ -38,4 +41,6 @@
long readLong();
void writeLong(long longValue);
+
+ int readUnsignedShort();
}
Deleted: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -1,44 +0,0 @@
-package org.infinispan.client.hotrod.impl.transport.tcp;
-
-import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-
-import java.net.InetSocketAddress;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class TcpConnectionFactory extends BaseKeyedPoolableObjectFactory {
-
- private static Log log = LogFactory.getLog(TcpConnectionFactory.class);
-
- @Override
- public Object makeObject(Object key) throws Exception {
- InetSocketAddress serverAddress = (InetSocketAddress) key;
- TcpTransport tcpTransport = new TcpTransport(serverAddress);
- if (log.isTraceEnabled()) {
- log.trace("Created tcp transport: " + tcpTransport);
- }
- return tcpTransport;
- }
-
- @Override
- public boolean validateObject(Object key, Object obj) {
- TcpTransport transport = (TcpTransport) obj;
- if (log.isTraceEnabled()) {
- log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
- }
- //todo implement
- return true;
- }
-
- @Override
- public void destroyObject(Object key, Object obj) throws Exception {
- TcpTransport transport = (TcpTransport) obj;
- transport.destroy();
- }
-}
Modified: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -39,7 +39,7 @@
servers = staticConfiguredServers;
String balancerClass = props.getProperty("requestBalancingStrategy", RoundRobinBalancingStrategy.class.getName());
balancer = (RequestBalancingStrategy) VHelper.newInstance(balancerClass);
- PropsKeyedObjectPoolFactory poolFactory = new PropsKeyedObjectPoolFactory(new TcpConnectionFactory(), props);
+ PropsKeyedObjectPoolFactory poolFactory = new PropsKeyedObjectPoolFactory(new TransportObjectFactory(), props);
connectionPool = (GenericKeyedObjectPool) poolFactory.createPool();
balancer.setServers(servers);
}
@@ -63,6 +63,8 @@
String message = "Could not fetch transport";
log.error(message, e);
throw new TransportException(message, e);
+ } finally {
+ logConnectionInfo(server);
}
}
@@ -73,6 +75,8 @@
connectionPool.returnObject(tcpTransport.getServerAddress(), tcpTransport);
} catch (Exception e) {
log.warn("Could not release connection: " + tcpTransport, e);
+ } finally {
+ logConnectionInfo(tcpTransport.getServerAddress());
}
}
@@ -83,19 +87,25 @@
addedServers.removeAll(servers);
Set<InetSocketAddress> failedServers = new HashSet<InetSocketAddress>(servers);
failedServers.removeAll(newServers);
- if (failedServers.isEmpty() || newServers.isEmpty()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Current list: " + servers);
+ log.trace("New list: " + newServers);
+ log.trace("Added servers: " + addedServers);
+ log.trace("Removed servers: " + failedServers);
+ }
+ if (failedServers.isEmpty() && newServers.isEmpty()) {
log.info("Same list of servers, not changing the pool");
return;
}
//1. first add new servers. For servers that went down, the returned transport will fail for now
for (InetSocketAddress server : newServers) {
+ log.info("New server added(" + server + "), adding to the pool.");
try {
connectionPool.addObject(server);
} catch (Exception e) {
log.warn("Failed adding new server " + server, e);
}
- log.info("New server added(" + server + "), adding to the pool.");
}
//2. now set the server list to the active list of servers. All the active servers (potentially together with some
@@ -111,7 +121,17 @@
}
servers.clear();
- servers.addAll(addedServers);
+ servers.addAll(newServers);
}
}
+
+ public Collection<InetSocketAddress> getServers() {
+ return servers;
+ }
+
+ private void logConnectionInfo(InetSocketAddress server) {
+ if (log.isTraceEnabled()) {
+ log.trace("For server " + server + ": active = " + connectionPool.getNumActive(server) + "; idle = " + connectionPool.getNumIdle(server));
+ }
+ }
}
Copied: trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java (from rev 1699, trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpConnectionFactory.java)
===================================================================
--- trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java (rev 0)
+++ trunk/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TransportObjectFactory.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -0,0 +1,60 @@
+package org.infinispan.client.hotrod.impl.transport.tcp;
+
+import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.net.InetSocketAddress;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class TransportObjectFactory extends BaseKeyedPoolableObjectFactory {
+
+ private static Log log = LogFactory.getLog(TransportObjectFactory.class);
+
+ @Override
+ public Object makeObject(Object key) throws Exception {
+ InetSocketAddress serverAddress = (InetSocketAddress) key;
+ TcpTransport tcpTransport = new TcpTransport(serverAddress);
+ if (log.isTraceEnabled()) {
+ log.trace("Created tcp transport: " + tcpTransport);
+ }
+ return tcpTransport;
+ }
+
+ @Override
+ public boolean validateObject(Object key, Object obj) {
+ TcpTransport transport = (TcpTransport) obj;
+ if (log.isTraceEnabled()) {
+ log.trace("About to validate(ping) connection to server " + key + ". TcpTransport is " + transport);
+ }
+ //todo implement
+ return true;
+ }
+
+ @Override
+ public void destroyObject(Object key, Object obj) throws Exception {
+ TcpTransport transport = (TcpTransport) obj;
+ transport.destroy();
+ }
+
+ @Override
+ public void activateObject(Object key, Object obj) throws Exception {
+ super.activateObject(key, obj);
+ if (log.isTraceEnabled()) {
+ log.trace("Fetching from pool:" + obj);
+ }
+ }
+
+ @Override
+ public void passivateObject(Object key, Object obj) throws Exception {
+ super.passivateObject(key, obj);
+ if (log.isTraceEnabled()) {
+ log.trace("Returning to pool:" + obj);
+ }
+ }
+}
Modified: trunk/client/hotrod-client/src/main/resources/hotrod-client.properties
===================================================================
--- trunk/client/hotrod-client/src/main/resources/hotrod-client.properties 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/main/resources/hotrod-client.properties 2010-04-16 22:32:44 UTC (rev 1703)
@@ -39,10 +39,8 @@
##Optionally, one may configure the pool to examine and possibly evict connections as they sit idle in the pool and to
#ensure that a minimum number of idle connections is maintained for each server. This is performed by an "idle object eviction" thread,
-#which runs asynchronously. Caution should be used when configuring this optional feature. Eviction runs require an
-#exclusive synchronization lock on the pool, so if they run too frequently and / or incur excessive latency when creating,
-#destroying or validating connection instances, performance issues may result. The idle connection eviction thread may be
-#configured using the following attributes:
+#which runs asynchronously. As of pool 1.5, the idle object evictor does not lock the pool throughout its execution.
+# The idle connection eviction thread may be configured using the following attributes:
#timeBetweenEvictionRunsMillis indicates how long the eviction thread should sleep before "runs" of examining idle connections. When non-positive,
#no eviction thread will be launched. The default setting for this parameter is -1 (i.e., by default, idle connections eviction is disabled).
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ClientConnectionPoolingTest.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -1,25 +1,248 @@
package org.infinispan.client.hotrod;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.infinispan.Cache;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.server.hotrod.HotRodServer;
import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.Test;
+import java.net.InetSocketAddress;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.testng.AssertJUnit.assertEquals;
+
/**
* // TODO: Document this
*
* @author Mircea.Markus at jboss.com
* @since 4.1
*/
+ at Test(testName = "client.hotrod.ClientConnectionPoolingTest")
public class ClientConnectionPoolingTest extends MultipleCacheManagersTest {
+ private static final Log log = LogFactory.getLog(ClientConnectionPoolingTest.class);
+ Cache c1;
+ Cache c2;
+ private HotRodServer hotRodServer1;
+ private HotRodServer hotRodServer2;
+
+ RemoteCache<String, String> remoteCache;
+ private RemoteCacheManager remoteCacheManager;
+ private GenericKeyedObjectPool connectionPool;
+ private InetSocketAddress hrServ1Addr;
+ private InetSocketAddress hrServ2Addr;
+
+
+ private WorkerThread workerThread1;
+ private WorkerThread workerThread2;
+ private WorkerThread workerThread3;
+ private WorkerThread workerThread4;
+ private WorkerThread workerThread5;
+ private WorkerThread workerThread6;
+
+
@Override
+ protected void assertSupportedConfig() {
+ return;
+ }
+
+ @Override
protected void createCacheManagers() throws Throwable {
- // TODO: Customise this generated block
+ c1 = TestCacheManagerFactory.createLocalCacheManager().getCache();
+ c2 = TestCacheManagerFactory.createLocalCacheManager().getCache();
+ registerCacheManager(c1.getCacheManager(), c2.getCacheManager());
+
+ hotRodServer1 = TestHelper.startHotRodServer(c1.getCacheManager());
+ hotRodServer2 = TestHelper.startHotRodServer(c2.getCacheManager());
+
+ String servers = TestHelper.getServersString(hotRodServer1, hotRodServer2);
+ Properties hotrodClientConf = new Properties();
+ hotrodClientConf.put(RemoteCacheManager.CONF_HOTROD_SERVERS, servers);
+ hotrodClientConf.put("maxActive", 2);
+ hotrodClientConf.put("maxTotal", 8);
+ hotrodClientConf.put("maxIdle", 6);
+ hotrodClientConf.put("whenExhaustedAction", 1);
+ hotrodClientConf.put("testOnBorrow", "false");
+ hotrodClientConf.put("testOnReturn", "false");
+ hotrodClientConf.put("timeBetweenEvictionRunsMillis", "-2");
+ hotrodClientConf.put("minEvictableIdleTimeMillis", "7");
+ hotrodClientConf.put("testWhileIdle", "true");
+ hotrodClientConf.put("minIdle", "-5");
+ hotrodClientConf.put("lifo", "true");
+
+ remoteCacheManager = new RemoteCacheManager(hotrodClientConf);
+ remoteCache = remoteCacheManager.getCache();
+
+ TcpTransportFactory tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+ connectionPool = (GenericKeyedObjectPool) TestingUtil.extractField(tcpConnectionFactory, "connectionPool");
+ workerThread1 = new WorkerThread(remoteCache);
+ workerThread2 = new WorkerThread(remoteCache);
+ workerThread3 = new WorkerThread(remoteCache);
+ workerThread4 = new WorkerThread(remoteCache);
+ workerThread5 = new WorkerThread(remoteCache);
+ workerThread6 = new WorkerThread(remoteCache);
+
+ hrServ1Addr = new InetSocketAddress("localhost", hotRodServer1.getPort());
+ hrServ2Addr = new InetSocketAddress("localhost", hotRodServer2.getPort());
}
- /**
- * What happens if a server goes down and after that, we try to create a connection to that server.
- */
- public void testServerGoesDown() {
+ @AfterTest(alwaysRun = true)
+ public void tearDown() {
+ hotRodServer1.stop();
+ hotRodServer2.stop();
+ workerThread1.stopThread();
+ workerThread2.stopThread();
+ workerThread3.stopThread();
+ workerThread4.stopThread();
+ workerThread5.stopThread();
+ workerThread6.stopThread();
+ }
+ public void testPropsCorrectlySet() {
+ assertEquals(2, connectionPool.getMaxActive());
+ assertEquals(8, connectionPool.getMaxTotal());
+ assertEquals(6, connectionPool.getMaxIdle());
+ assertEquals(1, connectionPool.getWhenExhaustedAction());
+ assertEquals(false, connectionPool.getTestOnBorrow());
+ assertEquals(false, connectionPool.getTestOnReturn());
+ assertEquals(-2, connectionPool.getTimeBetweenEvictionRunsMillis());
+ assertEquals(7, connectionPool.getMinEvictableIdleTimeMillis());
+ assertEquals(true, connectionPool.getTestWhileIdle());
+ assertEquals(-5, connectionPool.getMinIdle());
+ assertEquals(true, connectionPool.getLifo());
}
+
+ public void testMaxActiveReached() throws Exception {
+ workerThread1.put("k1", "v1");
+ workerThread1.put("k2", "v2");
+
+
+ assertEquals(1, c1.size());
+ assertEquals(1, c2.size());
+
+ assertEquals("v1", remoteCache.get("k1"));
+ assertEquals(1, c1.size());
+ assertEquals("v2", remoteCache.get("k2"));
+ assertEquals(1, c2.size());
+
+ assertEquals(1, connectionPool.getNumIdle(hrServ1Addr));
+ assertEquals(1, connectionPool.getNumIdle(hrServ2Addr));
+
+ DelayTransportInterceptor dt1 = new DelayTransportInterceptor(true);
+ DelayTransportInterceptor dt2 = new DelayTransportInterceptor(true);
+ c1.getAdvancedCache().addInterceptor(dt1, 0);
+ c2.getAdvancedCache().addInterceptor(dt2, 0);
+ log.info("Interceptors added");
+
+ workerThread1.putAsync("k3", "v3");
+ workerThread2.putAsync("k4", "v4");
+ log.info("Async calls for k3 and k4 is done.");
+ for (int i = 0; i < 10; i++) {
+ log.trace("Active for server " + hrServ1Addr + " are:" + connectionPool.getNumActive(hrServ1Addr));
+ log.trace("Active for server " + hrServ2Addr + " are:" + connectionPool.getNumActive(hrServ2Addr));
+ if (connectionPool.getNumActive(hrServ1Addr) == 1 && connectionPool.getNumActive(hrServ1Addr) == 1) break;
+ Thread.sleep(1000);
+ }
+ log.info("Connection pool is " + connectionPool);
+ assertEquals(1, connectionPool.getNumActive(hrServ1Addr));
+ assertEquals(1, connectionPool.getNumActive(hrServ2Addr));
+ assertEquals(0, connectionPool.getNumIdle(hrServ1Addr));
+ assertEquals(0, connectionPool.getNumIdle(hrServ2Addr));
+
+ workerThread3.putAsync("k5", "v5");
+ workerThread4.putAsync("k6", "v6");
+ for (int i = 0; i < 10; i++) {
+ log.trace("Active for server " + hrServ1Addr + " are:" + connectionPool.getNumActive(hrServ1Addr));
+ log.trace("Active for server " + hrServ2Addr + " are:" + connectionPool.getNumActive(hrServ2Addr));
+ if (connectionPool.getNumActive(hrServ1Addr) == 2 && connectionPool.getNumActive(hrServ1Addr) == 2) break;
+ Thread.sleep(1000);
+ }
+ assertEquals(0, connectionPool.getNumIdle(hrServ1Addr));
+ assertEquals(0, connectionPool.getNumIdle(hrServ2Addr));
+
+ workerThread5.putAsync("k7", "v7");
+ workerThread6.putAsync("k8", "v8");
+ assertEquals(2, connectionPool.getNumActive(hrServ1Addr));
+ assertEquals(2, connectionPool.getNumActive(hrServ2Addr));
+ assertEquals(0, connectionPool.getNumIdle(hrServ1Addr));
+ assertEquals(0, connectionPool.getNumIdle(hrServ2Addr));
+
+ //now allow
+ dt1.allow();
+ dt2.allow();
+
+ assertExistKeyValue("k3", "v3");
+ assertExistKeyValue("k4", "v4");
+ assertExistKeyValue("k5", "v5");
+ assertExistKeyValue("k6", "v6");
+ assertExistKeyValue("k7", "v7");
+ assertExistKeyValue("k8", "v8");
+
+ assertEquals(0, connectionPool.getNumActive(hrServ1Addr));
+ assertEquals(0, connectionPool.getNumActive(hrServ2Addr));
+
+ assertEquals(2, connectionPool.getNumIdle(hrServ1Addr));
+ assertEquals(2, connectionPool.getNumIdle(hrServ2Addr));
+
+
+ assertEquals(2, connectionPool.getNumIdle(hrServ1Addr));
+ assertEquals(2, connectionPool.getNumIdle(hrServ2Addr));
+ }
+
+ private void assertExistKeyValue(String key, String value) throws InterruptedException {
+ boolean exists = false;
+ for (int i = 0; i < 10; i++) {
+ exists = value.equals(remoteCache.get(key)) || value.equals(remoteCache.get(key));
+ if (exists) break;
+ Thread.sleep(1000);
+ }
+ assertEquals("key value not found: (" + key + ", " + value + ")", true, exists);
+ }
+
+ public static class DelayTransportInterceptor extends CommandInterceptor {
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ public DelayTransportInterceptor(boolean lock) {
+ if (lock)
+ block();
+ }
+
+ @Override
+ protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+ log.trace("Acquiring lock. " + lockInfo());
+ lock.lock();
+ try {
+ return super.handleDefault(ctx, command);
+ } finally {
+ log.trace("Done operation, releasing lock" + lockInfo());
+ lock.unlock();
+ }
+ }
+
+ private String lockInfo() {
+ return " Is locked? " + lock.isLocked() + ". Lock held by me? " + lock.isHeldByCurrentThread();
+ }
+
+ public void block() {
+ log.trace("block. " + lockInfo());
+ lock.lock();
+ }
+
+ public void allow() {
+ log.trace("allow." + lockInfo());
+ lock.unlock();
+ }
+ }
}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ForceReturnValueTest.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -26,7 +26,7 @@
protected CacheManager createCacheManager() throws Exception {
cacheManager = TestCacheManagerFactory.createLocalCacheManager();
cache = cacheManager.getCache();
- hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+ hotrodServer = TestHelper.startHotRodServer(cacheManager);
remoteCacheManager = new RemoteCacheManager("localhost",hotrodServer.getPort());
remoteCache = remoteCacheManager.getCache();
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodIntegrationTest.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -7,7 +7,6 @@
import org.infinispan.server.core.CacheValue;
import org.infinispan.server.hotrod.CacheKey;
import org.infinispan.server.hotrod.HotRodServer;
-import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log;
@@ -54,7 +53,7 @@
//pass the config file to the cache
- hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+ hotrodServer = TestHelper.startHotRodServer(cacheManager);
log.info("Started server on port: " + hotrodServer.getPort());
remoteCacheManager = getRemoteCacheManager();
Deleted: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStarter.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStarter.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStarter.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -1,25 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.manager.CacheManager;
-import org.infinispan.server.hotrod.HotRodServer;
-import org.infinispan.server.hotrod.test.HotRodTestingUtil;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * // TODO: Document this
- *
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-public class HotRodServerStarter {
-
- /**
- * This needs to be different than the one used in the server tests in order to make sure that there's no clash.
- */
- private static final AtomicInteger uniquePort = new AtomicInteger(11312);
-
- public static HotRodServer startHotRodServer(CacheManager cacheManager) {
- return HotRodTestingUtil.startHotRodServer(cacheManager, uniquePort.incrementAndGet());
- }
-}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotrodStatisticsTest.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -32,7 +32,7 @@
protected void setup() throws Exception {
cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(getClass().getSimpleName());
- hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+ hotrodServer = TestHelper.startHotRodServer(cacheManager);
startTime = System.currentTimeMillis();
rcm = new RemoteCacheManager("localhost", hotrodServer.getPort());
remoteCache = rcm.getCache();
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RemoteCacheManagerTest.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -26,7 +26,7 @@
@Override
protected CacheManager createCacheManager() throws Exception {
cacheManager = TestCacheManagerFactory.createLocalCacheManager();
- hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+ hotrodServer = TestHelper.startHotRodServer(cacheManager);
prevValue = System.setProperty(RemoteCacheManager.OVERRIDE_HOTROD_SERVERS, "localhost:" + hotrodServer.getPort());
return cacheManager;
}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -33,13 +33,15 @@
Cache c1;
Cache c2;
Cache c3;
- private HotRodServer hotRodServer1;
- private HotRodServer hotRodServer2;
- private HotRodServer hotRodServer3;
+ Cache c4;
+ HotRodServer hotRodServer1;
+ HotRodServer hotRodServer2;
+
+ HotRodServer hotRodServer3;
+ HotRodServer hotRodServer4;
+
RemoteCache<String, String> remoteCache;
- private Cache c4;
- private HotRodServer hotRodServer4;
@Override
protected void assertSupportedConfig() {
@@ -53,14 +55,14 @@
c3 = TestCacheManagerFactory.createLocalCacheManager().getCache();
registerCacheManager(c1.getCacheManager(), c2.getCacheManager(), c3.getCacheManager());
- hotRodServer1 = HotRodServerStarter.startHotRodServer(c1.getCacheManager());
- hotRodServer2 = HotRodServerStarter.startHotRodServer(c2.getCacheManager());
- hotRodServer3 = HotRodServerStarter.startHotRodServer(c3.getCacheManager());
+ hotRodServer1 = TestHelper.startHotRodServer(c1.getCacheManager());
+ hotRodServer2 = TestHelper.startHotRodServer(c2.getCacheManager());
+ hotRodServer3 = TestHelper.startHotRodServer(c3.getCacheManager());
log.trace("Server 1 port: " + hotRodServer1.getPort());
log.trace("Server 2 port: " + hotRodServer2.getPort());
log.trace("Server 3 port: " + hotRodServer3.getPort());
- String servers = getServersString(hotRodServer1, hotRodServer2, hotRodServer3);
+ String servers = TestHelper.getServersString(hotRodServer1, hotRodServer2, hotRodServer3);
log.trace("Server list is: " + servers);
RemoteCacheManager remoteCacheManager = new RemoteCacheManager(servers);
remoteCache = remoteCacheManager.getCache();
@@ -72,12 +74,6 @@
hotRodServer2.stop();
hotRodServer3.stop();
hotRodServer4.stop();
-
-
- c1.getCacheManager().stop();
- c2.getCacheManager().stop();
- c3.getCacheManager().stop();
- c4.getCacheManager().stop();
}
public void testRoundRubinLoadBalancing() {
@@ -108,7 +104,7 @@
@Test(dependsOnMethods = "testRoundRubinLoadBalancing")
public void testAddNewHotrodServer() {
c4 = TestCacheManagerFactory.createLocalCacheManager().getCache();
- hotRodServer4 = HotRodServerStarter.startHotRodServer(c4.getCacheManager());
+ hotRodServer4 = TestHelper.startHotRodServer(c4.getCacheManager());
registerCacheManager(c4.getCacheManager());
List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
@@ -225,11 +221,4 @@
return (RoundRobinBalancingStrategy) TestingUtil.extractField(transportFactory, "balancer");
}
- private String getServersString(HotRodServer... servers) {
- StringBuilder builder = new StringBuilder();
- for (HotRodServer server : servers) {
- builder.append("localhost").append(':').append(server.getPort()).append(";");
- }
- return builder.toString();
- }
}
Modified: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ServerShutdownTest.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -5,8 +5,6 @@
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.Test;
-import java.util.Properties;
-
import static junit.framework.Assert.assertEquals;
/**
@@ -20,7 +18,7 @@
public void testServerShutdownWithConnectedClient() {
CacheManager cacheManager = TestCacheManagerFactory.createLocalCacheManager();
- HotRodServer hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+ HotRodServer hotrodServer = TestHelper.startHotRodServer(cacheManager);
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("localhost", hotrodServer.getPort());
RemoteCache remoteCache = remoteCacheManager.getCache();
@@ -34,7 +32,7 @@
public void testServerShutdownWithoutConnectedClient() {
CacheManager cacheManager = TestCacheManagerFactory.createLocalCacheManager();
- HotRodServer hotrodServer = HotRodServerStarter.startHotRodServer(cacheManager);
+ HotRodServer hotrodServer = TestHelper.startHotRodServer(cacheManager);
RemoteCacheManager remoteCacheManager = new RemoteCacheManager("localhost", hotrodServer.getPort());
RemoteCache remoteCache = remoteCacheManager.getCache();
Copied: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TestHelper.java (from rev 1701, trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HotRodServerStarter.java)
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TestHelper.java (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TestHelper.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -0,0 +1,33 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.manager.CacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.server.hotrod.test.HotRodTestingUtil;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class TestHelper {
+
+ /**
+ * This needs to be different than the one used in the server tests in order to make sure that there's no clash.
+ */
+ private static final AtomicInteger uniquePort = new AtomicInteger(11312);
+
+ public static HotRodServer startHotRodServer(CacheManager cacheManager) {
+ return HotRodTestingUtil.startHotRodServer(cacheManager, uniquePort.incrementAndGet());
+ }
+
+ public static String getServersString(HotRodServer... servers) {
+ StringBuilder builder = new StringBuilder();
+ for (HotRodServer server : servers) {
+ builder.append("localhost").append(':').append(server.getPort()).append(";");
+ }
+ return builder.toString();
+ }
+}
Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/TopologyChangeTest.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -0,0 +1,88 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test (testName = "client.hotrod.MultipleCacheManagersTest", groups = "functional")
+public class TopologyChangeTest extends MultipleCacheManagersTest {
+
+ HotRodServer hotRodServer1;
+ HotRodServer hotRodServer2;
+ HotRodServer hotRodServer3;
+
+ RemoteCache remoteCache;
+ private RemoteCacheManager remoteCacheManager;
+ private TcpTransportFactory tcpConnectionFactory;
+
+ @Override
+ protected void assertSupportedConfig() {
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ addClusterEnabledCacheManager();
+ addClusterEnabledCacheManager();
+ hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+ hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+
+ //Important: this only connects to one of the two servers!
+ remoteCacheManager = new RemoteCacheManager("localhost", hotRodServer2.getPort());
+ remoteCache = remoteCacheManager.getCache();
+
+ tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+ }
+
+
+ public void testTwoMembers() {
+ InetSocketAddress server1Address = new InetSocketAddress("localhost", hotRodServer1.getPort());
+ expectTopologyChange(server1Address, true);
+ assertEquals(2, tcpConnectionFactory.getServers().size());
+ }
+
+ @Test(dependsOnMethods = "testTwoMembers")
+ public void testAddNewServer() {
+ addClusterEnabledCacheManager();
+ manager(2).getCache();
+ TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
+ hotRodServer3 = TestHelper.startHotRodServer(manager(2));
+ expectTopologyChange(new InetSocketAddress("localhost",hotRodServer3.getPort()), true);
+ assertEquals(3, tcpConnectionFactory.getServers().size());
+ }
+
+ @Test(dependsOnMethods = "testAddNewServer")
+ public void testDropServer() {
+ manager(2).stop();
+ TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1));
+ InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
+ hotRodServer3.stop();
+ expectTopologyChange(server3Address, false);
+ assertEquals(2, tcpConnectionFactory.getServers().size());
+ }
+
+ private void expectTopologyChange(InetSocketAddress server1Address, boolean added) {
+ for (int i = 0; i < 10; i++) {
+ try {
+ remoteCache.put("k" + i, "v" + i);
+ } catch (Exception e) {
+ if (added) {
+ throw new IllegalStateException(e);
+ } //else it is acceptable, as the transport hasn't changed
+ }
+ if (added == tcpConnectionFactory.getServers().contains(server1Address)) break;
+ }
+ assertEquals(server1Address + " not found", added, tcpConnectionFactory.getServers().contains(server1Address));
+ }
+}
Added: trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java
===================================================================
--- trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java (rev 0)
+++ trunk/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/WorkerThread.java 2010-04-16 22:32:44 UTC (rev 1703)
@@ -0,0 +1,115 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public class WorkerThread extends Thread {
+
+ private static final AtomicInteger WORKER_INDEX = new AtomicInteger();
+
+ private static Log log = LogFactory.getLog(WorkerThread.class);
+
+ public static final String NULL = "_null_";
+ public static final String PUT_SYNC = "_put_sync_";
+ public static final String PUT_ASYNC = "_put_async_";
+
+ final RemoteCache<String, String> remoteCache;
+ final BlockingQueue send = new ArrayBlockingQueue(1);
+ final BlockingQueue receive = new ArrayBlockingQueue(1);
+
+ volatile String key;
+ volatile String value;
+
+ public WorkerThread(RemoteCache<String, String> remoteCache) {
+ super("WorkerThread-" + WORKER_INDEX.getAndDecrement());
+ this.remoteCache = remoteCache;
+ start();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Object o = send.take();
+ trace("Took from queue: " + o);
+ if (o instanceof Integer) {
+ receive.put(1);
+ trace("exiting!");
+ return;
+ }
+ Object result = remoteCache.put(key, value);
+ trace("Added to the cache (" + key + "," + value + ") and returning " + result);
+ if (PUT_SYNC.equals(o)) {
+ receive.put(result == null ? NULL : result);
+ trace("Que now has: " + receive.peek());
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ /**
+ * Only returns when this thread added the given key value.
+ */
+ public String put(String key, String value) {
+ this.key = key;
+ this.value = value;
+ try {
+ trace("::put::send contains: " + send.peek());
+ send.put(PUT_SYNC);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ try {
+ String result = (String) receive.take();
+ trace("::put::took out of receive: " + result);
+ return result == NULL ? null : result;
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Only returns when this thread added the given key value.
+ */
+ public void putAsync(String key, String value) {
+ this.key = key;
+ this.value = value;
+ try {
+ trace("::put::send contains: " + send.peek());
+ send.put(PUT_ASYNC);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Only returns when this thread is stopped.
+ */
+ public void stopThread() {
+ try {
+ send.put(new Integer(1));
+ Object o = receive.take();
+ trace("::stopThread::took out of receive: " + o);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void trace(String message) {
+ log.trace("Worker: " + message);
+ }
+}
Modified: trunk/client/hotrod-client/src/test/resources/log4j.xml
===================================================================
--- trunk/client/hotrod-client/src/test/resources/log4j.xml 2010-04-16 14:31:31 UTC (rev 1702)
+++ trunk/client/hotrod-client/src/test/resources/log4j.xml 2010-04-16 22:32:44 UTC (rev 1703)
@@ -56,6 +56,10 @@
<priority value="TRACE"/>
</category>
+ <category name="org.infinispan.client.hotrod.impl.transport.tcp.TcpTransport">
+ <priority value="WARN"/>
+ </category>
+
<category name="com.mchange">
<priority value="TRACE"/>
</category>
More information about the infinispan-commits
mailing list