[infinispan-commits] Infinispan SVN: r1920 - in branches/4.1.x/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/transport and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Jun 16 21:46:02 EDT 2010
Author: mircea.markus
Date: 2010-06-16 21:46:01 -0400 (Wed, 16 Jun 2010)
New Revision: 1920
Added:
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChangeTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java
Removed:
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java
Modified:
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java
Log:
ISPN-488 - implemented for replication
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -2,9 +2,9 @@
import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
-import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
import org.infinispan.client.hotrod.impl.transport.Transport;
import org.infinispan.client.hotrod.impl.transport.TransportFactory;
import org.infinispan.util.logging.Log;
@@ -34,114 +34,147 @@
}
public byte[] get(byte[] key, Flag[] flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ try {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
+ }
+ if (status == NO_ERROR_STATUS) {
+ return transport.readArray();
+ }
+ } finally {
+ releaseTransport(transport);
+ }
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
}
- if (status == NO_ERROR_STATUS) {
- return transport.readArray();
- }
- } finally {
- releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
public byte[] remove(byte[] key, Flag[] flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- } else if (status == NO_ERROR_STATUS) {
- return returnPossiblePrevValue(transport, flags);
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
+ } else if (status == NO_ERROR_STATUS) {
+ return returnPossiblePrevValue(transport, flags);
+ }
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
+ }finally {
+ releaseTransport(transport);
}
- } finally {
- releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
public boolean containsKey(byte[] key, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return false;
- } else if (status == NO_ERROR_STATUS) {
- return true;
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return false;
+ } else if (status == NO_ERROR_STATUS) {
+ return true;
+ }
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
}
- } finally {
- releaseTransport(transport);
+ finally {
+ releaseTransport(transport);
+ }
}
throw new IllegalStateException("We should not reach here!");
}
public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
- if (status == KEY_DOES_NOT_EXIST_STATUS) {
- return null;
- }
- if (status == NO_ERROR_STATUS) {
- long version = transport.readLong();
- if (log.isTraceEnabled()) {
- log.trace("Received version: " + version);
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
+ if (status == KEY_DOES_NOT_EXIST_STATUS) {
+ return null;
}
- byte[] value = transport.readArray();
- return new BinaryVersionedValue(version, value);
+ if (status == NO_ERROR_STATUS) {
+ long version = transport.readLong();
+ if (log.isTraceEnabled()) {
+ log.trace("Received version: " + version);
+ }
+ byte[] value = transport.readArray();
+ return new BinaryVersionedValue(version, value);
+ }
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
+ } finally {
+ releaseTransport(transport);
}
- } finally {
- releaseTransport(transport);
}
throw new IllegalStateException("We should not reach here!");
}
public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
- if (status != NO_ERROR_STATUS) {
- throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
+ if (status != NO_ERROR_STATUS) {
+ throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
+ }
+ return returnPossiblePrevValue(transport, flags);
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
+ }finally {
+ releaseTransport(transport);
}
- return returnPossiblePrevValue(transport, flags);
- } finally {
- releaseTransport(transport);
}
+ throw new IllegalStateException("This should not be reached!");
}
public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
- if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
- byte[] bytes = returnPossiblePrevValue(transport, flags);
- if (log.isTraceEnabled()) {
- log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
+ if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+ byte[] bytes = returnPossiblePrevValue(transport, flags);
+ if (log.isTraceEnabled()) {
+ log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
+ }
+ return bytes;
}
- return bytes;
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
}
- } finally {
- releaseTransport(transport);
+ finally {
+ releaseTransport(transport);
+ }
}
throw new IllegalStateException("We should not reach here!");
}
public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
- if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
- return returnPossiblePrevValue(transport, flags);
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
+ if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+ return returnPossiblePrevValue(transport, flags);
+ }
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
+ } finally {
+ releaseTransport(transport);
}
- } finally {
- releaseTransport(transport);
}
- throw new IllegalStateException("We should not reach here!");
+ throw new IllegalStateException(" should not reach here!");
}
/**
@@ -151,52 +184,68 @@
* was sent, the response would be empty.
*/
public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
- //2) write message body
- transport.writeArray(key);
- transport.writeVInt(lifespan);
- transport.writeVInt(maxIdle);
- transport.writeLong(version);
- transport.writeArray(value);
- return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
- } finally {
- releaseTransport(transport);
+ //2) write message body
+ transport.writeArray(key);
+ transport.writeVInt(lifespan);
+ transport.writeVInt(maxIdle);
+ transport.writeLong(version);
+ transport.writeArray(value);
+ return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
+ }
+ finally {
+ releaseTransport(transport);
+ }
}
+ throw new IllegalStateException(" should not reach here!");
}
/**
* Request: [header][key length][key][entry_version]
*/
public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
- Transport transport = transportFactory.getTransport(key);
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport(key);
+ try {
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
- //2) write message body
- transport.writeArray(key);
- transport.writeLong(version);
+ //2) write message body
+ transport.writeArray(key);
+ transport.writeLong(version);
- //process response and return
- return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
+ //process response and return
+ return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
- } finally {
- releaseTransport(transport);
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
+ }
+ finally {
+ releaseTransport(transport);
+ }
}
+ throw new IllegalStateException("Should not reach this point!");
}
public void clear(Flag... flags) {
- Transport transport = transportFactory.getTransport();
- try {
- // 1) write header
- long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
- HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
- } finally {
- releaseTransport(transport);
+ for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+ Transport transport = transportFactory.getTransport();
+ try {
+ // 1) write header
+ long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
+ HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
+ } catch (TransportException te) {
+ logErrorAndThrowExceptionIfNeeded(i, te);
+ } finally {
+ releaseTransport(transport);
+ }
}
}
@@ -295,4 +344,9 @@
byte[] prevValue = returnPossiblePrevValue(transport, flags);
return new VersionedOperationResponse(prevValue, code);
}
+
+ private void logErrorAndThrowExceptionIfNeeded(int i, TransportException te) {
+ log.warn("Transport exception. Retry " + i + " out of " + transportFactory.getTransportCount(), te);
+ if (i == transportFactory.getTransportCount() - 1) throw te;
+ }
}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -32,4 +32,6 @@
Transport getTransport(byte[] key);
boolean isTcpNoDelay();
+
+ int getTransportCount();
}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -66,4 +66,9 @@
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
+
+ @Override
+ public int getTransportCount() {
+ return 1;
+ }
}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -51,14 +51,34 @@
public InetSocketAddress nextServer() {
readLock.lock();
try {
- int pos = index.getAndIncrement() % servers.length;
- InetSocketAddress server = servers[pos];
- if (log.isTraceEnabled()) {
- log.trace("Returning server: " + server);
- }
+ InetSocketAddress server = getServerByIndex(index.getAndIncrement());
return server;
} finally {
readLock.unlock();
}
}
+
+ /**
+ * Returns same value as {@link #nextServer()} without modifying indexes/state.
+ */
+ public InetSocketAddress dryRunNextServer() {
+ return getServerByIndex(index.get());
+ }
+
+ private InetSocketAddress getServerByIndex(int val) {
+ int pos = val % servers.length;
+ InetSocketAddress server = servers[pos];
+ if (log.isTraceEnabled()) {
+ log.trace("Returning server: " + server);
+ }
+ return server;
+ }
+
+ public InetSocketAddress[] getServers() {
+ return servers;
+ }
+
+ public int getNextPosition() {
+ return index.get() % servers.length;
+ }
}
Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -181,4 +181,17 @@
public boolean isTcpNoDelay() {
return tcpNoDelay;
}
+
+ @Override
+ public int getTransportCount() {
+ if (connectionPool.getMaxActive() > 0) {
+ return connectionPool.getMaxActive() * servers.size();
+ } else {
+ return 10 * servers.size();
+ }
+ }
+
+ public RequestBalancingStrategy getBalancer() {
+ return balancer;
+ }
}
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -6,13 +6,9 @@
import org.infinispan.config.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.distribution.DistributionManager;
-import org.infinispan.interceptors.CacheMgmtInterceptor;
-import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.manager.CacheContainer;
-import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.server.hotrod.HotRodServer;
-import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.ByteArrayKey;
import org.infinispan.util.logging.Log;
@@ -25,9 +21,7 @@
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.Random;
@@ -39,7 +33,7 @@
* @since 4.1
*/
@Test(groups = "functional", testName = "client.hotrod.CSAIntegrationTest")
-public class CSAIntegrationTest extends MultipleCacheManagersTest {
+public class CSAIntegrationTest extends HitsAwareCacheManagersTest {
private HotRodServer hotRodServer1;
private HotRodServer hotRodServer2;
@@ -47,8 +41,6 @@
private RemoteCacheManager remoteCacheManager;
private RemoteCache<Object, Object> remoteCache;
private TcpTransportFactory tcpConnectionFactory;
- private static final String CACHE_NAME = "distributedCache";
- Map<InetSocketAddress, CacheContainer> hrServ2CacheManager = new HashMap<InetSocketAddress, CacheContainer>();
private static Log log = LogFactory.getLog(CSAIntegrationTest.class);
@@ -62,12 +54,9 @@
Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
config.setNumOwners(1);
config.setUnsafeUnreliableReturnValues(true);
- EmbeddedCacheManager cm1 = addClusterEnabledCacheManager();
- EmbeddedCacheManager cm2 = addClusterEnabledCacheManager();
- EmbeddedCacheManager cm3 = addClusterEnabledCacheManager();
- cm1.defineConfiguration(CACHE_NAME, config);
- cm2.defineConfiguration(CACHE_NAME, config);
- cm3.defineConfiguration(CACHE_NAME, config);
+ addClusterEnabledCacheManager(config);
+ addClusterEnabledCacheManager(config);
+ addClusterEnabledCacheManager(config);
hotRodServer1 = TestHelper.startHotRodServer(manager(0));
hrServ2CacheManager.put(new InetSocketAddress(hotRodServer1.getHost(), hotRodServer1.getPort()), manager(0));
@@ -76,19 +65,19 @@
hotRodServer3 = TestHelper.startHotRodServer(manager(2));
hrServ2CacheManager.put(new InetSocketAddress(hotRodServer3.getHost(), hotRodServer3.getPort()), manager(2));
- assert manager(0).getCache(CACHE_NAME) != null;
- assert manager(1).getCache(CACHE_NAME) != null;
- assert manager(2).getCache(CACHE_NAME) != null;
+ assert manager(0).getCache() != null;
+ assert manager(1).getCache() != null;
+ assert manager(2).getCache() != null;
TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
- manager(0).getCache(CACHE_NAME).put("k", "v");
- manager(0).getCache(CACHE_NAME).get("k").equals("v");
- manager(1).getCache(CACHE_NAME).get("k").equals("v");
- manager(2).getCache(CACHE_NAME).get("k").equals("v");
+ manager(0).getCache().put("k", "v");
+ manager(0).getCache().get("k").equals("v");
+ manager(1).getCache().get("k").equals("v");
+ manager(2).getCache().get("k").equals("v");
log.info("Local replication test passed!");
@@ -96,7 +85,7 @@
Properties props = new Properties();
props.put("hotrod-servers", "localhost:" + hotRodServer2.getPort() + ";localhost:" + hotRodServer2.getPort());
remoteCacheManager = new RemoteCacheManager(props);
- remoteCache = remoteCacheManager.getCache(CACHE_NAME);
+ remoteCache = remoteCacheManager.getCache();
tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
}
@@ -121,18 +110,13 @@
@Test(dependsOnMethods = "testCorrectSetup")
public void testHashFunctionReturnsSameValues() {
- Map<InetSocketAddress, CacheContainer> add2Cm = new HashMap<InetSocketAddress, CacheContainer>();
- add2Cm.put(new InetSocketAddress(hotRodServer1.getHost(), hotRodServer1.getPort()), manager(0));
- add2Cm.put(new InetSocketAddress(hotRodServer2.getHost(), hotRodServer2.getPort()), manager(1));
- add2Cm.put(new InetSocketAddress(hotRodServer3.getHost(), hotRodServer3.getPort()), manager(2));
-
for (int i = 0; i < 1000; i++) {
byte[] key = generateKey(i);
TcpTransport transport = (TcpTransport) tcpConnectionFactory.getTransport(key);
InetSocketAddress serverAddress = transport.getServerAddress();
- CacheContainer cacheContainer = add2Cm.get(serverAddress);
- assertNotNull("For server address " + serverAddress + " found " + cacheContainer + ". Map is: " + add2Cm, cacheContainer);
- DistributionManager distributionManager = cacheContainer.getCache(CACHE_NAME).getAdvancedCache().getDistributionManager();
+ CacheContainer cacheContainer = hrServ2CacheManager.get(serverAddress);
+ assertNotNull("For server address " + serverAddress + " found " + cacheContainer + ". Map is: " + hrServ2CacheManager, cacheContainer);
+ DistributionManager distributionManager = cacheContainer.getCache().getAdvancedCache().getDistributionManager();
assert distributionManager.isLocal(key);
tcpConnectionFactory.releaseTransport(transport);
}
@@ -140,11 +124,7 @@
@Test(dependsOnMethods = "testHashFunctionReturnsSameValues")
public void testRequestsGoToExpectedServer() throws Exception {
-
- addCacheMgmtInterceptor(manager(0).getCache(CACHE_NAME));
- addCacheMgmtInterceptor(manager(1).getCache(CACHE_NAME));
- addCacheMgmtInterceptor(manager(2).getCache(CACHE_NAME));
-
+ addInterceptors();
List<byte[]> keys = new ArrayList<byte[]>();
for (int i = 0; i < 500; i++) {
byte[] key = generateKey(i);
@@ -157,8 +137,6 @@
tcpConnectionFactory.releaseTransport(transport);
}
- assertMisses(false);
-
log.info("Right before first get.");
for (byte[] key : keys) {
@@ -169,36 +147,12 @@
TcpTransport transport = (TcpTransport) tcpConnectionFactory.getTransport(keyBytes);
assertOnlyServerHit(transport.getServerAddress());
tcpConnectionFactory.releaseTransport(transport);
- assertMisses(false);
}
- assertMisses(false);
-
- remoteCache.get("noSuchKey");
- assertMisses(true);
}
- private void assertOnlyServerHit(InetSocketAddress serverAddress) {
- CacheContainer cacheContainer = hrServ2CacheManager.get(serverAddress);
- CacheMgmtInterceptor interceptor = getCacheMgmtInterceptor(cacheContainer.getCache(CACHE_NAME));
- assert interceptor.getHits() == 1;
- for (CacheContainer cm : hrServ2CacheManager.values()) {
- if (cm != cacheContainer) {
- interceptor = getCacheMgmtInterceptor(cm.getCache(CACHE_NAME));
- assert interceptor.getHits() == 0;
- }
- }
- }
-
- private void resetStats() {
- for (int i = 0; i< 3; i++) {
- CacheMgmtInterceptor cmi = getCacheMgmtInterceptor(manager(i).getCache(CACHE_NAME));
- cmi.resetStatistics();
- }
- }
-
private void assertCacheContainsKey(InetSocketAddress serverAddress, byte[] keyBytes) {
CacheContainer cacheContainer = hrServ2CacheManager.get(serverAddress);
- Cache<Object, Object> cache = cacheContainer.getCache(CACHE_NAME);
+ Cache<Object, Object> cache = cacheContainer.getCache();
DataContainer dataContainer = cache.getAdvancedCache().getDataContainer();
assert dataContainer.keySet().contains(new ByteArrayKey(keyBytes));
}
@@ -210,39 +164,6 @@
return byteArrayOutputStream.toByteArray();
}
- private void addCacheMgmtInterceptor(Cache<Object, Object> cache) {
- CacheMgmtInterceptor interceptor = new CacheMgmtInterceptor();
- cache.getAdvancedCache().addInterceptor(interceptor, 1);
- }
-
- private void assertMisses(boolean expected) {
- int misses = getMissCount(manager(0).getCache(CACHE_NAME));
- misses += getMissCount(manager(1).getCache(CACHE_NAME));
- misses += getMissCount(manager(2).getCache(CACHE_NAME));
-
- if (expected) {
- assert misses > 0;
- } else {
- assertEquals(0, misses);
- }
- }
-
- private int getMissCount(Cache<Object, Object> cache) {
- CacheMgmtInterceptor cacheMgmtInterceptor = getCacheMgmtInterceptor(cache);
- return (int) cacheMgmtInterceptor.getMisses();
- }
-
- private CacheMgmtInterceptor getCacheMgmtInterceptor(Cache<Object, Object> cache) {
- CacheMgmtInterceptor cacheMgmtInterceptor = null;
- List<CommandInterceptor> interceptorChain = cache.getAdvancedCache().getInterceptorChain();
- for (CommandInterceptor interceptor : interceptorChain) {
- if (interceptor instanceof CacheMgmtInterceptor) {
- cacheMgmtInterceptor = (CacheMgmtInterceptor) interceptor;
- }
- }
- return cacheMgmtInterceptor;
- }
-
private byte[] generateKey(int i) {
Random r = new Random();
byte[] result = new byte[i];
Deleted: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -1,15 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.config.Configuration;
-import org.testng.annotations.Test;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-@Test(groups = "functional" , testName = "client.hotrod.DistTopologyChangeTest")
-public class DistTopologyChange extends ReplTopologyChangeTest {
- protected Configuration.CacheMode getCacheMode() {
- return Configuration.CacheMode.DIST_SYNC;
- }
-}
Copied: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChangeTest.java (from rev 1901, branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java)
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChangeTest.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChangeTest.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -0,0 +1,30 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(groups = "functional" , testName = "client.hotrod.DistTopologyChangeTest")
+public class DistTopologyChangeTest extends ReplTopologyChangeTest {
+ protected Configuration.CacheMode getCacheMode() {
+ return Configuration.CacheMode.DIST_SYNC;
+ }
+
+ @Override
+ protected void waitForClusterToForm(int memberCount) {
+ super.waitForClusterToForm(memberCount);
+ List<Cache> caches = new ArrayList<Cache>();
+ for (int i = 0; i < memberCount; i++) {
+ caches.add(manager(i).getCache());
+ }
+ BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(caches);
+ }
+}
Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -0,0 +1,136 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.CacheMgmtInterceptor;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.BeforeMethod;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class HitsAwareCacheManagersTest extends MultipleCacheManagersTest {
+
+ Map<InetSocketAddress, CacheContainer> hrServ2CacheManager = new HashMap<InetSocketAddress, CacheContainer>();
+ Map<InetSocketAddress, HotRodServer> addr2hrServer = new HashMap<InetSocketAddress, HotRodServer>();
+
+ @BeforeMethod
+ public void createBeforeMethod() throws Throwable {
+ if (cleanup == CleanupPhase.AFTER_METHOD) {
+ hrServ2CacheManager.clear();
+ addr2hrServer.clear();
+ }
+ super.createBeforeMethod();
+ }
+
+ protected HitCountInterceptor getHitCountInterceptor(Cache<Object, Object> cache) {
+ HitCountInterceptor hitCountInterceptor = null;
+ List<CommandInterceptor> interceptorChain = cache.getAdvancedCache().getInterceptorChain();
+ for (CommandInterceptor interceptor : interceptorChain) {
+ boolean isHitCountInterceptor = interceptor instanceof HitCountInterceptor;
+ if (hitCountInterceptor != null && isHitCountInterceptor) {
+ throw new IllegalStateException("Two HitCountInterceptors! " + interceptorChain);
+ }
+ if (isHitCountInterceptor) {
+ hitCountInterceptor = (HitCountInterceptor) interceptor;
+ }
+ }
+ return hitCountInterceptor;
+ }
+
+ protected void assertOnlyServerHit(InetSocketAddress serverAddress) {
+ CacheContainer cacheContainer = hrServ2CacheManager.get(serverAddress);
+ HitCountInterceptor interceptor = getHitCountInterceptor(cacheContainer.getCache());
+ assert interceptor.getHits() == 1 : "Expected one hit but received " + interceptor.getHits();
+ for (CacheContainer cm : hrServ2CacheManager.values()) {
+ if (cm != cacheContainer) {
+ interceptor = getHitCountInterceptor(cm.getCache());
+ assert interceptor.getHits() == 0 : "Expected 0 hits but got " + interceptor.getHits();
+ }
+ }
+ }
+
+ protected void assertNoHits() {
+ for (CacheContainer cm : hrServ2CacheManager.values()) {
+ HitCountInterceptor interceptor = getHitCountInterceptor(cm.getCache());
+ assert interceptor.getHits() == 0 : "Expected 0 hits but got " + interceptor.getHits();
+ }
+ }
+
+ protected InetSocketAddress getAddress(HotRodServer hotRodServer) {
+ InetSocketAddress socketAddress = new InetSocketAddress(hotRodServer.getHost(), hotRodServer.getPort());
+ addr2hrServer.put(socketAddress, hotRodServer);
+ return socketAddress;
+ }
+
+ protected void resetStats() {
+ for (EmbeddedCacheManager manager : cacheManagers) {
+ HitCountInterceptor cmi = getHitCountInterceptor(manager.getCache());
+ cmi.reset();
+ }
+ }
+
+ protected void addInterceptors() {
+ for (EmbeddedCacheManager manager : cacheManagers) {
+ addHitCountInterceptor(manager.getCache());
+ }
+ }
+
+ private void addHitCountInterceptor(Cache<Object, Object> cache) {
+ InetSocketAddress addr;
+ addr = getHotRodServerAddress(cache);
+ HitCountInterceptor interceptor = new HitCountInterceptor(addr);
+ cache.getAdvancedCache().addInterceptor(interceptor, 1);
+ }
+
+ private InetSocketAddress getHotRodServerAddress(Cache<Object, Object> cache) {
+ InetSocketAddress addr = null;
+ for (Map.Entry<InetSocketAddress, CacheContainer> entry : hrServ2CacheManager.entrySet()) {
+ if (entry.getValue().equals(cache.getCacheManager())) {
+ addr = entry.getKey();
+ }
+ }
+ return addr;
+ }
+
+ /**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ public static class HitCountInterceptor extends CommandInterceptor{
+
+ private volatile int invocationCount;
+ private volatile InetSocketAddress addr;
+
+ public HitCountInterceptor(InetSocketAddress addr) {
+ this.addr = addr;
+ }
+
+ @Override
+ protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+ if (ctx.isOriginLocal()) {
+ invocationCount ++;
+ }
+ return super.handleDefault(ctx, command);
+ }
+
+ public int getHits() {
+ return invocationCount;
+ }
+
+ public void reset() {
+ invocationCount = 0;
+ }
+ }
+}
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -54,13 +54,11 @@
manager(0).getCache();
manager(1).getCache();
- TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+ waitForClusterToForm(2);
- manager(0).getCache().put("k", "v");
- manager(0).getCache().get("k").equals("v");
- manager(1).getCache().get("k").equals("v");
+ manager(0).getCache().put("k_test", "v");
+ manager(0).getCache().get("k_test").equals("v");
+ manager(1).getCache().get("k_test").equals("v");
log.info("Local replication test passed!");
@@ -89,10 +87,7 @@
hotRodServer3 = TestHelper.startHotRodServer(manager(2));
manager(2).getCache();
- TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
- TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
+ waitForClusterToForm(3);
try {
expectTopologyChange(new InetSocketAddress("localhost", hotRodServer3.getPort()), true);
@@ -108,11 +103,11 @@
public void testDropServer() {
hotRodServer3.stop();
manager(2).stop();
- TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1));
- TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
- TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
- InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
+ waitForClusterToForm(2);
+
+ InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
+
try {
expectTopologyChange(server3Address, false);
assertEquals(2, tcpConnectionFactory.getServers().size());
@@ -125,15 +120,16 @@
private void expectTopologyChange(InetSocketAddress server1Address, boolean added) {
for (int i = 0; i < 10; i++) {
- try {
- remoteCache.put("k" + i, "v" + i);
- } catch (Exception e) {
- if (added) {
- throw new IllegalStateException(e);
- } //else it is acceptable, as the transport hasn't changed
- }
+ remoteCache.put("k" + i, "v" + i);
if (added == tcpConnectionFactory.getServers().contains(server1Address)) break;
}
assertEquals(server1Address + " not found", added, tcpConnectionFactory.getServers().contains(server1Address));
}
+
+ protected void waitForClusterToForm(int memberCount) {
+ TestingUtil.blockUntilViewReceived(manager(0).getCache(), memberCount, 10000);
+ for (int i = 0; i < memberCount; i++) {
+ TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 10000);
+ }
+ }
}
Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -0,0 +1,189 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (testName = "client.hotrod.ReplicationRetryTest", groups = "functional")
+public class ReplicationRetryTest extends HitsAwareCacheManagersTest {
+ HotRodServer hotRodServer1;
+ HotRodServer hotRodServer2;
+ HotRodServer hotRodServer3;
+
+ RemoteCache remoteCache;
+ private RemoteCacheManager remoteCacheManager;
+ private TcpTransportFactory tcpConnectionFactory;
+ private Configuration config;
+ private RoundRobinBalancingStrategy strategy;
+
+ public ReplicationRetryTest() {
+ cleanup = CleanupPhase.AFTER_METHOD;
+ }
+
+ @Override
+ protected void assertSupportedConfig() {
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+
+ assert cleanup == CleanupPhase.AFTER_METHOD;
+
+
+ config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
+ CacheContainer cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ CacheContainer cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ CacheContainer cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
+ registerCacheManager(cm1);
+ registerCacheManager(cm2);
+ registerCacheManager(cm3);
+
+ hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+ hrServ2CacheManager.put(getAddress(hotRodServer1), cm1);
+ hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+ hrServ2CacheManager.put(getAddress(hotRodServer2), cm2);
+ hotRodServer3 = TestHelper.startHotRodServer(manager(2));
+ hrServ2CacheManager.put(getAddress(hotRodServer3), cm3);
+
+ manager(0).getCache();
+ manager(1).getCache();
+ manager(2).getCache();
+
+ TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+ TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+
+ Properties clientConfig = new Properties();
+ clientConfig.put("hotrod-servers", "localhost:" + hotRodServer2.getPort());
+ clientConfig.put("force-return-value", "true");
+ clientConfig.put("maxActive",1); //this ensures that only one server is active at a time
+
+ remoteCacheManager = new RemoteCacheManager(clientConfig);
+ remoteCache = remoteCacheManager.getCache();
+ tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+ strategy = (RoundRobinBalancingStrategy) tcpConnectionFactory.getBalancer();
+ addInterceptors();
+
+ assert super.cacheManagers.size() == 3;
+
+ }
+
+
+ public void testGet() {
+ validateSequenceAndStopServer();
+ //now make sure that next call won't fail
+ resetStats();
+ for (int i = 0; i < 100; i++) {
+ assert remoteCache.get("k").equals("v");
+ }
+ }
+
+ public void testPut() {
+
+ validateSequenceAndStopServer();
+ resetStats();
+
+ assert "v".equals(remoteCache.put("k", "v0"));
+ for (int i = 1; i < 100; i++) {
+ assertEquals("v" + (i-1), remoteCache.put("k", "v"+i));
+ }
+ }
+
+ public void testRemove() {
+ validateSequenceAndStopServer();
+ resetStats();
+
+ assertEquals("v", remoteCache.remove("k"));
+ }
+
+ public void testContains() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals(true, remoteCache.containsKey("k"));
+ }
+
+ public void testGetWithVersion() {
+ validateSequenceAndStopServer();
+ resetStats();
+ VersionedValue value = remoteCache.getVersioned("k");
+ assertEquals("v", value.getValue());
+ }
+
+ public void testPutIfAbsent() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals(null, remoteCache.putIfAbsent("noSuchKey", "someValue"));
+ assertEquals("someValue", remoteCache.get("noSuchKey"));
+ }
+
+ public void testReplace() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals("v", remoteCache.replace("k", "v2"));
+ }
+
+ public void testReplaceIfUnmodified() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals(false, remoteCache.replaceWithVersion("k", "v2", 12));
+ }
+
+ public void testRemoveIfUnmodified() {
+ validateSequenceAndStopServer();
+ resetStats();
+ assertEquals(false, remoteCache.removeWithVersion("k", 12));
+ }
+
+ public void testClear() {
+ validateSequenceAndStopServer();
+ resetStats();
+ remoteCache.clear();
+ assertEquals(false, remoteCache.containsKey("k"));
+ }
+
+ private void validateSequenceAndStopServer() {
+ resetStats();
+ assertNoHits();
+ InetSocketAddress expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ assertNoHits();
+ remoteCache.put("k","v");
+
+ assert strategy.getServers().length == 3;
+ assertOnlyServerHit(expectedServer);
+
+ resetStats();
+ expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ remoteCache.put("k2","v2");
+ assertOnlyServerHit(expectedServer);
+
+ resetStats();
+ expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ remoteCache.put("k3","v3");
+ assertOnlyServerHit(expectedServer);
+
+ resetStats();
+ expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ remoteCache.put("k","v");
+ assertOnlyServerHit(expectedServer);
+
+ //this would be the next server to be shutdown
+ expectedServer = strategy.getServers()[strategy.getNextPosition()];
+ HotRodServer toStop = addr2hrServer.get(expectedServer);
+ toStop.stop();
+ }
+}
Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java 2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java 2010-06-17 01:46:01 UTC (rev 1920)
@@ -169,9 +169,8 @@
remoteCache.put("k6", "v2");
remoteCache.put("k7", "v3");
remoteCache.put("k8", "v4");
- assert false : "exception expected as balancer is still redirecting to failed node";
- } catch (TransportException e) {
- //expected
+ } catch (Exception e) {
+ assert false : "exception should not happen even if the balancer redirects to failed node at the beggining";
}
}
More information about the infinispan-commits
mailing list