[infinispan-commits] Infinispan SVN: r1944 - in branches/4.1.x: cachestore/remote/src/test/java/org/infinispan/loaders/remote and 17 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jul 1 03:31:38 EDT 2010


Author: mircea.markus
Date: 2010-07-01 03:31:37 -0400 (Thu, 01 Jul 2010)
New Revision: 1944

Added:
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/AbstractRetryTest.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/DistributionRetryTest.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java
Removed:
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java
Modified:
   branches/4.1.x/cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStoreConfig.java
   branches/4.1.x/cachestore/remote/src/test/java/org/infinispan/loaders/remote/RemoteCacheStoreFunctionalTest.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
   branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
   branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java
   branches/4.1.x/core/src/main/java/org/infinispan/CacheDelegate.java
   branches/4.1.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   branches/4.1.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
   branches/4.1.x/core/src/main/java/org/infinispan/factories/InternalCacheFactory.java
   branches/4.1.x/core/src/main/java/org/infinispan/manager/CacheContainer.java
   branches/4.1.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
   branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
   branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
   branches/4.1.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/distribution/rehash/RehashCompletedOnJoinTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/jmx/CacheMBeanTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/manager/CacheManagerTest.java
   branches/4.1.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
   branches/4.1.x/core/src/test/resources/log4j.xml
Log:
implemented retry logic in hotrod client

Modified: branches/4.1.x/cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStoreConfig.java
===================================================================
--- branches/4.1.x/cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStoreConfig.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/cachestore/remote/src/main/java/org/infinispan/loaders/remote/RemoteCacheStoreConfig.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -2,7 +2,7 @@
 
 import org.infinispan.CacheException;
 import org.infinispan.loaders.AbstractCacheStoreConfig;
-import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.CacheContainer;
 import org.infinispan.util.FileLookup;
 
 import java.io.IOException;
@@ -43,12 +43,12 @@
 
    public void setUseDefaultRemoteCache(boolean useDefaultRemoteCache) {
       if (useDefaultRemoteCache) {
-         setRemoteCacheName(DefaultCacheManager.DEFAULT_CACHE_NAME);
+         setRemoteCacheName(CacheContainer.DEFAULT_CACHE_NAME);
       }
    }
 
    public boolean isUseDefaultRemoteCache() {
-      return DefaultCacheManager.DEFAULT_CACHE_NAME.equals(getRemoteCacheName());
+      return CacheContainer.DEFAULT_CACHE_NAME.equals(getRemoteCacheName());
    }
 
    @Override

Modified: branches/4.1.x/cachestore/remote/src/test/java/org/infinispan/loaders/remote/RemoteCacheStoreFunctionalTest.java
===================================================================
--- branches/4.1.x/cachestore/remote/src/test/java/org/infinispan/loaders/remote/RemoteCacheStoreFunctionalTest.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/cachestore/remote/src/test/java/org/infinispan/loaders/remote/RemoteCacheStoreFunctionalTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -3,7 +3,7 @@
 import org.infinispan.client.hotrod.TestHelper;
 import org.infinispan.loaders.BaseCacheStoreFunctionalTest;
 import org.infinispan.loaders.CacheStoreConfig;
-import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.CacheContainer;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.server.hotrod.HotRodServer;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
@@ -27,7 +27,7 @@
       localCacheManager = TestCacheManagerFactory.createLocalCacheManager();
       hrServer = TestHelper.startHotRodServer(localCacheManager);
 
