[infinispan-commits] Infinispan SVN: r1920 - in branches/4.1.x/client/hotrod-client/src: main/java/org/infinispan/client/hotrod/impl/transport and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Jun 16 21:46:02 EDT 2010


Author: mircea.markus
Date: 2010-06-16 21:46:01 -0400 (Wed, 16 Jun 2010)
New Revision: 1920

Added:
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChangeTest.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java
Removed:
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java
Modified:
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java
Log:
ISPN-488 - implemented for replication

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -2,9 +2,9 @@
 
 import org.infinispan.client.hotrod.Flag;
 import org.infinispan.client.hotrod.exceptions.InvalidResponseException;
+import org.infinispan.client.hotrod.exceptions.TransportException;
 import org.infinispan.client.hotrod.impl.BinaryVersionedValue;
 import org.infinispan.client.hotrod.impl.VersionedOperationResponse;
-import org.infinispan.client.hotrod.impl.protocol.HotRodConstants;
 import org.infinispan.client.hotrod.impl.transport.Transport;
 import org.infinispan.client.hotrod.impl.transport.TransportFactory;
 import org.infinispan.util.logging.Log;
@@ -34,114 +34,147 @@
    }
 
    public byte[] get(byte[] key, Flag[] flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
-         if (status == KEY_DOES_NOT_EXIST_STATUS) {
-            return null;
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         try {
+            Transport transport = transportFactory.getTransport(key);
+            try {
+               short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
+               if (status == KEY_DOES_NOT_EXIST_STATUS) {
+                  return null;
+               }
+               if (status == NO_ERROR_STATUS) {
+                  return transport.readArray();
+               }
+            } finally {
+               releaseTransport(transport);
+            }
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
          }
-         if (status == NO_ERROR_STATUS) {
-            return transport.readArray();
-         }
-      } finally {
-         releaseTransport(transport);
       }
       throw new IllegalStateException("We should not reach here!");
    }
 
    public byte[] remove(byte[] key, Flag[] flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
-         if (status == KEY_DOES_NOT_EXIST_STATUS) {
-            return null;
-         } else if (status == NO_ERROR_STATUS) {
-            return returnPossiblePrevValue(transport, flags);
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport(key);
+         try {
+            short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
+            if (status == KEY_DOES_NOT_EXIST_STATUS) {
+               return null;
+            } else if (status == NO_ERROR_STATUS) {
+               return returnPossiblePrevValue(transport, flags);
+            }
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
+         }finally {
+            releaseTransport(transport);
          }
-      } finally {
-         releaseTransport(transport);
       }
       throw new IllegalStateException("We should not reach here!");
    }
 
    public boolean containsKey(byte[] key, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
-         if (status == KEY_DOES_NOT_EXIST_STATUS) {
-            return false;
-         } else if (status == NO_ERROR_STATUS) {
-            return true;
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport(key);
+         try {
+            short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
+            if (status == KEY_DOES_NOT_EXIST_STATUS) {
+               return false;
+            } else if (status == NO_ERROR_STATUS) {
+               return true;
+            }
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
          }
-      } finally {
-         releaseTransport(transport);
+         finally {
+            releaseTransport(transport);
+         }
       }
       throw new IllegalStateException("We should not reach here!");
    }
 
    public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
-         if (status == KEY_DOES_NOT_EXIST_STATUS) {
-            return null;
-         }
-         if (status == NO_ERROR_STATUS) {
-            long version = transport.readLong();
-            if (log.isTraceEnabled()) {
-               log.trace("Received version: " + version);
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport(key);
+         try {
+            short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
+            if (status == KEY_DOES_NOT_EXIST_STATUS) {
+               return null;
             }
-            byte[] value = transport.readArray();
-            return new BinaryVersionedValue(version, value);
+            if (status == NO_ERROR_STATUS) {
+               long version = transport.readLong();
+               if (log.isTraceEnabled()) {
+                  log.trace("Received version: " + version);
+               }
+               byte[] value = transport.readArray();
+               return new BinaryVersionedValue(version, value);
+            }
+         } catch (TransportException te) {
+           logErrorAndThrowExceptionIfNeeded(i, te); 
+         } finally {
+            releaseTransport(transport);
          }
-      } finally {
-         releaseTransport(transport);
       }
       throw new IllegalStateException("We should not reach here!");
    }
 
 
    public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
-         if (status != NO_ERROR_STATUS) {
-            throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport(key);
+         try {
+            short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
+            if (status != NO_ERROR_STATUS) {
+               throw new InvalidResponseException("Unexpected response status: " + Integer.toHexString(status));
+            }
+            return returnPossiblePrevValue(transport, flags);
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
+         }finally {
+            releaseTransport(transport);
          }
-         return returnPossiblePrevValue(transport, flags);
-      } finally {
-         releaseTransport(transport);
       }
+      throw new IllegalStateException("This should not be reached!");
    }
 
    public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
-         if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
-            byte[] bytes = returnPossiblePrevValue(transport, flags);
-            if (log.isTraceEnabled()) {
-               log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport(key);
+         try {
+            short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
+            if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+               byte[] bytes = returnPossiblePrevValue(transport, flags);
+               if (log.isTraceEnabled()) {
+                  log.trace("Returning from putIfAbsent: " + Arrays.toString(bytes));
+               }
+               return bytes;
             }
-            return bytes;
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
          }
-      } finally {
-         releaseTransport(transport);
+         finally {
+            releaseTransport(transport);
+         }
       }
       throw new IllegalStateException("We should not reach here!");
    }
 
    public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
-         if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
-            return returnPossiblePrevValue(transport, flags);
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport(key);
+         try {
+            short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
+            if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
+               return returnPossiblePrevValue(transport, flags);
+            }
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
+         } finally {
+            releaseTransport(transport);
          }
-      } finally {
-         releaseTransport(transport);
       }
-      throw new IllegalStateException("We should not reach here!");
+      throw new IllegalStateException(" should not reach here!");
    }
 
    /**
@@ -151,52 +184,68 @@
     * was sent, the response would be empty.
     */
    public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         // 1) write header
-         long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport(key);
+         try {
+            // 1) write header
+            long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
 
-         //2) write message body
-         transport.writeArray(key);
-         transport.writeVInt(lifespan);
-         transport.writeVInt(maxIdle);
-         transport.writeLong(version);
-         transport.writeArray(value);
-         return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
-      } finally {
-         releaseTransport(transport);
+            //2) write message body
+            transport.writeArray(key);
+            transport.writeVInt(lifespan);
+            transport.writeVInt(maxIdle);
+            transport.writeLong(version);
+            transport.writeArray(value);
+            return returnVersionedOperationResponse(transport, messageId, REPLACE_IF_UNMODIFIED_RESPONSE, flags);
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
+         }
+         finally {
+            releaseTransport(transport);
+         }
       }
+      throw new IllegalStateException(" should not reach here!");
    }
 
    /**
     * Request: [header][key length][key][entry_version]
     */
    public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
-      Transport transport = transportFactory.getTransport(key);
-      try {
-         // 1) write header
-         long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport(key);
+         try {
+            // 1) write header
+            long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
 
-         //2) write message body
-         transport.writeArray(key);
-         transport.writeLong(version);
+            //2) write message body
+            transport.writeArray(key);
+            transport.writeLong(version);
 
-         //process response and return
-         return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
+            //process response and return
+            return returnVersionedOperationResponse(transport, messageId, REMOVE_IF_UNMODIFIED_RESPONSE, flags);
 
-      } finally {
-         releaseTransport(transport);
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
+         }
+         finally {
+            releaseTransport(transport);
+         }
       }
+      throw new IllegalStateException("Should not reach this point!");
    }
 
    public void clear(Flag... flags) {
-      Transport transport = transportFactory.getTransport();
-      try {
-         // 1) write header
-         long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
-         HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
-      } finally {
-         releaseTransport(transport);
+      for (int i = 0; i < transportFactory.getTransportCount(); i++) {
+         Transport transport = transportFactory.getTransport();
+         try {
+            // 1) write header
+            long messageId = HotRodOperationsHelper.writeHeader(transport, CLEAR_REQUEST, cacheName, topologyId, flags);
+            HotRodOperationsHelper.readHeaderAndValidate(transport, messageId, CLEAR_RESPONSE, topologyId);
+         } catch (TransportException te) {
+            logErrorAndThrowExceptionIfNeeded(i, te);
+         } finally {
+            releaseTransport(transport);
+         }
       }
    }
 
@@ -295,4 +344,9 @@
       byte[] prevValue = returnPossiblePrevValue(transport, flags);
       return new VersionedOperationResponse(prevValue, code);
    }