-      remoteCacheStoreConfig.setRemoteCacheName(DefaultCacheManager.DEFAULT_CACHE_NAME);
+      remoteCacheStoreConfig.setRemoteCacheName(CacheContainer.DEFAULT_CACHE_NAME);
       Properties properties = new Properties();
       properties.put("hotrod-servers", "localhost:"+ hrServer.getPort());
       remoteCacheStoreConfig.setHotRodClientProperties(properties);

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/RemoteCacheManager.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -285,6 +285,7 @@
    }
 
    public <K, V> RemoteCache<K, V> getCache(boolean forceReturnValue) {
+      //As per the HotRod protocol specification, the default cache is identified by an empty string
       return createRemoteCache("", forceReturnValue);
    }
 

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/RemoteCacheImpl.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -39,6 +39,9 @@
    private volatile boolean forceReturnValue;
 
    public RemoteCacheImpl(RemoteCacheManager rcm, String name, boolean forceReturnValue) {
+      if (log.isTraceEnabled()) {
+         log.trace("Creating remote cache: " + name);
+      }
       this.name = name;
       this.forceReturnValue = forceReturnValue;
       this.remoteCacheManager = rcm;

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/SerializationMarshaller.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -14,6 +14,7 @@
 
 /**
  * Default marshaller implementation based on object serialization.
+ * todo - the marshaller should only be used when writing to the actual transport
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.1

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsHelper.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -20,7 +20,7 @@
  * @since 4.1
  */
 public class HotRodOperationsHelper {
-   static Log log = LogFactory.getLog(HotRodOperationsImpl.class);
+   static Log log = LogFactory.getLog(HotRodOperationsHelper.class);
    static final AtomicLong MSG_ID = new AtomicLong();
    final static byte CLIENT_INTELLIGENCE = HotRodConstants.CLIENT_INTELLIGENCE_HASH_DISTRIBUTION_AWARE;
 

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/protocol/HotRodOperationsImpl.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -36,7 +36,7 @@
    public byte[] get(byte[] key, Flag[] flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
          try {
-            Transport transport = transportFactory.getTransport(key);
+            Transport transport = getTransport(key, i == 0);
             try {
                short status = sendKeyOperation(key, transport, GET_REQUEST, flags, GET_RESPONSE);
                if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -57,7 +57,7 @@
 
    public byte[] remove(byte[] key, Flag[] flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
-         Transport transport = transportFactory.getTransport(key);
+         Transport transport = getTransport(key, i == 0);
          try {
             short status = sendKeyOperation(key, transport, REMOVE_REQUEST, flags, REMOVE_RESPONSE);
             if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -67,7 +67,7 @@
             }
          } catch (TransportException te) {
             logErrorAndThrowExceptionIfNeeded(i, te);
-         }finally {
+         } finally {
             releaseTransport(transport);
          }
       }
@@ -76,7 +76,7 @@
 
    public boolean containsKey(byte[] key, Flag... flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
-         Transport transport = transportFactory.getTransport(key);
+         Transport transport = getTransport(key, i == 0);
          try {
             short status = sendKeyOperation(key, transport, CONTAINS_KEY_REQUEST, flags, CONTAINS_KEY_RESPONSE);
             if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -96,7 +96,7 @@
 
    public BinaryVersionedValue getWithVersion(byte[] key, Flag... flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
-         Transport transport = transportFactory.getTransport(key);
+         Transport transport = getTransport(key, i == 0);
          try {
             short status = sendKeyOperation(key, transport, GET_WITH_VERSION, flags, GET_WITH_VERSION_RESPONSE);
             if (status == KEY_DOES_NOT_EXIST_STATUS) {
@@ -111,7 +111,7 @@
                return new BinaryVersionedValue(version, value);
             }
          } catch (TransportException te) {
-           logErrorAndThrowExceptionIfNeeded(i, te); 
+            logErrorAndThrowExceptionIfNeeded(i, te);
          } finally {
             releaseTransport(transport);
          }
@@ -122,7 +122,7 @@
 
    public byte[] put(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
-         Transport transport = transportFactory.getTransport(key);
+         Transport transport = getTransport(key, i == 0);
          try {
             short status = sendPutOperation(key, value, transport, PUT_REQUEST, PUT_RESPONSE, lifespan, maxIdle, flags);
             if (status != NO_ERROR_STATUS) {
@@ -131,7 +131,7 @@
             return returnPossiblePrevValue(transport, flags);
          } catch (TransportException te) {
             logErrorAndThrowExceptionIfNeeded(i, te);
-         }finally {
+         } finally {
             releaseTransport(transport);
          }
       }
@@ -140,7 +140,7 @@
 
    public byte[] putIfAbsent(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
-         Transport transport = transportFactory.getTransport(key);
+         Transport transport = getTransport(key, i == 0);
          try {
             short status = sendPutOperation(key, value, transport, PUT_IF_ABSENT_REQUEST, PUT_IF_ABSENT_RESPONSE, lifespan, maxIdle, flags);
             if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
@@ -162,7 +162,7 @@
 
    public byte[] replace(byte[] key, byte[] value, int lifespan, int maxIdle, Flag... flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
-         Transport transport = transportFactory.getTransport(key);
+         Transport transport = getTransport(key, i == 0);
          try {
             short status = sendPutOperation(key, value, transport, REPLACE_REQUEST, REPLACE_RESPONSE, lifespan, maxIdle, flags);
             if (status == NO_ERROR_STATUS || status == NOT_PUT_REMOVED_REPLACED_STATUS) {
@@ -185,7 +185,7 @@
     */
    public VersionedOperationResponse replaceIfUnmodified(byte[] key, byte[] value, int lifespan, int maxIdle, long version, Flag... flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
-         Transport transport = transportFactory.getTransport(key);
+         Transport transport = getTransport(key, i == 0);
          try {
             // 1) write header
             long messageId = HotRodOperationsHelper.writeHeader(transport, REPLACE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
@@ -212,7 +212,7 @@
     */
    public VersionedOperationResponse removeIfUnmodified(byte[] key, long version, Flag... flags) {
       for (int i = 0; i < transportFactory.getTransportCount(); i++) {
-         Transport transport = transportFactory.getTransport(key);
+         Transport transport = getTransport(key, i == 0);
          try {
             // 1) write header
             long messageId = HotRodOperationsHelper.writeHeader(transport, REMOVE_IF_UNMODIFIED_REQUEST, cacheName, topologyId, flags);
@@ -354,4 +354,12 @@
          log.trace(message + ":" + te);
       }
    }
+
+   private Transport getTransport(byte[] key, boolean hashAware) {
+      if (hashAware) {
+         return transportFactory.getTransport(key);
+      } else {
+         return transportFactory.getTransport();
+      }
+   }
 }

Modified: branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/transport/AbstractTransport.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -85,7 +85,7 @@
       if (!string.isEmpty()) {
          writeArray(string.getBytes(CHARSET));
       } else {
-         writeVInt(0);         
+         writeVInt(0);
       }
    }
 

Modified: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/HitsAwareCacheManagersTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -22,8 +22,8 @@
  */
 public abstract class HitsAwareCacheManagersTest extends MultipleCacheManagersTest {
 
-   Map<InetSocketAddress, CacheContainer> hrServ2CacheManager = new HashMap<InetSocketAddress, CacheContainer>();
-   Map<InetSocketAddress, HotRodServer> addr2hrServer = new HashMap<InetSocketAddress, HotRodServer>();
+   protected Map<InetSocketAddress, CacheContainer> hrServ2CacheManager = new HashMap<InetSocketAddress, CacheContainer>();
+   protected Map<InetSocketAddress, HotRodServer> addr2hrServer = new HashMap<InetSocketAddress, HotRodServer>();
 
    @BeforeMethod
    public void createBeforeMethod() throws Throwable {

Deleted: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/ReplicationRetryTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -1,189 +0,0 @@
-package org.infinispan.client.hotrod;
-
-import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
-import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
-import org.infinispan.config.Configuration;
-import org.infinispan.lifecycle.ComponentStatus;
-import org.infinispan.manager.CacheContainer;
-import org.infinispan.server.hotrod.HotRodServer;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.testng.annotations.Test;
-
-import java.net.InetSocketAddress;
-import java.util.Properties;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.1
- */
-@Test (testName = "client.hotrod.ReplicationRetryTest", groups = "functional")
-public class ReplicationRetryTest extends HitsAwareCacheManagersTest {
-   HotRodServer hotRodServer1;
-   HotRodServer hotRodServer2;
-   HotRodServer hotRodServer3;
-
-   RemoteCache remoteCache;
-   private RemoteCacheManager remoteCacheManager;
-   private TcpTransportFactory tcpConnectionFactory;
-   private Configuration config;
-   private RoundRobinBalancingStrategy strategy;
-
-   public ReplicationRetryTest() {
-      cleanup = CleanupPhase.AFTER_METHOD;
-   }
-
-   @Override
-   protected void assertSupportedConfig() {
-   }
-
-   @Override
-   protected void createCacheManagers() throws Throwable {
-
-      assert cleanup == CleanupPhase.AFTER_METHOD;
-
-
-      config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
-      CacheContainer cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
-      CacheContainer cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
-      CacheContainer cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
-      registerCacheManager(cm1);
-      registerCacheManager(cm2);
-      registerCacheManager(cm3);
-
-      hotRodServer1 = TestHelper.startHotRodServer(manager(0));
-      hrServ2CacheManager.put(getAddress(hotRodServer1), cm1);
-      hotRodServer2 = TestHelper.startHotRodServer(manager(1));
-      hrServ2CacheManager.put(getAddress(hotRodServer2), cm2);
-      hotRodServer3 = TestHelper.startHotRodServer(manager(2));
-      hrServ2CacheManager.put(getAddress(hotRodServer3), cm3);
-
-      manager(0).getCache();
-      manager(1).getCache();
-      manager(2).getCache();
-
-      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
-      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
-
-      Properties clientConfig = new Properties();
-      clientConfig.put("hotrod-servers", "localhost:" + hotRodServer2.getPort());
-      clientConfig.put("force-return-value", "true");
-      clientConfig.put("maxActive",1); //this ensures that only one server is active at a time
-
-      remoteCacheManager = new RemoteCacheManager(clientConfig);
-      remoteCache = remoteCacheManager.getCache();
-      tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
-      strategy = (RoundRobinBalancingStrategy) tcpConnectionFactory.getBalancer();
-      addInterceptors();
-
-      assert super.cacheManagers.size() == 3;
-
-   }
-
-
-   public void testGet() {
-      validateSequenceAndStopServer();
-      //now make sure that next call won't fail
-      resetStats();
-      for (int i = 0; i < 100; i++) {
-         assert remoteCache.get("k").equals("v");
-      }
-   }
-
-   public void testPut() {
-
-      validateSequenceAndStopServer();
-      resetStats();
-
-      assert "v".equals(remoteCache.put("k", "v0"));
-      for (int i = 1; i < 100; i++) {
-         assertEquals("v" + (i-1), remoteCache.put("k", "v"+i));
-      }
-   }
-
-   public void testRemove() {
-      validateSequenceAndStopServer();
-      resetStats();
-
-      assertEquals("v", remoteCache.remove("k"));
-   }
-
-   public void testContains() {
-      validateSequenceAndStopServer();
-      resetStats();
-      assertEquals(true, remoteCache.containsKey("k"));
-   }
-
-   public void testGetWithVersion() {
-      validateSequenceAndStopServer();
-      resetStats();
-      VersionedValue value = remoteCache.getVersioned("k");
-      assertEquals("v", value.getValue());
-   }
-
-   public void testPutIfAbsent() {
-      validateSequenceAndStopServer();
-      resetStats();
-      assertEquals(null, remoteCache.putIfAbsent("noSuchKey", "someValue"));
-      assertEquals("someValue", remoteCache.get("noSuchKey"));
-   }
-
-   public void testReplace() {
-      validateSequenceAndStopServer();
-      resetStats();
-      assertEquals("v", remoteCache.replace("k", "v2"));
-   }
-
-   public void testReplaceIfUnmodified() {
-      validateSequenceAndStopServer();
-      resetStats();
-      assertEquals(false, remoteCache.replaceWithVersion("k", "v2", 12));
-   }
-
-   public void testRemoveIfUnmodified() {
-      validateSequenceAndStopServer();
-      resetStats();
-      assertEquals(false, remoteCache.removeWithVersion("k", 12));
-   }
-
-   public void testClear() {
-      validateSequenceAndStopServer();
-      resetStats();
-      remoteCache.clear();
-      assertEquals(false, remoteCache.containsKey("k"));
-   }
-
-   private void validateSequenceAndStopServer() {
-      resetStats();
-      assertNoHits();
-      InetSocketAddress expectedServer = strategy.getServers()[strategy.getNextPosition()];
-      assertNoHits();
-      remoteCache.put("k","v");
-
-      assert strategy.getServers().length == 3;
-      assertOnlyServerHit(expectedServer);
-
-      resetStats();
-      expectedServer = strategy.getServers()[strategy.getNextPosition()];
-      remoteCache.put("k2","v2");
-      assertOnlyServerHit(expectedServer);
-
-      resetStats();
-      expectedServer = strategy.getServers()[strategy.getNextPosition()];
-      remoteCache.put("k3","v3");
-      assertOnlyServerHit(expectedServer);
-
-      resetStats();
-      expectedServer = strategy.getServers()[strategy.getNextPosition()];
-      remoteCache.put("k","v");
-      assertOnlyServerHit(expectedServer);
-
-      //this would be the next server to be shutdown
-      expectedServer = strategy.getServers()[strategy.getNextPosition()];
-      HotRodServer toStop = addr2hrServer.get(expectedServer);
-      toStop.stop();
-   }
-}

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/AbstractRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/AbstractRetryTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/AbstractRetryTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -0,0 +1,93 @@
+package org.infinispan.client.hotrod.retry;
+
+import org.infinispan.client.hotrod.HitsAwareCacheManagersTest;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.TestHelper;
+import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
+import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+
+import java.util.Properties;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+public abstract class AbstractRetryTest extends HitsAwareCacheManagersTest {
+   
+   protected HotRodServer hotRodServer1;
+   protected HotRodServer hotRodServer2;
+   protected HotRodServer hotRodServer3;
+
+   RemoteCacheImpl remoteCache;
+   protected RemoteCacheManager remoteCacheManager;
+   protected TcpTransportFactory tcpConnectionFactory;
+   protected Configuration config;
+   protected RoundRobinBalancingStrategy strategy;
+
+   public AbstractRetryTest() {
+      cleanup = CleanupPhase.AFTER_METHOD;
+   }
+
+   @Override
+   protected void assertSupportedConfig() {
+   }
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+
+      assert cleanup == CleanupPhase.AFTER_METHOD;
+
+
+      config = getCacheConfig();
+      CacheContainer cm1 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      CacheContainer cm2 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      CacheContainer cm3 = TestCacheManagerFactory.createClusteredCacheManager(config);
+      registerCacheManager(cm1);
+      registerCacheManager(cm2);
+      registerCacheManager(cm3);
+
+      hotRodServer1 = TestHelper.startHotRodServer(manager(0));
+      hrServ2CacheManager.put(getAddress(hotRodServer1), cm1);
+      hotRodServer2 = TestHelper.startHotRodServer(manager(1));
+      hrServ2CacheManager.put(getAddress(hotRodServer2), cm2);
+      hotRodServer3 = TestHelper.startHotRodServer(manager(2));
+      hrServ2CacheManager.put(getAddress(hotRodServer3), cm3);
+
+      manager(0).getCache();
+      manager(1).getCache();
+      manager(2).getCache();
+
+      waitForClusterToForm();
+
+      Properties clientConfig = new Properties();
+      clientConfig.put("hotrod-servers", "localhost:" + hotRodServer2.getPort());
+      clientConfig.put("force-return-value", "true");
+      clientConfig.put("maxActive",1); //this ensures that only one server is active at a time
+
+      remoteCacheManager = new RemoteCacheManager(clientConfig);
+      remoteCache = (RemoteCacheImpl) remoteCacheManager.getCache();
+      tcpConnectionFactory = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+      strategy = (RoundRobinBalancingStrategy) tcpConnectionFactory.getBalancer();
+      addInterceptors();
+
+      assert super.cacheManagers.size() == 3;
+
+   }
+
+   protected abstract Configuration getCacheConfig();
+
+   protected void waitForClusterToForm() {
+      TestingUtil.blockUntilViewReceived(manager(0).getCache(), 3, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(0).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(1).getCache(), ComponentStatus.RUNNING, 10000);
+      TestingUtil.blockUntilCacheStatusAchieved(manager(2).getCache(), ComponentStatus.RUNNING, 10000);
+   }
+}

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/DistributionRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/DistributionRetryTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/DistributionRetryTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -0,0 +1,148 @@
+package org.infinispan.client.hotrod.retry;
+
+import org.infinispan.Cache;
+import org.infinispan.affinity.KeyAffinityService;
+import org.infinispan.affinity.KeyAffinityServiceFactory;
+import org.infinispan.affinity.KeyGenerator;
+import org.infinispan.client.hotrod.VersionedValue;
+import org.infinispan.client.hotrod.impl.SerializationMarshaller;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransport;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Random;
+import java.util.concurrent.Executors;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test(testName = "hotrod.retry.DistributionRetryTest", groups = "functional")
+public class DistributionRetryTest extends AbstractRetryTest {
+
+   @Override
+   protected Configuration getCacheConfig() {
+      Configuration config = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+      config.setNumOwners(1);
+      return config;
+   }
+
+   @Override
+   protected void waitForClusterToForm() {
+      super.waitForClusterToForm();
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
+   }
+
+   public void testGet() {
+      log.info("Starting actual test");
+      Object key = generateKeyAndShutdownServer();
+      //now make sure that next call won't fail
+      resetStats();
+      assertEquals(remoteCache.get(key), "v");
+   }
+
+   public void testPut() {
+      Object key = generateKeyAndShutdownServer();
+      log.info("Here it starts");
+      assertEquals(remoteCache.put(key, "v0"), "v");
+   }
+
+   public void testRemove() {
+      Object key = generateKeyAndShutdownServer();
+      assertEquals("v", remoteCache.remove(key));
+   }
+
+   public void testContains() {
+      Object key = generateKeyAndShutdownServer();
+      resetStats();
+      assertEquals(true, remoteCache.containsKey(key));
+   }
+
+   public void testGetWithVersion() {
+      Object key = generateKeyAndShutdownServer();
+      resetStats();
+      VersionedValue value = remoteCache.getVersioned(key);
+      assertEquals("v", value.getValue());
+   }
+
+   public void testPutIfAbsent() {
+      Object key = generateKeyAndShutdownServer();
+      assertEquals(null, remoteCache.putIfAbsent("noSuchKey", "someValue"));
+      assertEquals("someValue", remoteCache.get("noSuchKey"));
+   }
+
+   public void testReplace() {
+      Object key = generateKeyAndShutdownServer();
+      assertEquals("v", remoteCache.replace(key, "v2"));
+   }
+
+   public void testReplaceIfUnmodified() {
+      Object key = generateKeyAndShutdownServer();
+      assertEquals(false, remoteCache.replaceWithVersion(key, "v2", 12));
+   }
+
+   public void testRemoveIfUnmodified() {
+      Object key = generateKeyAndShutdownServer();
+      resetStats();
+      assertEquals(false, remoteCache.removeWithVersion(key, 12));
+   }
+
+   public void testClear() {
+      Object key = generateKeyAndShutdownServer();
+      resetStats();
+      remoteCache.clear();
+      assertEquals(false, remoteCache.containsKey(key));
+   }
+
+   private Object generateKeyAndShutdownServer() {
+      resetStats();
+      Cache<Object,Object> cache = manager(1).getCache();
+      KeyAffinityService kaf = KeyAffinityServiceFactory.newKeyAffinityService(cache, Executors.newSingleThreadExecutor(), new ByteKeyGenerator(), 2, true);
+      Address address = cache.getAdvancedCache().getRpcManager().getTransport().getAddress();
+      byte[] keyBytes = (byte[]) kaf.getKeyForAddress(address);
+      String key = ByteKeyGenerator.getStringObject(keyBytes);
+      kaf.stop();
+
+      remoteCache.put(key, "v");
+      assertOnlyServerHit(getAddress(hotRodServer2));
+      TcpTransportFactory tcpTp = (TcpTransportFactory) TestingUtil.extractField(remoteCacheManager, "transportFactory");
+
+      SerializationMarshaller sm = new SerializationMarshaller();
+      TcpTransport transport = (TcpTransport) tcpTp.getTransport(sm.marshallObject(key));
+      try {
+      assertEquals(transport.getServerAddress(), new InetSocketAddress("localhost", hotRodServer2.getPort()));
+      } finally {
+         tcpTp.releaseTransport(transport);
+      }
+      
+
+      log.info("About to stop hotrod server 2");
+      hotRodServer2.stop();
+
+
+      return key;
+   }
+
+   static class ByteKeyGenerator implements KeyGenerator {
+      Random r = new Random();
+      @Override
+      public Object getKey() {
+         String result = String.valueOf(r.nextLong());
+         SerializationMarshaller sm = new SerializationMarshaller();
+         return sm.marshallObject(result);
+      }
+
+      static String getStringObject(byte[] bytes) {
+         SerializationMarshaller sm = new SerializationMarshaller();
+         return (String) sm.readObject(bytes);
+      }
+   }
+
+}

Added: branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java
===================================================================
--- branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java	                        (rev 0)
+++ branches/4.1.x/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/retry/ReplicationRetryTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -0,0 +1,137 @@
+package org.infinispan.client.hotrod.retry;
+
+import org.infinispan.client.hotrod.HitsAwareCacheManagersTest;
+import org.infinispan.client.hotrod.RemoteCache;
+import org.infinispan.client.hotrod.RemoteCacheManager;
+import org.infinispan.client.hotrod.TestHelper;
+import org.infinispan.client.hotrod.VersionedValue;
+import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
+import org.infinispan.client.hotrod.impl.transport.tcp.TcpTransportFactory;
+import org.infinispan.config.Configuration;
+import org.infinispan.lifecycle.ComponentStatus;
+import org.infinispan.manager.CacheContainer;
+import org.infinispan.server.hotrod.HotRodServer;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.util.Properties;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.1
+ */
+@Test (testName = "client.hotrod.ReplicationRetryTest", groups = "functional")
+public class ReplicationRetryTest extends AbstractRetryTest {
+
+   public void testGet() {
+      validateSequenceAndStopServer();
+      //now make sure that next call won't fail
+      resetStats();
+      for (int i = 0; i < 100; i++) {
+         assert remoteCache.get("k").equals("v");
+      }
+   }
+
+   public void testPut() {
+
+      validateSequenceAndStopServer();
+      resetStats();
+
+      assert "v".equals(remoteCache.put("k", "v0"));
+      for (int i = 1; i < 100; i++) {
+         assertEquals("v" + (i-1), remoteCache.put("k", "v"+i));
+      }
+   }
+
+   public void testRemove() {
+      validateSequenceAndStopServer();
+      resetStats();
+
+      assertEquals("v", remoteCache.remove("k"));
+   }
+
+   public void testContains() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals(true, remoteCache.containsKey("k"));
+   }
+
+   public void testGetWithVersion() {
+      validateSequenceAndStopServer();
+      resetStats();
+      VersionedValue value = remoteCache.getVersioned("k");
+      assertEquals("v", value.getValue());
+   }
+
+   public void testPutIfAbsent() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals(null, remoteCache.putIfAbsent("noSuchKey", "someValue"));
+      assertEquals("someValue", remoteCache.get("noSuchKey"));
+   }
+
+   public void testReplace() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals("v", remoteCache.replace("k", "v2"));
+   }
+
+   public void testReplaceIfUnmodified() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals(false, remoteCache.replaceWithVersion("k", "v2", 12));
+   }
+
+   public void testRemoveIfUnmodified() {
+      validateSequenceAndStopServer();
+      resetStats();
+      assertEquals(false, remoteCache.removeWithVersion("k", 12));
+   }
+
+   public void testClear() {
+      validateSequenceAndStopServer();
+      resetStats();
+      remoteCache.clear();
+      assertEquals(false, remoteCache.containsKey("k"));
+   }
+
+   private void validateSequenceAndStopServer() {
+      resetStats();
+      assertNoHits();
+      InetSocketAddress expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      assertNoHits();
+      remoteCache.put("k","v");
+
+      assert strategy.getServers().length == 3;
+      assertOnlyServerHit(expectedServer);
+
+      resetStats();
+      expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      remoteCache.put("k2","v2");
+      assertOnlyServerHit(expectedServer);
+
+      resetStats();
+      expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      remoteCache.put("k3","v3");
+      assertOnlyServerHit(expectedServer);
+
+      resetStats();
+      expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      remoteCache.put("k","v");
+      assertOnlyServerHit(expectedServer);
+
+      //this would be the next server to be shutdown
+      expectedServer = strategy.getServers()[strategy.getNextPosition()];
+      HotRodServer toStop = addr2hrServer.get(expectedServer);
+      toStop.stop();
+   }
+
+   @Override
+   protected Configuration getCacheConfig() {
+      return getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC);
+   }
+}

Modified: branches/4.1.x/core/src/main/java/org/infinispan/CacheDelegate.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/CacheDelegate.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/CacheDelegate.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -53,7 +53,7 @@
 import org.infinispan.jmx.annotations.ManagedAttribute;
 import org.infinispan.jmx.annotations.ManagedOperation;
 import org.infinispan.lifecycle.ComponentStatus;
-import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.CacheContainer;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.marshall.MarshalledValue;
 import org.infinispan.marshall.Marshaller;
@@ -368,7 +368,7 @@
    @ManagedAttribute(description = "Returns the cache name")
    @Metric(displayName = "Cache name", dataType = DataType.TRAIT, displayType = DisplayType.SUMMARY)
    public String getCacheName() {
-      return getName().equals(DefaultCacheManager.DEFAULT_CACHE_NAME) ? "Default Cache" : getName();
+      return getName().equals(CacheContainer.DEFAULT_CACHE_NAME) ? "Default Cache" : getName();
    }
 
    public String getVersion() {

Modified: branches/4.1.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -107,7 +107,7 @@
    private InvocationContextContainer icc;
    @ManagedAttribute(description = "If true, the node has successfully joined the grid and is considered to hold state.  If false, the join process is still in progress.")
    @Metric(displayName = "Is join completed?", dataType = DataType.TRAIT)
-   volatile boolean joinComplete = false;
+   private volatile boolean joinComplete = false;
    Future<Void> joinFuture;
    final List<Address> leavers = new CopyOnWriteArrayList<Address>();
    volatile Future<Void> leaveTaskFuture;
@@ -131,12 +131,19 @@
 
    @Start(priority = 20)
    public void start() throws Exception {
+      if (log.isTraceEnabled()) {
+         log.trace("Starting distribution manager on " + getMyAddress());
+      }
       replCount = configuration.getNumOwners();
       listener = new ViewChangeListener();
       notifier.addListener(listener);
       join();
    }
 
+   private Address getMyAddress() {
+      return rpcManager != null? rpcManager.getAddress(): null;
+   }
+
    // To avoid blocking other components' start process, wait last, if necessary, for join to complete.
    @Start(priority = 1000)
    public void waitForJoinToComplete() throws Throwable {
@@ -162,7 +169,7 @@
          JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this);
          joinFuture = rehashExecutor.submit(joinTask);
       } else {
-         joinComplete = true;
+         setJoinComplete(true);
       }
       startLatch.open();
    }
@@ -171,7 +178,7 @@
    public void stop() {
       notifier.removeListener(listener);
       rehashExecutor.shutdownNow();
-      joinComplete = false;
+      setJoinComplete(false);
    }
 
    public void rehash(List<Address> newMembers, List<Address> oldMembers) {
@@ -368,7 +375,10 @@
    public class ViewChangeListener {
       @ViewChanged
       public void handleViewChange(ViewChangedEvent e) {
-         boolean started = false;
+         if (log.isTraceEnabled()) {
+            log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());
+         }
+         boolean started;
          // how long do we wait for a startup?
          if (e.isNeedsToRejoin()) {
             try {
@@ -416,6 +426,13 @@
       return joinComplete;
    }
 
+   public void setJoinComplete(boolean joinComplete) {
+      if (log.isTraceEnabled()) {
+         log.trace("Setting joinComplete to " + joinComplete + " for node " + rpcManager.getAddress());
+      }
+      this.joinComplete = joinComplete;
+   }
+
    void drainLocalTransactionLog() {
       List<WriteCommand> c;
       while (transactionLogger.shouldDrainWithoutLock()) {

Modified: branches/4.1.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -72,13 +72,15 @@
    protected void performRehash() throws Exception {
       long start = System.currentTimeMillis();
       boolean trace = log.isTraceEnabled();
-      if (log.isDebugEnabled()) log.debug("Commencing");
+      if (log.isDebugEnabled()) log.debug("Commencing rehash on node: " + getMyAddress() + ". Before start, dmi.joinComplete = " + dmi.isJoinComplete());
       TransactionLogger transactionLogger = dmi.getTransactionLogger();
       boolean unlocked = false;
       ConsistentHash chOld;
       ConsistentHash chNew;
       try {
-         dmi.joinComplete = false;
+         if (dmi.isJoinComplete()) {
+            throw new IllegalStateException("Join cannot be complete without rehash to finish (node " + getMyAddress() + " )");
+         }
          // 1.  Get chOld from coord.         
          chOld = retrieveOldCH(trace);
 
@@ -142,7 +144,7 @@
          throw new CacheException("Unexpected exception", e);
       } finally {
          if (!unlocked) transactionLogger.unlockAndDisable();
-         dmi.joinComplete = true;
+         dmi.setJoinComplete(true);
       }
    }
 
@@ -230,4 +232,8 @@
       if (!l.contains(plusOne)) l.add(plusOne);
       return l;
    }
+
+   public Address getMyAddress() {
+      return rpcManager != null && rpcManager.getTransport() != null ? rpcManager.getTransport().getAddress() : null;
+   }
 }

Modified: branches/4.1.x/core/src/main/java/org/infinispan/factories/InternalCacheFactory.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/factories/InternalCacheFactory.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/factories/InternalCacheFactory.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -27,6 +27,7 @@
 import org.infinispan.config.Configuration;
 import org.infinispan.config.ConfigurationException;
 import org.infinispan.jmx.CacheJmxRegistration;
+import org.infinispan.manager.CacheContainer;
 import org.infinispan.manager.DefaultCacheManager;
 
 /**
@@ -68,7 +69,7 @@
    }
 
    public Cache<K, V> createDefaultCache(Configuration configuration) throws ConfigurationException {
-      return createCache(configuration, null, DefaultCacheManager.DEFAULT_CACHE_NAME);
+      return createCache(configuration, null, CacheContainer.DEFAULT_CACHE_NAME);
    }
 
    protected AdvancedCache<K, V> createAndWire(Configuration configuration, GlobalComponentRegistry globalComponentRegistry, String cacheName) throws Exception {

Modified: branches/4.1.x/core/src/main/java/org/infinispan/manager/CacheContainer.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/manager/CacheContainer.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/manager/CacheContainer.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -54,6 +54,7 @@
  * @since 4.0
  */
 public interface CacheContainer extends Lifecycle {
+   String DEFAULT_CACHE_NAME = "___defaultcache";
 
    /**
     * Retrieves the default cache associated with this cache container.

Modified: branches/4.1.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/manager/DefaultCacheManager.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -106,7 +106,6 @@
 @SurvivesRestarts
 @MBean(objectName = DefaultCacheManager.OBJECT_NAME, description = "Component that acts as a manager, factory and container for caches in the system.")
 public class DefaultCacheManager implements EmbeddedCacheManager, CacheManager {
-   public static final String DEFAULT_CACHE_NAME = "___defaultcache";
    public static final String OBJECT_NAME = "CacheManager";
    private static final Log log = LogFactory.getLog(DefaultCacheManager.class);
    protected final GlobalConfiguration globalConfiguration;

Modified: branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -207,4 +207,9 @@
     *         a null otherwise.
     */
    Address getCurrentStateTransferSource();
+
+   /**
+    * Returns the address associated with this RpcManager or null if not part of the cluster.
+    */
+   public Address getAddress();
 }
\ No newline at end of file

Modified: branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -322,7 +322,7 @@
 
    @ManagedAttribute(description = "The network address associated with this instance")
    @Metric(displayName = "Network address", dataType = DataType.TRAIT, displayType = DisplayType.SUMMARY)
-   public String getAddress() {
+   public String getNodeAddress() {
       if (t == null || !isStatisticsEnabled()) return "N/A";
       Address address = t.getAddress();
       return address == null ? "N/A" : address.toString();
@@ -383,5 +383,9 @@
    // mainly for unit testing
    public void setTransport(Transport t) {
       this.t = t;
+   }@Override
+
+   public Address getAddress() {
+      return t != null ? t.getAddress() : null;
    }
 }

Modified: branches/4.1.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -15,6 +15,8 @@
 import org.infinispan.test.TestingUtil;
 import org.infinispan.util.Util;
 import org.infinispan.util.concurrent.IsolationLevel;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 import org.testng.annotations.Test;
 
 import javax.transaction.TransactionManager;
@@ -93,16 +95,21 @@
     * This is a separate class because some tools try and run this method as a test 
     */
    public static class RehashWaiter {
+      private static Log log = LogFactory.getLog(RehashWaiter.class);
       public static void waitForInitRehashToComplete(Cache... caches) {
          int gracetime = 60000; // 60 seconds?
          long giveup = System.currentTimeMillis() + gracetime;
          for (Cache c : caches) {
             DistributionManagerImpl dmi = (DistributionManagerImpl) TestingUtil.extractComponent(c, DistributionManager.class);
-            while (!dmi.joinComplete) {
-               if (System.currentTimeMillis() > giveup)
-                  throw new RuntimeException("Timed out waiting for initial join sequence to complete!");
+            while (!dmi.isJoinComplete()) {
+               if (System.currentTimeMillis() > giveup) {
+                  String message = "Timed out waiting for initial join sequence to complete on node " + dmi.rpcManager.getAddress() + " !";
+                  log.error(message);
+                  throw new RuntimeException(message);
+               }
                LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
             }
+            log.trace("Node " + dmi.rpcManager.getAddress() + " finished rehash task.");
          }
       }
 
@@ -151,7 +158,7 @@
          boolean allOK = true;
          for (Cache c : joiners) {
             DistributionManagerImpl dmi = (DistributionManagerImpl) getDistributionManager(c);
-            allOK &= dmi.joinComplete;
+            allOK &= dmi.isJoinComplete();
          }
          if (allOK) return;
          TestingUtil.sleepThread(100);

Modified: branches/4.1.x/core/src/test/java/org/infinispan/distribution/rehash/RehashCompletedOnJoinTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/distribution/rehash/RehashCompletedOnJoinTest.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/distribution/rehash/RehashCompletedOnJoinTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -38,7 +38,7 @@
       joinerManager.defineConfiguration(cacheName, configuration);
       Cache joiner = joinerManager.getCache(cacheName);
       DistributionManager dmi = joiner.getAdvancedCache().getDistributionManager();
-      assert dmi.isJoinComplete() == true;
+      assert dmi.isJoinComplete();
    }
 
 }

Modified: branches/4.1.x/core/src/test/java/org/infinispan/jmx/CacheMBeanTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/jmx/CacheMBeanTest.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/jmx/CacheMBeanTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -31,7 +31,6 @@
 import org.infinispan.CacheException;
 import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.manager.CacheContainer;
-import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.test.SingleCacheManagerTest;
 import org.infinispan.test.fwk.TestCacheManagerFactory;
@@ -53,7 +52,7 @@
    }
    
    public void testStartStopManagedOperations() throws Exception {
-      ObjectName defaultOn = new ObjectName(JMX_DOMAIN + ":cache-name=" + DefaultCacheManager.DEFAULT_CACHE_NAME + "(local),jmx-resource=Cache");
+      ObjectName defaultOn = new ObjectName(JMX_DOMAIN + ":cache-name=" + CacheContainer.DEFAULT_CACHE_NAME + "(local),jmx-resource=Cache");
       ObjectName managerON = new ObjectName(JMX_DOMAIN + ":cache-name=[global],jmx-resource=CacheManager");
       server.invoke(managerON, "startCache", new Object[]{}, new String[]{});
       assert ComponentStatus.RUNNING.toString().equals(server.getAttribute(defaultOn, "CacheStatus"));
@@ -83,7 +82,7 @@
    
    public void testManagerStopRemovesCacheMBean(Method m) throws Exception {
       final String otherJmxDomain = JMX_DOMAIN + '.' + m.getName();
-      ObjectName defaultOn = new ObjectName(otherJmxDomain + ":cache-name=" + DefaultCacheManager.DEFAULT_CACHE_NAME + "(local),jmx-resource=Cache");
+      ObjectName defaultOn = new ObjectName(otherJmxDomain + ":cache-name=" + CacheContainer.DEFAULT_CACHE_NAME + "(local),jmx-resource=Cache");
       ObjectName galderOn = new ObjectName(otherJmxDomain + ":cache-name=galder(local),jmx-resource=Cache");
       ObjectName managerON = new ObjectName(otherJmxDomain + ":cache-name=[global],jmx-resource=CacheManager");
       CacheContainer otherContainer = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(otherJmxDomain);

Modified: branches/4.1.x/core/src/test/java/org/infinispan/manager/CacheManagerTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/manager/CacheManagerTest.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/manager/CacheManagerTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -21,10 +21,10 @@
 
       try {
          assert cm.getCache().getStatus() == ComponentStatus.RUNNING;
-         assert cm.getCache().getName().equals(DefaultCacheManager.DEFAULT_CACHE_NAME);
+         assert cm.getCache().getName().equals(CacheContainer.DEFAULT_CACHE_NAME);
 
          try {
-            cm.defineConfiguration(DefaultCacheManager.DEFAULT_CACHE_NAME, new Configuration());
+            cm.defineConfiguration(CacheContainer.DEFAULT_CACHE_NAME, new Configuration());
             assert false : "Should fail";
          }
          catch (IllegalArgumentException e) {

Modified: branches/4.1.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/java/org/infinispan/tx/ReplDeadlockDetectionTest.java	2010-07-01 07:31:37 UTC (rev 1944)
@@ -87,7 +87,7 @@
       replicationLatch = new CountDownLatch(1);
       controlledRpcManager1.setReplicationLatch(replicationLatch);
       controlledRpcManager2.setReplicationLatch(replicationLatch);
-      log.trace("_________________________ Here is beggins");
+      log.trace("_________________________ Here it begins");
    }
 
    @AfterMethod
@@ -310,5 +310,10 @@
       public Address getCurrentStateTransferSource() {
          return realOne.getCurrentStateTransferSource();
       }
+
+      @Override
+      public Address getAddress() {
+         return null;
+      }
    }
 }

Modified: branches/4.1.x/core/src/test/resources/log4j.xml
===================================================================
--- branches/4.1.x/core/src/test/resources/log4j.xml	2010-06-30 21:07:25 UTC (rev 1943)
+++ branches/4.1.x/core/src/test/resources/log4j.xml	2010-07-01 07:31:37 UTC (rev 1944)
@@ -8,7 +8,7 @@
 
    <!-- A time/date based rolling appender -->
    <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
-      <param name="File" value="infinispan.log"/>
+      <param name="File" value="infinispan_test.log"/>
       <param name="Append" value="false"/>
 
       <!-- Rollover at midnight each day -->
@@ -65,7 +65,7 @@
    <!-- ======================= -->
 
    <root>
-      <priority value="WARN"/>
+      <priority value="TRACE"/>
       <!--<appender-ref ref="CONSOLE"/>-->
       <appender-ref ref="FILE"/>
    </root>



More information about the infinispan-commits mailing list