+
+   private void logErrorAndThrowExceptionIfNeeded(int i, TransportException te) {
+      log.warn("Transport exception. Retry " + i + " out of " + transportFactory.getTransportCount(), te);
+      if (i == transportFactory.getTransportCount() - 1) throw te;
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/TransportFactory.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -32,4 +32,6 @@
    Transport getTransport(byte[] key);
 
    boolean isTcpNoDelay();
+
+   int getTransportCount();
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/netty/NettyTransportFactory.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -66,4 +66,9 @@
    public boolean isTcpNoDelay() {
       return tcpNoDelay;
    }
+
+   @Override
+   public int getTransportCount() {
+      return 1;
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/RoundRobinBalancingStrategy.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -51,14 +51,34 @@
    public InetSocketAddress nextServer() {
       readLock.lock();
       try {
-         int pos = index.getAndIncrement() % servers.length;
-         InetSocketAddress server = servers[pos];
-         if (log.isTraceEnabled()) {
-            log.trace("Returning server: " + server);
-         }
+         InetSocketAddress server = getServerByIndex(index.getAndIncrement());
          return server;
       } finally {
          readLock.unlock();
       }
    }
+
+   /**
+    * Returns same value as {@link #nextServer()} without modifying indexes/state.
+    */
+   public InetSocketAddress dryRunNextServer() {
+      return getServerByIndex(index.get());
+   }
+
+   private InetSocketAddress getServerByIndex(int val) {
+      int pos = val % servers.length;
+      InetSocketAddress server = servers[pos];
+      if (log.isTraceEnabled()) {
+         log.trace("Returning server: " + server);
+      }
+      return server;
+   }
+
+   public InetSocketAddress[] getServers() {
+      return servers;
+   }
+
+   public int getNextPosition() {
+      return  index.get() % servers.length;
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/tcp/TcpTransportFactory.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -181,4 +181,17 @@
    public boolean isTcpNoDelay() {
       return tcpNoDelay;
    }
+
+   @Override
+   public int getTransportCount() {
+      if (connectionPool.getMaxActive() > 0) {
+         return connectionPool.getMaxActive() * servers.size();
+      } else {
+         return 10 * servers.size();
+      }
+   }
+
+   public RequestBalancingStrategy getBalancer() {
+      return balancer;
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/CSAIntegrationTest.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -6,13 +6,9 @@
 import org.infinispan.config.Configuration;
 import org.infinispan.container.DataContainer;
 import org.infinispan.distribution.DistributionManager;
-import org.infinispan.interceptors.CacheMgmtInterceptor;
-import org.infinispan.interceptors.base.CommandInterceptor;
 import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.manager.CacheContainer;
-import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.server.hotrod.HotRodServer;
-import org.infinispan.test.MultipleCacheManagersTest;
 import org.infinispan.test.TestingUtil;
 import org.infinispan.util.ByteArrayKey;
 import org.infinispan.util.logging.Log;
@@ -25,9 +21,7 @@
 import java.io.ObjectOutputStream;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
@@ -39,7 +33,7 @@
  * @since 4.1
  */
 @Test(groups = "functional", testName = "client.hotrod.CSAIntegrationTest")
-public class CSAIntegrationTest extends MultipleCacheManagersTest {
+public class CSAIntegrationTest extends HitsAwareCacheManagersTest {
 
    private HotRodServer hotRodServer1;
    private HotRodServer hotRodServer2;
@@ -47,8 +41,6 @@
    private RemoteCacheManager remoteCacheManager;
    private RemoteCache<Object, Object> remoteCache;
    private TcpTransportFactory tcpConnectionFactory;
-   private static final String CACHE_NAME = "distributedCache";
-   Map<InetSocketAddress, CacheContainer> hrServ2CacheManager = new HashMap<InetSocketAddress, CacheContainer>();
 
    private static Log log = LogFactory.getLog(CSAIntegrationTest.class);
 
@@ -62,12 +54,9 @@
       Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
       config.setNumOwners(1);
       config.setUnsafeUnreliableReturnValues(true);
-      EmbeddedCacheManager cm1 = addClusterEnabledCacheManager();
-      EmbeddedCacheManager cm2 = addClusterEnabledCacheManager();
-      EmbeddedCacheManager cm3 = addClusterEnabledCacheManager();
-      cm1.defineConfiguration(CACHE_NAME, config);
-      cm2.defineConfiguration(CACHE_NAME, config);
-      cm3.defineConfiguration(CACHE_NAME, config);
+      addClusterEnabledCacheManager(config);
+      addClusterEnabledCacheManager(config);
+      addClusterEnabledCacheManager(config);
 
       hotRodServer1 = TestHelper.startHotRodServer(manager(0));
       hrServ2CacheManager.put(new InetSocketAddress(hotRodServer1.getHost(), hotRodServer1.getPort()), manager(0));
@@ -76,19 +65,19 @@
       hotRodServer3 = TestHelper.startHotRodServer(manager(2));
       hrServ2CacheManager.put(new InetSocketAddress(hotRodServer3.getHost(), hotRodServer3.getPort()), manager(2));
 
-      assert manager(0).getCache(CACHE_NAME) != null;
-      assert manager(1).getCache(CACHE_NAME) != null;
-      assert manager(2).getCache(CACHE_NAME) != null;
+      assert manager(0).getCache() != null;
+      assert manager(1).getCache() != null;
+      assert manager(2).getCache() != null;
 
       TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
       TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
       TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
       TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
 
-      manager(0).getCache(CACHE_NAME).put("k", "v");
-      manager(0).getCache(CACHE_NAME).get("k").equals("v");
-      manager(1).getCache(CACHE_NAME).get("k").equals("v");
-      manager(2).getCache(CACHE_NAME).get("k").equals("v");
+      manager(0).getCache().put("k", "v");
+      manager(0).getCache().get("k").equals("v");
+      manager(1).getCache().get("k").equals("v");
+      manager(2).getCache().get("k").equals("v");
 
       log.info("Local replication test passed!");
 
@@ -96,7 +85,7 @@
       Properties props = new Properties();
       props.put("hotrod-servers", "localhost:" + hotRodServer2.getPort() + ";localhost:" + hotRodServer2.getPort());
       remoteCacheManager = new RemoteCacheManager(props);
-      remoteCache = remoteCacheManager.getCache(CACHE_NAME);
+      remoteCache = remoteCacheManager.getCache();
 
       tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
    }
@@ -121,18 +110,13 @@
 
    @Test(dependsOnMethods = "testCorrectSetup")
    public void testHashFunctionReturnsSameValues() {
-      Map<InetSocketAddress, CacheContainer> add2Cm = new HashMap<InetSocketAddress, CacheContainer>();
-      add2Cm.put(new InetSocketAddress(hotRodServer1.getHost(), hotRodServer1.getPort()), manager(0));
-      add2Cm.put(new InetSocketAddress(hotRodServer2.getHost(), hotRodServer2.getPort()), manager(1));
-      add2Cm.put(new InetSocketAddress(hotRodServer3.getHost(), hotRodServer3.getPort()), manager(2));
-
       for (int i = 0; i < 1000; i++) {
          byte[] key = generateKey(i);
          TcpTransport transport = (TcpTransport) tcpConnectionFactory.getTransport(key);
          InetSocketAddress serverAddress = transport.getServerAddress();
-         CacheContainer cacheContainer = add2Cm.get(serverAddress);
-         assertNotNull("For server address " + serverAddress + " found " + cacheContainer + ". Map is: " + add2Cm, cacheContainer);
-         DistributionManager distributionManager = cacheContainer.getCache(CACHE_NAME).getAdvancedCache().getDistributionManager();
+         CacheContainer cacheContainer = hrServ2CacheManager.get(serverAddress);
+         assertNotNull("For server address " + serverAddress + " found " + cacheContainer + ". Map is: " + hrServ2CacheManager, cacheContainer);
+         DistributionManager distributionManager = cacheContainer.getCache().getAdvancedCache().getDistributionManager();
          assert distributionManager.isLocal(key);
          tcpConnectionFactory.releaseTransport(transport);
       }
@@ -140,11 +124,7 @@
 
    @Test(dependsOnMethods = "testHashFunctionReturnsSameValues")
    public void testRequestsGoToExpectedServer() throws Exception {
-
-      addCacheMgmtInterceptor(manager(0).getCache(CACHE_NAME));
-      addCacheMgmtInterceptor(manager(1).getCache(CACHE_NAME));
-      addCacheMgmtInterceptor(manager(2).getCache(CACHE_NAME));
-
+      addInterceptors();
       List<byte[]> keys = new ArrayList<byte[]>();
       for (int i = 0; i < 500; i++) {
          byte[] key = generateKey(i);
@@ -157,8 +137,6 @@
          tcpConnectionFactory.releaseTransport(transport);
       }
 
-      assertMisses(false);
-
       log.info("Right before first get.");
 
       for (byte[] key : keys) {
@@ -169,36 +147,12 @@
          TcpTransport transport = (TcpTransport) tcpConnectionFactory.getTransport(keyBytes);
          assertOnlyServerHit(transport.getServerAddress());
          tcpConnectionFactory.releaseTransport(transport);
-         assertMisses(false);
       }
-      assertMisses(false);
-
-      remoteCache.get("noSuchKey");
-      assertMisses(true);
    }
 
-   private void assertOnlyServerHit(InetSocketAddress serverAddress) {
-      CacheContainer cacheContainer = hrServ2CacheManager.get(serverAddress);
-      CacheMgmtInterceptor interceptor = getCacheMgmtInterceptor(cacheContainer.getCache(CACHE_NAME));
-      assert interceptor.getHits() == 1;
-      for (CacheContainer cm : hrServ2CacheManager.values()) {
-         if (cm != cacheContainer) {
-            interceptor = getCacheMgmtInterceptor(cm.getCache(CACHE_NAME));
-            assert interceptor.getHits() == 0;
-         }
-      }
-   }
-
-   private void resetStats() {
-      for (int i = 0; i< 3; i++) {
-         CacheMgmtInterceptor cmi = getCacheMgmtInterceptor(manager(i).getCache(CACHE_NAME));
-         cmi.resetStatistics();
-      }
-   }
-   
    private void assertCacheContainsKey(InetSocketAddress serverAddress, byte[] keyBytes) {
       CacheContainer cacheContainer = hrServ2CacheManager.get(serverAddress);
-      Cache<Object, Object> cache = cacheContainer.getCache(CACHE_NAME);
+      Cache<Object, Object> cache = cacheContainer.getCache();
       DataContainer dataContainer = cache.getAdvancedCache().getDataContainer();
       assert dataContainer.keySet().contains(new ByteArrayKey(keyBytes));
    }
@@ -210,39 +164,6 @@
       return byteArrayOutputStream.toByteArray();
    }
 
-   private void addCacheMgmtInterceptor(Cache<Object, Object> cache) {
-      CacheMgmtInterceptor interceptor = new CacheMgmtInterceptor();
-      cache.getAdvancedCache().addInterceptor(interceptor, 1);
-   }
-
-   private void assertMisses(boolean expected) {
-      int misses = getMissCount(manager(0).getCache(CACHE_NAME));
-      misses += getMissCount(manager(1).getCache(CACHE_NAME));
-      misses += getMissCount(manager(2).getCache(CACHE_NAME));
-
-      if (expected) {
-         assert misses > 0;
-      } else {
-         assertEquals(0, misses);
-      }
-   }
-
-   private int getMissCount(Cache<Object, Object> cache) {
-      CacheMgmtInterceptor cacheMgmtInterceptor = getCacheMgmtInterceptor(cache);
-      return (int) cacheMgmtInterceptor.getMisses();
-   }
-
-   private CacheMgmtInterceptor getCacheMgmtInterceptor(Cache<Object, Object> cache) {
-      CacheMgmtInterceptor cacheMgmtInterceptor = null;
-      List<CommandInterceptor> interceptorChain = cache.getAdvancedCache().getInterceptorChain();
-      for (CommandInterceptor interceptor : interceptorChain) {
-         if (interceptor instanceof CacheMgmtInterceptor) {
-            cacheMgmtInterceptor = (CacheMgmtInterceptor) interceptor;
-         }
-      }
-      return cacheMgmtInterceptor;
-   }
-
    private byte[] generateKey(int i) {
       Random r = new Random();
       byte[] result = new byte[i];

Deleted: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -1,15 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.config.Configuration;
-import org.testng.annotations.Test;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
- at Test(groups = "functional" , testName = "client.hotrod.DistTopologyChangeTest")
-public class DistTopologyChange extends ReplTopologyChangeTest {
-   protected Configuration.CacheMode getCacheMode() {
-      return Configuration.CacheMode.DIST_SYNC;
-   }
-}

Copied: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChangeTest.java (from rev 1901, branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChange.java)
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChangeTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/DistTopologyChangeTest.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -0,0 +1,30 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test(groups = "functional" , testName = "client.hotrod.DistTopologyChangeTest")
+public class DistTopologyChangeTest extends ReplTopologyChangeTest {
+   protected Configuration.CacheMode getCacheMode() {
+      return Configuration.CacheMode.DIST_SYNC;
+   }
+
+   @Override
+   protected void waitForClusterToForm(int memberCount) {
+      super.waitForClusterToForm(memberCount);
+      List<Cache> caches = new ArrayList<Cache>();
+      for (int i = 0; i < memberCount; i++) {
+         caches.add(manager(i).getCache());
+      }
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(caches);
+   }
+}

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -0,0 +1,136 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.CacheMgmtInterceptor;
+import org.infinispan.interceptors.base.CommandInterceptor;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.BeforeMethod;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class HitsAwareCacheManagersTest extends MultipleCacheManagersTest {
+
+   Map<InetSocketAddress, CacheContainer> hrServ2CacheManager = new HashMap<InetSocketAddress, CacheContainer>();
+   Map<InetSocketAddress, HotRodServer> addr2hrServer = new HashMap<InetSocketAddress, HotRodServer>();
+
+   @BeforeMethod
+   public void createBeforeMethod() throws Throwable {
+      if (cleanup == CleanupPhase.AFTER_METHOD) {
+         hrServ2CacheManager.clear();
+         addr2hrServer.clear();
+      }
+      super.createBeforeMethod();
+   }
+
+   protected HitCountInterceptor getHitCountInterceptor(Cache<Object, Object> cache) {
+      HitCountInterceptor hitCountInterceptor = null;
+      List<CommandInterceptor> interceptorChain = cache.getAdvancedCache().getInterceptorChain();
+      for (CommandInterceptor interceptor : interceptorChain) {
+         boolean isHitCountInterceptor = interceptor instanceof HitCountInterceptor;
+         if (hitCountInterceptor != null && isHitCountInterceptor) {
+            throw new IllegalStateException("Two HitCountInterceptors! " + interceptorChain);
+         }
+         if (isHitCountInterceptor) {
+            hitCountInterceptor = (HitCountInterceptor) interceptor;
+         }
+      }
+      return hitCountInterceptor;
+   }
+
+   protected void assertOnlyServerHit(InetSocketAddress serverAddress) {
+      CacheContainer cacheContainer = hrServ2CacheManager.get(serverAddress);
+      HitCountInterceptor interceptor = getHitCountInterceptor(cacheContainer.getCache());
+      assert interceptor.getHits() == 1 : "Expected one hit but received " + interceptor.getHits();
+      for (CacheContainer cm : hrServ2CacheManager.values()) {
+         if (cm != cacheContainer) {
+            interceptor = getHitCountInterceptor(cm.getCache());
+            assert interceptor.getHits() == 0 : "Expected 0 hits but got " + interceptor.getHits();
+         }
+      }
+   }
+
+   protected void assertNoHits() {
+      for (CacheContainer cm : hrServ2CacheManager.values()) {
+         HitCountInterceptor interceptor = getHitCountInterceptor(cm.getCache());
+         assert interceptor.getHits() == 0 : "Expected 0 hits but got " + interceptor.getHits();
+      }
+   }
+
+   protected InetSocketAddress getAddress(HotRodServer hotRodServer) {
+      InetSocketAddress socketAddress = new InetSocketAddress(hotRodServer.getHost(), hotRodServer.getPort());
+      addr2hrServer.put(socketAddress, hotRodServer);
+      return socketAddress;
+   }
+
+   protected void resetStats() {
+      for (EmbeddedCacheManager manager : cacheManagers) {
+         HitCountInterceptor cmi = getHitCountInterceptor(manager.getCache());
+         cmi.reset();
+      }
+   }
+
+   protected void addInterceptors() {
+      for (EmbeddedCacheManager manager : cacheManagers) {
+         addHitCountInterceptor(manager.getCache());
+      }
+   }
+
+   private void addHitCountInterceptor(Cache<Object, Object> cache) {
+      InetSocketAddress addr;
+      addr = getHotRodServerAddress(cache);
+      HitCountInterceptor interceptor = new HitCountInterceptor(addr);
+      cache.getAdvancedCache().addInterceptor(interceptor, 1);
+   }
+
+   private InetSocketAddress getHotRodServerAddress(Cache<Object, Object> cache) {
+      InetSocketAddress addr = null;
+      for (Map.Entry<InetSocketAddress, CacheContainer> entry : hrServ2CacheManager.entrySet()) {
+         if (entry.getValue().equals(cache.getCacheManager())) {
+            addr = entry.getKey();
+         }
+      }
+      return addr;
+   }
+
+   /**
+    * @author Mircea.Markus at jboss.com
+    * @since 4.1
+    */
+   public static class HitCountInterceptor extends CommandInterceptor{
+
+      private volatile int invocationCount;
+      private volatile InetSocketAddress addr;
+
+      public HitCountInterceptor(InetSocketAddress addr) {
+         this.addr = addr;
+      }
+
+      @Override
+      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
+         if (ctx.isOriginLocal()) {
+            invocationCount ++;
+         }
+         return super.handleDefault(ctx, command);
+      }
+
+      public int getHits() {
+         return invocationCount;
+      }
+
+      public void reset() {
+         invocationCount = 0;
+      }
+   }
+}

Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplTopologyChangeTest.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -54,13 +54,11 @@
       manager(0).getCache();
       manager(1).getCache();
 
-      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 2, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+      waitForClusterToForm(2);
 
-      manager(0).getCache().put("k", "v");
-      manager(0).getCache().get("k").equals("v");
-      manager(1).getCache().get("k").equals("v");
+      manager(0).getCache().put("k_test", "v");
+      manager(0).getCache().get("k_test").equals("v");
+      manager(1).getCache().get("k_test").equals("v");
 
       log.info("Local replication test passed!");
 
@@ -89,10 +87,7 @@
       hotRodServer3 = TestHelper.startHotRodServer(manager(2));
       manager(2).getCache();
 
-      TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1), manager(2));
-      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
+      waitForClusterToForm(3);
 
       try {
          expectTopologyChange(new InetSocketAddress("localhost", hotRodServer3.getPort()), true);
@@ -108,11 +103,11 @@
    public void testDropServer() {
       hotRodServer3.stop();
       manager(2).stop();
-      TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1));
-      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
 
-      InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());
+      waitForClusterToForm(2);
+
+      InetSocketAddress server3Address = new InetSocketAddress("localhost", hotRodServer3.getPort());      
+
       try {
          expectTopologyChange(server3Address, false);
          assertEquals(2, tcpConnectionFactory.getServers().size());
@@ -125,15 +120,16 @@
 
    private void expectTopologyChange(InetSocketAddress server1Address, boolean added) {
       for (int i = 0; i < 10; i++) {
-         try {
-            remoteCache.put("k" + i, "v" + i);
-         } catch (Exception e) {
-            if (added) {
-               throw new IllegalStateException(e);
-            } //else it is acceptable, as the transport hasn't changed
-         }
+         remoteCache.put("k" + i, "v" + i);         
          if (added == tcpConnectionFactory.getServers().contains(server1Address)) break;
       }
       assertEquals(server1Address + " not found", added, tcpConnectionFactory.getServers().contains(server1Address));
    }
+   
+   protected void waitForClusterToForm(int memberCount) {
+      TestingUtil.blockUntilViewReceived(manager(0).getCache(), memberCount, 10000);
+      for (int i = 0; i < memberCount; i++) {
+         TestingUtil.blockUntilCacheStatusAchieved(manager(i).getCache(), ComponentStatus.RUNNING, 10000);
+      }
+   }
 }

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -0,0 +1,189 @@
+package org.infinispan.client.hotrod;
+
+import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+ at Test (testName = "client.hotrod.ReplicationRetryTest", groups = "functional")
+public class ReplicationRetryTest extends HitsAwareCacheManagersTest {
+   HotRodServer hotRodServer1;
+   HotRodServer hotRodServer2;
+   HotRodServer hotRodServer3;
+
+   RemoteCache remoteCache;
+   private RemoteCacheManager remoteCacheManager;
+   private TcpTransportFactory tcpConnectionFactory;
+   private Configuration config;
+   private RoundRobinBalancingStrategy strategy;
+
+   public ReplicationRetryTest() {
+      cleanup = CleanupPhase.AFTER_METHOD;
+   }
+
+   @Override
+   protected void assertSupportedConfig() {
+   }
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+
+      assert cleanup == CleanupPhase.AFTER_METHOD;
+
+
+      config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
+      CacheContainer cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      CacheContainer cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      CacheContainer cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      registerCacheManager(cm1);
+      registerCacheManager(cm2);
+      registerCacheManager(cm3);
+
+      hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+      hrServ2CacheManager.put(getAddress(hotRodServer1), cm1);
+      hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+      hrServ2CacheManager.put(getAddress(hotRodServer2), cm2);
+      hotRodServer3 = TestHelper.startHotRodServer(manager(2));
+      hrServ2CacheManager.put(getAddress(hotRodServer3), cm3);
+
+      manager(0).getCache();
+      manager(1).getCache();
+      manager(2).getCache();
+
+      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+
+      Properties clientConfig = new Properties();
+      clientConfig.put("hotrod-servers", "localhost:" + hotRodServer2.getPort());
+      clientConfig.put("force-return-value", "true");
+      clientConfig.put("maxActive",1); //this ensures that only one server is active at a time
+
+      remoteCacheManager = new RemoteCacheManager(clientConfig);
+      remoteCache = remoteCacheManager.getCache();
+      tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+      strategy = (RoundRobinBalancingStrategy) tcpConnectionFactory.getBalancer();
+      addInterceptors();
+
+      assert super.cacheManagers.size() == 3;
+
+   }
+
+
+   public void testGet() {
+      validateSequenceAndStopServer();
+      //now make sure that next call won't fail
+      resetStats();
+      for (int i = 0; i < 100; i++) {
+         assert remoteCache.get("k").equals("v");
+      }
+   }
+
+   public void testPut() {
+
+      validateSequenceAndStopServer();
+      resetStats();
+
+      assert "v".equals(remoteCache.put("k", "v0"));
+      for (int i = 1; i < 100; i++) {
+         assertEquals("v" + (i-1), remoteCache.put("k", "v"+i));
+      }
+   }
+
+   public void testRemove() {
+      validateSequenceAndStopServer();
+      resetStats();
+
+      assertEquals("v", remoteCache.remove("k"));
+   }
+
+   public void testContains() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals(true, remoteCache.containsKey("k"));
+   }
+
+   public void testGetWithVersion() {
+      validateSequenceAndStopServer();
+      resetStats();
+      VersionedValue value = remoteCache.getVersioned("k");
+      assertEquals("v", value.getValue());
+   }
+
+   public void testPutIfAbsent() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals(null, remoteCache.putIfAbsent("noSuchKey", "someValue"));
+      assertEquals("someValue", remoteCache.get("noSuchKey"));
+   }
+
+   public void testReplace() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals("v", remoteCache.replace("k", "v2"));
+   }
+
+   public void testReplaceIfUnmodified() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals(false, remoteCache.replaceWithVersion("k", "v2", 12));
+   }
+
+   public void testRemoveIfUnmodified() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals(false, remoteCache.removeWithVersion("k", 12));
+   }
+
+   public void testClear() {
+      validateSequenceAndStopServer();
+      resetStats();
+      remoteCache.clear();
+      assertEquals(false, remoteCache.containsKey("k"));
+   }
+
+   private void validateSequenceAndStopServer() {
+      resetStats();
+      assertNoHits();
+      InetSocketAddress expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      assertNoHits();
+      remoteCache.put("k","v");
+
+      assert strategy.getServers().length == 3;
+      assertOnlyServerHit(expectedServer);
+
+      resetStats();
+      expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      remoteCache.put("k2","v2");
+      assertOnlyServerHit(expectedServer);
+
+      resetStats();
+      expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      remoteCache.put("k3","v3");
+      assertOnlyServerHit(expectedServer);
+
+      resetStats();
+      expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      remoteCache.put("k","v");
+      assertOnlyServerHit(expectedServer);
+
+      //this would be the next server to be shutdown
+      expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      HotRodServer toStop = addr2hrServer.get(expectedServer);
+      toStop.stop();
+   }
+}

Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java	2010-06-16 16:48:27 UTC (rev 1919)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/RoundRobinBalancingIntegrationTest.java	2010-06-17 01:46:01 UTC (rev 1920)
@@ -169,9 +169,8 @@
          remoteCache.put("k6", "v2");
          remoteCache.put("k7", "v3");
          remoteCache.put("k8", "v4");
-         assert false : "exception expected as balancer is still redirecting to failed node";
-      } catch (TransportException e) {
-         //expected
+      } catch (Exception e) {
+         assert false : "exception should not happen even if the balancer redirects to failed node at the beggining";
       }
    }
 



More information about the infinispan-commits mailing list