[infinispan-commits] Infinispan SVN: r2658 - in branches/4.2.x/core/src: main/java/org/infinispan/distribution and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Nov 3 08:29:59 EDT 2010


Author: mircea.markus
Date: 2010-11-03 08:29:58 -0400 (Wed, 03 Nov 2010)
New Revision: 2658

Added:
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/TestAddress.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareChFunctionalTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareConsistentHashTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareDistAsyncFuncTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareDistSyncUnsafeFuncTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareStateTransferTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoBroadcastNoRehashTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoBroadcastTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoStateTransferTest.java
Removed:
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/DistributionManagerImplTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java
Modified:
   branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractWheelConsistentHash.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ConsistentHash.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ExperimentalDefaultConsistentHash.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java
   branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/UnionConsistentHash.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
   branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
Log:
[ISPN-180]-(Colocated nodes should be handled in DIST)

Modified: branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -264,6 +264,7 @@
             ", consistentHash=" + newCH +
             ", txLogCommands=" + txLogCommands +
             ", pendingPrepares=" + pendingPrepares +
+            ", nodeTopologyInfo=" + nodeTopologyInfo +
             '}';
    }
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -44,7 +44,6 @@
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.MembershipArithmetic;
 import org.infinispan.remoting.transport.Transport;
-import org.infinispan.util.Util;
 import org.infinispan.util.concurrent.ReclosableLatch;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -229,7 +228,6 @@
          Address leaver = MembershipArithmetic.getMemberLeft(oldMembers, newMembers);
          log.info("This is a LEAVE event!  Node {0} has just left", leaver);
 
-         topologyInfo.removeNodeInfo(leaver);
 
          try {
             if (!(consistentHash instanceof UnionConsistentHash)) {
@@ -243,8 +241,8 @@
             throw new CacheException(e);
          }
 
-         boolean willReceiveLeaverState = willReceiveLeaverState(leaver);
          List<Address> stateProviders = holdersOfLeaversState(leaver);
+         boolean willReceiveLeaverState = willReceiveLeaverState(stateProviders);
          boolean willSendLeaverState = stateProviders.contains(self);
 
          if (willReceiveLeaverState || willSendLeaverState) {
@@ -265,18 +263,42 @@
             leaveTaskFuture = rehashExecutor.submit(task);
          } else {
             log.info("Not in same subspace, so ignoring leave event");
+            topologyInfo.removeNodeInfo(leaver);
          }
       }
    }
 
    List<Address> holdersOfLeaversState(Address leaver) {
-      List<Address> addresses = oldConsistentHash.getStateProvidersOnLeave(leaver, getReplCount());
-      if (log.isTraceEnabled()) log.trace("Holders of leaver's state are: " + addresses);
-      return addresses;
+      List<Address> result = new ArrayList<Address>();
+      for (Address addr : oldConsistentHash.getCaches()) {
+         List<Address> backups = oldConsistentHash.getBackupsForNode(addr, getReplCount());
+         log.trace("Backups for {0} are {1}", addr, backups);
+         if (addr.equals(leaver)) {
+            if (backups.size() > 1) {
+               Address mainBackup = backups.get(1);
+               result.add(mainBackup);
+               log.trace("Leaver's ({0}) main backup({1}) is looking for another backup as well.", leaver, mainBackup);
+            }
+         } else if (backups.contains(leaver)) {
+            log.trace("{0} is looking for a new backup to replace {1}", addr, leaver);
+            result.add(addr);
+         }
+      }
+      log.trace("Nodes that need new backups are: {0}", result);
+      return result;
    }
 
-   boolean willReceiveLeaverState(Address leaver) {
-      return oldConsistentHash.isStateReceiverOnLeave(leaver, self, getReplCount());
+   boolean willReceiveLeaverState(List<Address> stateProviders) {
+      for (Address addr : stateProviders) {
+         List<Address> addressList = consistentHash.getBackupsForNode(addr, getReplCount());
+         boolean isLast = addressList.indexOf(self) == addressList.size() - 1;
+         if (isLast) {
+            log.trace("This is a new backup for {0}", addr);
+            return true;
+         }
+      }
+      log.trace("This node won't receive state");
+      return false;
    }
 
    public boolean isLocal(Object key) {

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -115,6 +115,7 @@
                      - start));
          else
             log.info("{0} completed leave rehash!", self);
+         for (Address addr : leaversHandled) dmi.topologyInfo.removeNodeInfo(addr);
       }
    }
 

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -100,10 +100,7 @@
                transactionLogger.enable();
    
                // 4.  Broadcast new temp CH
-               RehashControlCommand rehashControlCommand = cf.buildRehashControlCommand(JOIN_REHASH_START, self);
-               rehashControlCommand.setNodeTopologyInfo(dmi.topologyInfo.getNodeTopologyInfo(rpcManager.getAddress()));
-               List<Response> responseList = rpcManager.invokeRemotely(null, rehashControlCommand, true, true);
-               updateTopologyInfo(responseList);
+               broadcastNewCh();
 
                // 5.  txLogger being enabled will cause ClusteredGetCommands to return uncertain responses.
    
@@ -127,7 +124,7 @@
                dmi.drainLocalTransactionLog();
                unlocked = true;
             } else {
-               rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
+               broadcastNewCh();
                if (trace) log.trace("Rehash not enabled, so not pulling state.");
             }                                 
          } finally {
@@ -153,6 +150,13 @@
       }
    }
 
+   private void broadcastNewCh() {
+      RehashControlCommand rehashControlCommand = cf.buildRehashControlCommand(JOIN_REHASH_START, self);
+      rehashControlCommand.setNodeTopologyInfo(dmi.topologyInfo.getNodeTopologyInfo(rpcManager.getAddress()));
+      List<Response> responseList = rpcManager.invokeRemotely(null, rehashControlCommand, true, true);
+      updateTopologyInfo(responseList);
+   }
+
    private void updateTopologyInfo(List<Response> responseList) {
       for (Response r : responseList) {
          SuccessfulResponse sr = (SuccessfulResponse) r;

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -46,4 +46,12 @@
    public TopologyInfo getTopologyInfo() {
       return topologyInfo;
    }
+
+   @Override
+   public String toString() {
+      return "AbstractConsistentHash{" +
+            "caches=" + caches +
+            ", topologyInfo=" + topologyInfo +
+            '}';
+   }
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractWheelConsistentHash.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractWheelConsistentHash.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/AbstractWheelConsistentHash.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -60,17 +60,11 @@
       }
    }
 
-   public boolean isStateReceiverOnLeave(Address leaver, Address node, int replCount) {
-      for (Address address : addresses) {
-         List<Address> backups = locate(address, replCount + 1);
-         if (backups.contains(leaver) && (backups.indexOf(node) == backups.size() - 1)) {
-            return true;
-         }
-      }
-      return false;
+   @Override
+   public List<Address> getBackupsForNode(Address node, int replCount) {
+      return locate(node, replCount);
    }
 
-
    public List<Address> getCaches() {
       return addresses;
    }
@@ -96,4 +90,13 @@
       if (keyHashCode == Integer.MIN_VALUE) keyHashCode += 1;
       return Math.abs(keyHashCode) % HASH_SPACE;
    }
+
+   @Override
+   public String toString() {
+      return "AbstractWheelConsistentHash{" +
+            "addresses=" + addresses +
+            ", positions=" + positions +
+            ", addressToHashIds=" + addressToHashIds +
+            "} " + super.toString();
+   }
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ConsistentHash.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ConsistentHash.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ConsistentHash.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -95,20 +95,14 @@
    List<Address> getStateProvidersOnLeave(Address leaver, int replCount);
 
    /**
-    * Is the specified node going to receive state if when another node leaves the cluster?
-    * When a node leaves the cluster following nodes would need to receive state as result of the rehashing:
-    *  - a new backup node for the lever to satisfy numOwners condition
-    *  - the nodes that would replace the leaver as a backup for other nodes
-    * @param leaver node that leaves
-    * @param node is this state receiver?
-    * @param replCount numOwners
-    */
-   boolean isStateReceiverOnLeave(Address leaver, Address node, int replCount);
-
-   /**
     * Returns the nodes that would act as state providers when a new node joins:
     * - the nodes for which the joiner is a backup
     * - the nodes that held joiner's state
     */
    List<Address> getStateProvidersOnJoin(Address joiner, int replCount);
+
+   /**
+    * Returns the nodes that backup data for the supplied node including the node itself.
+    */
+   List<Address> getBackupsForNode(Address node, int replCount);
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ExperimentalDefaultConsistentHash.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ExperimentalDefaultConsistentHash.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/ExperimentalDefaultConsistentHash.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -5,7 +5,6 @@
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.Util;
 
-import javax.naming.OperationNotSupportedException;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
@@ -382,11 +381,12 @@
       return new ArrayList<Address>(holders);
    }
 
-   public boolean isStateReceiverOnLeave(Address leaver, Address node, int replCount) {
+   public List<Address> getStateProvidersOnJoin(Address joiner, int replCount) {
       throw new RuntimeException("Not implemented!");
    }
 
-   public List<Address> getStateProvidersOnJoin(Address joiner, int replCount) {
+   @Override
+   public List<Address> getBackupsForNode(Address node, int replCount) {
       throw new RuntimeException("Not implemented!");
    }
 }
\ No newline at end of file

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -1,5 +1,6 @@
 package org.infinispan.distribution.ch;
 
+import org.infinispan.CacheException;
 import org.infinispan.marshall.Ids;
 import org.infinispan.marshall.Marshallable;
 import org.infinispan.remoting.transport.Address;
@@ -125,7 +126,11 @@
 
    private Address getOwner(Object key) {
       int hash = getNormalizedHash(key);
-      Integer ownerHash = positions.tailMap(hash).firstKey();
+      SortedMap<Integer, Address> map = positions.tailMap(hash);
+      if (map.size() == 0) {
+         return positions.get(positions.firstKey());
+      }
+      Integer ownerHash = map.firstKey();
       return positions.get(ownerHash);
    }
 

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -1,13 +1,8 @@
 package org.infinispan.distribution.ch;
 
-import org.infinispan.marshall.Externalizer;
-import org.infinispan.marshall.Ids;
-import org.infinispan.marshall.Marshallable;
+import org.infinispan.CacheException;
 import org.infinispan.remoting.transport.Address;
 
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -23,13 +18,22 @@
    
    private Map<Address, NodeTopologyInfo> address2TopologyInfo = new HashMap<Address, NodeTopologyInfo>();
 
+   public TopologyInfo() {
+   }
+
+   public TopologyInfo(TopologyInfo topologyInfo) {
+      this.address2TopologyInfo.putAll(topologyInfo.address2TopologyInfo);
+   }
+
    public void addNodeTopologyInfo(Address addr, NodeTopologyInfo ti) {
       address2TopologyInfo.put(addr, ti);
    }
 
    public boolean isSameSite(Address a1, Address a2) {
       NodeTopologyInfo info1 = address2TopologyInfo.get(a1);
+      if (info1 == null) throw new CacheException("No such address ( " + a1 + ") in the list of caches: " + address2TopologyInfo);
       NodeTopologyInfo info2 = address2TopologyInfo.get(a2);
+      if (info2 == null) throw new CacheException("No such address ( " + a2 + ") in the list of caches: " + address2TopologyInfo);
       return info1.sameSite(info2);
    }
 
@@ -81,4 +85,8 @@
             "address2TopologyInfo=" + address2TopologyInfo +
             '}';
    }
+
+   public boolean containsInfoForNode(Address address) {
+      return address2TopologyInfo.get(address) != null;
+   }
 }

Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/UnionConsistentHash.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/UnionConsistentHash.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/ch/UnionConsistentHash.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -57,11 +57,12 @@
       throw new UnsupportedOperationException("Unsupported!");
    }
 
-   public boolean isStateReceiverOnLeave(Address leaver, Address node, int replCount) {
+   public List<Address> getStateProvidersOnJoin(Address joiner, int replCount) {
       throw new UnsupportedOperationException("Unsupported!");
    }
 
-   public List<Address> getStateProvidersOnJoin(Address joiner, int replCount) {
+   @Override
+   public List<Address> getBackupsForNode(Address node, int replCount) {
       throw new UnsupportedOperationException("Unsupported!");
    }
 

Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -65,7 +65,7 @@
       configuration.setSyncReplTimeout(60, TimeUnit.SECONDS);
       configuration.setLockAcquisitionTimeout(45, TimeUnit.SECONDS);
       configuration.setL1CacheEnabled(l1CacheEnabled);
-      if (l1CacheEnabled) configuration.setL1OnRehash(l1OnRehash);
+      if (l1CacheEnabled) configuration.setL1OnRehash(l1OnRehash);      
       caches = createClusteredCaches(INIT_CLUSTER_SIZE, cacheName, configuration);
 
       reorderBasedOnCHPositions();
@@ -266,7 +266,7 @@
    }
 
    protected static boolean isOwner(Cache<?, ?> c, Object key) {
-      DistributionManager dm = c.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class);
+      DistributionManager dm = c.getAdvancedCache().getDistributionManager();
       List<Address> ownerAddresses = dm.locate(key);
       for (Address a : ownerAddresses) {
          if (addressOf(c).equals(a)) return true;

Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/DefaultConsistentHashTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -124,50 +124,3 @@
    }
 }
 
-class TestAddress implements Address {
-   int addressNum;
-
-   String name;
-
-   public void setName(String name) {
-      this.name = name;
-   }
-
-   TestAddress(int addressNum) {
-      this.addressNum = addressNum;
-   }
-
-   public int getAddressNum() {
-      return addressNum;
-   }
-
-   public void setAddressNum(int addressNum) {
-      this.addressNum = addressNum;
-   }
-
-   @Override
-   public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      TestAddress that = (TestAddress) o;
-
-      if (addressNum != that.addressNum) return false;
-
-      return true;
-   }
-
-   @Override
-   public int hashCode() {
-      return addressNum;
-   }
-
-   @Override
-   public String toString() {
-      return "TestAddress#"+addressNum + (name != null? (" " + name) : "");
-   }
-
-   public int compareTo(Object o) {
-      return this.addressNum - ((TestAddress) o).addressNum;
-   }
-}

Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/DistSyncFuncTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -59,7 +59,7 @@
 
    }
 
-   public void testBasicDistribution() {
+   public void testBasicDistribution() throws Throwable {
       for (Cache<Object, String> c : caches) assert c.isEmpty();
 
       getOwners("k1")[0].put("k1", "value");

Deleted: branches/4.2.x/core/src/test/java/org/infinispan/distribution/DistributionManagerImplTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/DistributionManagerImplTest.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/DistributionManagerImplTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -1,85 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.distribution.ch.DefaultConsistentHash;
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.test.AbstractInfinispanTest;
-import org.testng.annotations.BeforeTest;
-import org.testng.annotations.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.2
- */
-@Test (groups = "functional", testName = "distribution.DistributionManagerImplTest")
-public class DistributionManagerImplTest extends AbstractInfinispanTest {
-   List<Address> servers;
-   DefaultConsistentHash ch;
-   Address a0;
-   Address a1;
-   Address a2;
-   Address a3;
-   Address a4;
-
-   @BeforeTest
-   public void setUp() {
-      servers = new LinkedList<Address>();
-      int numServers = 5;
-      for (int i = 0; i < numServers; i++) {
-         servers.add(new TestAddress(i));
-      }
-      ch = (DefaultConsistentHash) BaseDistFunctionalTest.createNewConsistentHash(servers);
-      a0 = ch.getAddressOnTheWheel().get(0);
-      a1 = ch.getAddressOnTheWheel().get(1);
-      a2 = ch.getAddressOnTheWheel().get(2);
-      a3 = ch.getAddressOnTheWheel().get(3);
-      a4 = ch.getAddressOnTheWheel().get(4);
-   }
-
-
-   /**
-    * numOwners = 3. Let's a a2 leaves.
-    */
-   public void testLeaver() {
-      DistributionManagerImpl dm = newDM(3);
-      dm.setSelf(a0);
-      assert dm.willReceiveLeaverState(a2) : " a0 will be the 2nd backup for a3's state";
-      dm.setSelf(a1);
-      assert !dm.willReceiveLeaverState(a2) : " a1 is not affected";
-      dm.setSelf(a3);
-      assert dm.willReceiveLeaverState(a2) : "this needs to receive state from a0";
-      dm.setSelf(a4);
-      assert dm.willReceiveLeaverState(a2) : "this needs to receive state from a1";
-   }
-
-   private DistributionManagerImpl newDM(int numOwners) {
-      DistributionManagerImpl dm = new DistributionManagerImpl();
-      Configuration configuration = new Configuration();
-      configuration.setNumOwners(numOwners);
-      dm.setConfiguration(configuration);
-      dm.setOldConsistentHash(ch);
-      return dm;
-   }
-
-
-   public void testHoldersOfLeaversState() {
-      DistributionManagerImpl dm = newDM(3);
-      dm.setSelf(a0);
-      List<Address> addressList = dm.holdersOfLeaversState(a2);
-      assert addressList.size()  == 2;
-      assert addressList.contains(a1);
-      assert addressList.contains(a3);
-   }
-
-   public void testHoldersOfLeaversState2() {
-      DistributionManagerImpl dm = newDM(2);
-      dm.setSelf(a0);
-      List<Address> addressList = dm.holdersOfLeaversState(a1);
-      assert addressList.size()  == 2;
-      assert addressList.contains(a0);
-      assert addressList.contains(a2);
-   }
-}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/TestAddress.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/TestAddress.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/TestAddress.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,55 @@
+package org.infinispan.distribution;
+
+import org.infinispan.remoting.transport.Address;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+public class TestAddress implements Address {
+   int addressNum;
+
+   String name;
+
+   public void setName(String name) {
+      this.name = name;
+   }
+
+   public TestAddress(int addressNum) {
+      this.addressNum = addressNum;
+   }
+
+   public int getAddressNum() {
+      return addressNum;
+   }
+
+   public void setAddressNum(int addressNum) {
+      this.addressNum = addressNum;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TestAddress that = (TestAddress) o;
+
+      if (addressNum != that.addressNum) return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      return addressNum;
+   }
+
+   @Override
+   public String toString() {
+      return "TestAddress#"+addressNum + (name != null? (" " + name) : "");
+   }
+
+   public int compareTo(Object o) {
+      return this.addressNum - ((TestAddress) o).addressNum;
+   }
+}

Deleted: branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -1,528 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.distribution.ch.NodeTopologyInfo;
-import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
-import org.infinispan.distribution.ch.TopologyInfo;
-
-import org.infinispan.remoting.transport.Address;
-import org.infinispan.test.TestingUtil;
-import org.infinispan.util.logging.Log;
-import org.infinispan.util.logging.LogFactory;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.2
- */
-@Test(groups = "functional", testName = "distribution.TopologyAwareConsistentHashTest")
-public class TopologyAwareConsistentHashTest {
-   
-   private static Log log = LogFactory.getLog(TopologyAwareConsistentHashTest.class);
-
-   TopologyInfo ti;
-   TopologyAwareConsistentHash ch;
-   ArrayList<Address> addresses;
-   TestAddress a0;
-   TestAddress a1;
-   TestAddress a2;
-   TestAddress a3;
-   TestAddress a4;
-   TestAddress a5;
-   TestAddress a6;
-   TestAddress a7;
-   TestAddress a8;
-   TestAddress a9;
-   private TestAddress[] staticAddresses;
-
-
-   @BeforeMethod
-   public void setUp() {
-      ti = new TopologyInfo();
-      ch = new TopologyAwareConsistentHash();
-      addresses = new ArrayList<Address>();
-      for (int i = 0; i < 10; i++) {
-          addresses.add(new TestAddress(i * 100));
-      }
-      ch.setCaches(addresses);
-      addresses = new ArrayList(ch.getCaches());
-      for (int i = 0; i < addresses.size(); i++) {
-         TestingUtil.replaceField(addresses.get(i), "a"+i, this, TopologyAwareConsistentHashTest.class);
-      }
-
-      addresses.clear();
-      ch = new TopologyAwareConsistentHash();
-      ch.setTopologyInfo(ti);
-   }
-
-   public void testIsStateReceiverOnLeave() {
-      addNode(a0, "m0", null, null);
-      addNode(a1, "m1", null, null);
-      addNode(a2, "m0", null, null);
-      addNode(a3, "m1", null, null);
-      setAddresses();
-
-      assert !ch.isStateReceiverOnLeave(a0, a1, 2);
-      assert ch.isStateReceiverOnLeave(a0, a2, 2);
-      assert !ch.isStateReceiverOnLeave(a0, a1, 2);
-   }
-
-   public void testDifferentMachines() {
-      addNode(a0, "m0", null, null);
-      addNode(a1, "m1", null, null);
-      addNode(a2, "m0", null, null);
-      addNode(a3, "m1", null, null);
-      setAddresses();
-
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-
-      assertLocation(ch.getStateProvidersOnLeave(a0, 1), false);
-      assertLocation(ch.getStateProvidersOnLeave(a1, 1), false);
-      assertLocation(ch.getStateProvidersOnLeave(a2, 1), false);
-      assertLocation(ch.getStateProvidersOnLeave(a3, 1), false);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a1);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a3);
-      assertLocation(ch.locate(a3, 2), true, a3, a0);
-      
-      assertLocation(ch.getStateProvidersOnLeave(a0, 2), false, a1, a3);
-      assertLocation(ch.getStateProvidersOnLeave(a1, 2), false, a0, a2);
-      assertLocation(ch.getStateProvidersOnLeave(a2, 2), false, a3, a1);
-      assertLocation(ch.getStateProvidersOnLeave(a3, 2), false, a0, a2);
-      
-
-
-      assertLocation(ch.locate(a0, 3), true, a0, a1, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a0);
-      assertLocation(ch.locate(a2, 3), true, a2, a3, a1);
-      assertLocation(ch.locate(a3, 3), true, a3, a0, a2);
-
-      assertLocation(ch.getStateProvidersOnLeave(a0, 3), false, a1, a3);
-      assertLocation(ch.getStateProvidersOnLeave(a1, 3), false, a0, a2);
-      assertLocation(ch.getStateProvidersOnLeave(a2, 3), false, a3, a1);
-      assertLocation(ch.getStateProvidersOnLeave(a3, 3), false, a0, a2);
-   }
-   
-   public void testDifferentMachines2() {
-      addNode(a0, "m0", null, null);
-      addNode(a1, "m0", null, null);
-      addNode(a2, "m1", null, null);
-      addNode(a3, "m1", null, null);
-      addNode(a4, "m2", null, null);
-      addNode(a5, "m2", null, null);
-      setAddresses();
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-      assertLocation(ch.locate(a4, 1), true, a4);
-      assertLocation(ch.locate(a5, 1), true, a5);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a2);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a4);
-      assertLocation(ch.locate(a3, 2), true, a3, a4);
-      assertLocation(ch.locate(a4, 2), true, a4, a0);
-      assertLocation(ch.locate(a5, 2), true, a5, a0);
-
-      assertLocation(ch.locate(a0, 3), true, a0, a2, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a4, a5);
-      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
-      assertLocation(ch.locate(a4, 3), true, a4, a0, a1);
-      assertLocation(ch.locate(a5, 3), true, a5, a0, a1);
-   }
-
-   public void testDifferentRacksAndMachines() {
-      addNode(a0, "m0", "r0", null);
-      addNode(a1, "m0", "r0", null);
-      addNode(a2, "m1", "r1", null);
-      addNode(a3, "m2", "r2", null);
-      addNode(a4, "m1", "r1", null);
-      addNode(a5, "m2", "r3", null);
-      setAddresses();
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-      assertLocation(ch.locate(a4, 1), true, a4);
-      assertLocation(ch.locate(a5, 1), true, a5);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a2);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a3);
-      assertLocation(ch.locate(a3, 2), true, a3, a4);
-      assertLocation(ch.locate(a4, 2), true, a4, a5);
-      assertLocation(ch.locate(a5, 2), true, a5, a0);
-
-      assertLocation(ch.locate(a0, 3), true, a0, a2, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a3, a5);
-      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
-      assertLocation(ch.locate(a4, 3), true, a4, a5, a0);
-      assertLocation(ch.locate(a5, 3), true, a5, a0, a1);
-   }
-
-   public void testAllSameMachine() {
-      addNode(a0, "m0", null, null);
-      addNode(a1, "m0", null, null);
-      addNode(a2, "m0", null, null);
-      addNode(a3, "m0", null, null);
-      addNode(a4, "m0", null, null);
-      addNode(a5, "m0", null, null);
-      setAddresses();
-
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-      assertLocation(ch.locate(a4, 1), true, a4);
-      assertLocation(ch.locate(a5, 1), true, a5);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a1);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a3);
-      assertLocation(ch.locate(a3, 2), true, a3, a4);
-      assertLocation(ch.locate(a4, 2), true, a4, a5);
-      assertLocation(ch.locate(a5, 2), true, a5, a0);
-
-      assertLocation(ch.locate(a0, 3), true, a0, a1, a2);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
-      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
-      assertLocation(ch.locate(a4, 3), true, a4, a5, a0);
-      assertLocation(ch.locate(a5, 3), true, a5, a0, a1);
-   }
-
-   public void testDifferentSites() {
-      addNode(a0, "m0", null, "s0");
-      addNode(a1, "m1", null, "s0");
-      addNode(a2, "m2", null, "s1");
-      addNode(a3, "m3", null, "s1");
-      setAddresses();
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a2);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a0);
-      assertLocation(ch.locate(a3, 2), true, a3, a0);
-      
-      assertLocation(ch.locate(a0, 3), true, a0, a2, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a0, a1);
-      assertLocation(ch.locate(a3, 3), true, a3, a0, a1);
-   }
-
-   public void testSitesMachines2() {
-      addNode(a0, "m0", null, "s0");
-      addNode(a1, "m1", null, "s1");
-      addNode(a2, "m2", null, "s0");
-      addNode(a3, "m3", null, "s2");
-      addNode(a4, "m4", null, "s1");
-      addNode(a5, "m5", null, "s1");
-
-      setAddresses();
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-      assertLocation(ch.locate(a4, 1), true, a4);
-      assertLocation(ch.locate(a5, 1), true, a5);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a1);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a3);
-      assertLocation(ch.locate(a3, 2), true, a3, a4);
-      assertLocation(ch.locate(a4, 2), true, a4, a0);
-      assertLocation(ch.locate(a5, 2), true, a5, a0);
-
-      assertLocation(ch.locate(a0, 3), true, a0, a1, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
-      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
-      assertLocation(ch.locate(a4, 3), true, a4, a0, a2);
-      assertLocation(ch.locate(a5, 3), true, a5, a0, a2);
-   }
-
-   public void testSitesMachinesSameMachineName() {
-      addNode(a0, "m0", null, "r0");
-      addNode(a1, "m0", null, "r1");
-      addNode(a2, "m0", null, "r0");
-      addNode(a3, "m0", null, "r2");
-      addNode(a4, "m0", null, "r1");
-      addNode(a5, "m0", null, "r1");
-
-      setAddresses();
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-      assertLocation(ch.locate(a4, 1), true, a4);
-      assertLocation(ch.locate(a5, 1), true, a5);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a1);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a3);
-      assertLocation(ch.locate(a3, 2), true, a3, a4);
-      assertLocation(ch.locate(a4, 2), true, a4, a0);
-      assertLocation(ch.locate(a5, 2), true, a5, a0);
-
-      assertLocation(ch.locate(a0, 3), true, a0, a1, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
-      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
-      assertLocation(ch.locate(a4, 3), true, a4, a0, a2);
-      assertLocation(ch.locate(a5, 3), true, a5, a0, a2);
-   }
-
-   public void testDifferentRacks() {
-      addNode(a0, "m0", "r0", null);
-      addNode(a1, "m1", "r0", null);
-      addNode(a2, "m2", "r1", null);
-      addNode(a3, "m3", "r1", null);
-      setAddresses();
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a2);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a0);
-      assertLocation(ch.locate(a3, 2), true, a3, a0);
-
-      assertLocation(ch.locate(a0, 3), true, a0, a2, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a0, a1);
-      assertLocation(ch.locate(a3, 3), true, a3, a0, a1);
-   }
-
-   public void testRacksMachines2() {
-      addNode(a0, "m0", "r0", null);
-      addNode(a1, "m1", "r1", null);
-      addNode(a2, "m2", "r0", null);
-      addNode(a3, "m3", "r2", null);
-      addNode(a4, "m4", "r1", null);
-      addNode(a5, "m5", "r1", null);
-
-      setAddresses();
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-      assertLocation(ch.locate(a4, 1), true, a4);
-      assertLocation(ch.locate(a5, 1), true, a5);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a1);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a3);
-      assertLocation(ch.locate(a3, 2), true, a3, a4);
-      assertLocation(ch.locate(a4, 2), true, a4, a0);
-      assertLocation(ch.locate(a5, 2), true, a5, a0);
-
-      assertLocation(ch.locate(a0, 3), true, a0, a1, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
-      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
-      assertLocation(ch.locate(a4, 3), true, a4, a0, a2);
-      assertLocation(ch.locate(a5, 3), true, a5, a0, a2);
-   }
-
-   public void testRacksMachinesSameMachineName() {
-      addNode(a0, "m0", "r0", null);
-      addNode(a1, "m0", "r1", null);
-      addNode(a2, "m0", "r0", null);
-      addNode(a3, "m0", "r2", null);
-      addNode(a4, "m0", "r1", null);
-      addNode(a5, "m0", "r1", null);
-
-      setAddresses();
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-      assertLocation(ch.locate(a4, 1), true, a4);
-      assertLocation(ch.locate(a5, 1), true, a5);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a1);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a3);
-      assertLocation(ch.locate(a3, 2), true, a3, a4);
-      assertLocation(ch.locate(a4, 2), true, a4, a0);
-      assertLocation(ch.locate(a5, 2), true, a5, a0);
-
-      assertLocation(ch.locate(a0, 3), true, a0, a1, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
-      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
-      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
-      assertLocation(ch.locate(a4, 3), true, a4, a0, a2);
-      assertLocation(ch.locate(a5, 3), true, a5, a0, a2);
-   }
-
-   public void testComplexScenario() {
-      addNode(a0, "m2", "r0", "s1");
-      addNode(a1, "m1", "r0", "s0");
-      addNode(a2, "m1", "r0", "s1");
-      addNode(a3, "m1", "r1", "s0");
-      addNode(a4, "m0", "r0", "s1");
-      addNode(a5, "m0", "r1", "s1");
-      addNode(a6, "m0", "r1", "s0");
-      addNode(a7, "m0", "r0", "s3");
-      addNode(a8, "m0", "r0", "s2");
-      addNode(a9, "m0", "r0", "s0");
-      setAddresses();
-      
-      assertLocation(ch.locate(a0, 1), true, a0);
-      assertLocation(ch.locate(a1, 1), true, a1);
-      assertLocation(ch.locate(a2, 1), true, a2);
-      assertLocation(ch.locate(a3, 1), true, a3);
-      assertLocation(ch.locate(a4, 1), true, a4);
-      assertLocation(ch.locate(a5, 1), true, a5);
-      assertLocation(ch.locate(a6, 1), true, a6);
-      assertLocation(ch.locate(a7, 1), true, a7);
-      assertLocation(ch.locate(a8, 1), true, a8);
-      assertLocation(ch.locate(a9, 1), true, a9);
-
-      assertLocation(ch.locate(a0, 2), true, a0, a1);
-      assertLocation(ch.locate(a1, 2), true, a1, a2);
-      assertLocation(ch.locate(a2, 2), true, a2, a3);
-      assertLocation(ch.locate(a3, 2), true, a3, a4);
-      assertLocation(ch.locate(a4, 2), true, a4, a6);
-      assertLocation(ch.locate(a5, 2), true, a5, a6);
-      assertLocation(ch.locate(a6, 2), true, a6, a7);
-      assertLocation(ch.locate(a7, 2), true, a7, a8);
-      assertLocation(ch.locate(a8, 2), true, a8, a9);
-      assertLocation(ch.locate(a9, 2), true, a9, a0);
-
-
-      assertLocation(ch.getStateProvidersOnLeave(a0, 2), false, a1, a9);
-      assertLocation(ch.getStateProvidersOnLeave(a1, 2), false, a0, a2);
-      assertLocation(ch.getStateProvidersOnLeave(a2, 2), false, a3, a1);
-      assertLocation(ch.getStateProvidersOnLeave(a3, 2), false, a4, a2);
-      assertLocation(ch.getStateProvidersOnLeave(a4, 2), false, a6, a3);
-      assertLocation(ch.getStateProvidersOnLeave(a5, 2), false, a6);
-      assertLocation(ch.getStateProvidersOnLeave(a6, 2), false, a4, a7, a5);
-      assertLocation(ch.getStateProvidersOnLeave(a7, 2), false, a8, a6);
-      assertLocation(ch.getStateProvidersOnLeave(a8, 2), false, a9, a7);
-      assertLocation(ch.getStateProvidersOnLeave(a9, 2), false, a0, a8);
-
-      assertLocation(ch.getStateProvidersOnJoin(a0, 2), false, a1, a9);
-      assertLocation(ch.getStateProvidersOnJoin(a1, 2), false, a0, a2);
-      assertLocation(ch.getStateProvidersOnJoin(a2, 2), false, a3, a1);
-      assertLocation(ch.getStateProvidersOnJoin(a3, 2), false, a4, a2);
-      assertLocation(ch.getStateProvidersOnJoin(a4, 2), false, a6, a3);
-      assertLocation(ch.getStateProvidersOnJoin(a5, 2), false, a6);
-      assertLocation(ch.getStateProvidersOnJoin(a6, 2), false, a4, a7, a5);
-      assertLocation(ch.getStateProvidersOnJoin(a7, 2), false, a8, a6);
-      assertLocation(ch.getStateProvidersOnJoin(a8, 2), false, a9, a7);
-      assertLocation(ch.getStateProvidersOnJoin(a9, 2), false, a0, a8);
-
-      
-      assertLocation(ch.locate(a0, 3), true, a0, a1, a3);
-      assertLocation(ch.locate(a1, 3), true, a1, a2, a4);
-      assertLocation(ch.locate(a2, 3), true, a2, a3, a6);
-      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
-      assertLocation(ch.locate(a4, 3), true, a4, a6, a7);
-      assertLocation(ch.locate(a5, 3), true, a5, a6, a7);
-      assertLocation(ch.locate(a6, 3), true, a6, a7, a8);
-      assertLocation(ch.locate(a7, 3), true, a7, a8, a9);
-      assertLocation(ch.locate(a8, 3), true, a8, a9, a0);
-      assertLocation(ch.locate(a9, 3), true, a9, a0, a2);
-   }
-
-   public void testConsistencyWhenNodeLeaves() {
-      addNode(a0, "m2", "r0", "s1");
-      addNode(a1, "m1", "r0", "s0");
-      addNode(a2, "m1", "r0", "s1");
-      addNode(a3, "m1", "r1", "s0");
-      addNode(a4, "m0", "r0", "s1");
-      addNode(a5, "m0", "r1", "s1");
-      addNode(a6, "m0", "r1", "s0");
-      addNode(a7, "m0", "r0", "s3");
-      addNode(a8, "m0", "r0", "s2");
-      addNode(a9, "m0", "r0", "s0");
-      setAddresses();
-
-      List<Address> a0List = ch.locate(a0, 3);
-      List<Address> a1List = ch.locate(a1, 3);
-      List<Address> a2List = ch.locate(a2, 3);
-      List<Address> a3List = ch.locate(a3, 3);
-      List<Address> a4List = ch.locate(a4, 3);
-      List<Address> a5List = ch.locate(a5, 3);
-      List<Address> a6List = ch.locate(a6, 3);
-      List<Address> a7List = ch.locate(a7, 3);
-      List<Address> a8List = ch.locate(a8, 3);
-      List<Address> a9List = ch.locate(a9, 3);
-
-      for (Address addr: addresses) {
-         System.out.println("addr = " + addr);
-         List<Address> addressCopy = (List<Address>) addresses.clone();
-         addressCopy.remove(addr);
-         ch.setCaches(addressCopy);
-         checkConsistency(a0List, a0, addr, 3);
-         checkConsistency(a1List, a1, addr, 3);
-         checkConsistency(a2List, a2, addr, 3);
-         checkConsistency(a3List, a3, addr, 3);
-         checkConsistency(a4List, a4, addr, 3);
-         checkConsistency(a5List, a5, addr, 3);
-         checkConsistency(a6List, a6, addr, 3);
-         checkConsistency(a7List, a7, addr, 3);
-         checkConsistency(a8List, a8, addr, 3);
-         checkConsistency(a9List, a9, addr, 3);
-      }
-   }
-
-   private void checkConsistency(List<Address> a0List, TestAddress a0, Address addr, int replCount) {
-      a0List = new ArrayList(a0List);
-      a0List.remove(addr);      
-      if (a0.equals(addr)) return;
-      List<Address> currentBackupList = ch.locate(a0, replCount);
-      assertEquals(replCount, currentBackupList.size(), currentBackupList.toString());
-      assert currentBackupList.containsAll(a0List) : "Current backups are: " + currentBackupList + "Previous: " + a0List;
-   }
-
-
-   private void assertLocation(List<Address> received, boolean enforceSequence, TestAddress... expected) {
-      if (expected == null) {
-         assert received.isEmpty();
-      }
-      assertEquals(expected.length, received.size());
-      if (enforceSequence) {
-         assert received.equals(Arrays.asList(expected)) : "Received: " + received + " Expected: " + Arrays.toString(expected);
-      } else {
-         assert received.containsAll(Arrays.asList(expected)) : "Received: " + received + " Expected: " + Arrays.toString(expected);
-      }
-   }
-
-   private void addNode(TestAddress address, String machineId, String rackId, String siteId) {
-      addresses.add(address);
-      NodeTopologyInfo nti = new NodeTopologyInfo(machineId, rackId, siteId, null);
-      ti.addNodeTopologyInfo(address, nti);
-   }
-
-   private void setAddresses() {
-      ch.setCaches(addresses);
-      staticAddresses = new TestAddress[]{a0, a1, a2, a3, a4, a5, a6, a7, a8, a9};
-      for (int i = 0; i < staticAddresses.length; i++) {
-         if (staticAddresses[i] != null) staticAddresses[i].setName("a" + i);
-      }
-      log.info("Static addresses: " + Arrays.toString(staticAddresses));
-   }
-
-   public TestAddress address(int hashCode) {
-      return new TestAddress(hashCode);
-   }
-}

Deleted: branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -1,83 +0,0 @@
-package org.infinispan.distribution;
-
-import org.infinispan.config.Configuration;
-import org.infinispan.config.GlobalConfiguration;
-import org.infinispan.distribution.ch.NodeTopologyInfo;
-import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
-import org.infinispan.distribution.ch.TopologyInfo;
-import org.infinispan.manager.EmbeddedCacheManager;
-import org.infinispan.test.MultipleCacheManagersTest;
-import org.infinispan.test.TestingUtil;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-
-/**
- * @author Mircea.Markus at jboss.com
- * @since 4.2
- */
-@Test(groups = "functional", testName = "distribution.TopologyInfoBroadcastTest")
-public class TopologyInfoBroadcastTest extends MultipleCacheManagersTest {
-
-   @Override
-   protected void createCacheManagers() throws Throwable {
-      addClusterEnabledCacheManagers(Configuration.CacheMode.DIST_SYNC, 3);
-      updatedSiteInfo(manager(0), "s0", "r0", "m0");
-      updatedSiteInfo(manager(1), "s1", "r1", "m1");
-      updatedSiteInfo(manager(2), "s2", "r2", "m2");
-      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
-   }
-
-   private void updatedSiteInfo(EmbeddedCacheManager embeddedCacheManager, String s, String r, String m) {
-      GlobalConfiguration gc = embeddedCacheManager.getGlobalConfiguration();
-      gc.setSiteId(s);
-      gc.setRackId(r);
-      gc.setMachineId(m);
-   }
-
-   public void testIsReplicated() {
-      assert advancedCache(0).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
-      assert advancedCache(1).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
-      assert advancedCache(2).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
-
-      DistributionManagerImpl dmi = (DistributionManagerImpl) advancedCache(0).getDistributionManager();
-      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
-      dmi = (DistributionManagerImpl) advancedCache(1).getDistributionManager();
-      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
-      dmi = (DistributionManagerImpl) advancedCache(2).getDistributionManager();
-      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
-
-      TopologyAwareConsistentHash tach = (TopologyAwareConsistentHash) advancedCache(0).getDistributionManager().getConsistentHash();
-      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
-      tach = (TopologyAwareConsistentHash) advancedCache(1).getDistributionManager().getConsistentHash();
-      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
-      tach = (TopologyAwareConsistentHash) advancedCache(2).getDistributionManager().getConsistentHash();
-      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
-   }
-
-   @Test(dependsOnMethods = "testIsReplicated")
-   public void testNodeLeaves() {
-      TestingUtil.killCacheManagers(manager(1));
-      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(2));
-
-      DistributionManagerImpl dmi = (DistributionManagerImpl) advancedCache(0).getDistributionManager();
-      assertTopologyInfo2Nodes(dmi.getTopologyInfo());
-      dmi = (DistributionManagerImpl) advancedCache(2).getDistributionManager();
-      assertTopologyInfo2Nodes(dmi.getTopologyInfo());
-
-      TopologyAwareConsistentHash tach = (TopologyAwareConsistentHash) advancedCache(0).getDistributionManager().getConsistentHash();
-      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
-      tach = (TopologyAwareConsistentHash) advancedCache(2).getDistributionManager().getConsistentHash();
-      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
-   }
-
-   private void assertTopologyInfo3Nodes(TopologyInfo topologyInfo) {
-      assertTopologyInfo2Nodes(topologyInfo);
-      assertEquals(topologyInfo.getNodeTopologyInfo(address(1)), new NodeTopologyInfo("m1","r1", "s1", address(1)));
-   }
-
-   private void assertTopologyInfo2Nodes(TopologyInfo topologyInfo) {
-      assertEquals(topologyInfo.getNodeTopologyInfo(address(0)), new NodeTopologyInfo("m0","r0", "s0", address(0)));
-      assertEquals(topologyInfo.getNodeTopologyInfo(address(2)), new NodeTopologyInfo("m2","r2", "s2", address(2)));
-   }
-}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareChFunctionalTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareChFunctionalTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareChFunctionalTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,75 @@
+package org.infinispan.distribution.topologyaware;
+
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.distribution.DistSyncFuncTest;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
+import org.infinispan.distribution.ch.TopologyInfo;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test (groups = "functional", testName = "distribution.TopologyAwareChFunctionalTest")
+public class TopologyAwareChFunctionalTest extends DistSyncFuncTest {
+
+   @Override
+   protected EmbeddedCacheManager addClusterEnabledCacheManager() {
+      EmbeddedCacheManager cm = TestCacheManagerFactory.createClusteredCacheManager();
+      int index = cacheManagers.size();
+      String rack;
+      String machine;
+      switch (index) {
+         case 0 : {
+            rack = "r0";
+            machine = "m0";
+            break;
+         }
+         case 1 : {
+            rack = "r0";
+            machine = "m1";
+            break;
+         }
+         case 2 : {
+            rack = "r1";
+            machine = "m0";
+            break;
+         }
+         case 3 : {
+            rack = "r2";
+            machine = "m0";
+            break;
+         }
+         default : {
+            throw new RuntimeException("Bad!");
+         }
+      }
+      GlobalConfiguration globalConfiguration = cm.getGlobalConfiguration();      
+      globalConfiguration.setRackId(rack);
+      globalConfiguration.setMachineId(machine);
+      cacheManagers.add(cm);
+      return cm;
+   }
+
+   public void testHashesInitiated() {
+      TopologyAwareConsistentHash hash = (TopologyAwareConsistentHash) advancedCache(0, cacheName).getDistributionManager().getConsistentHash();
+      containsAllHashes(hash);
+      containsAllHashes((TopologyAwareConsistentHash) advancedCache(1, cacheName).getDistributionManager().getConsistentHash());
+      containsAllHashes((TopologyAwareConsistentHash) advancedCache(2, cacheName).getDistributionManager().getConsistentHash());
+      containsAllHashes((TopologyAwareConsistentHash) advancedCache(3, cacheName).getDistributionManager().getConsistentHash());
+   }
+
+   private void containsAllHashes(TopologyAwareConsistentHash ch) {
+      assert ch.getCaches().contains(address(0));
+      assert ch.getCaches().contains(address(1));
+      assert ch.getCaches().contains(address(2));
+      assert ch.getCaches().contains(address(3));
+      TopologyInfo topologyInfo = ch.getTopologyInfo();
+      assert topologyInfo.containsInfoForNode(address(0)) : topologyInfo;
+      assert topologyInfo.containsInfoForNode(address(1)) : topologyInfo;
+      assert topologyInfo.containsInfoForNode(address(2)) : topologyInfo;
+      assert topologyInfo.containsInfoForNode(address(3)) : topologyInfo;
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareConsistentHashTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareConsistentHashTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareConsistentHashTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,517 @@
+package org.infinispan.distribution.topologyaware;
+
+import org.infinispan.distribution.TestAddress;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
+import org.infinispan.distribution.ch.TopologyInfo;
+
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "distribution.TopologyAwareConsistentHashTest")
+public class TopologyAwareConsistentHashTest {
+   
+   private static Log log = LogFactory.getLog(TopologyAwareConsistentHashTest.class);
+
+   TopologyInfo ti;
+   TopologyAwareConsistentHash ch;
+   ArrayList<Address> addresses;
+   TestAddress a0;
+   TestAddress a1;
+   TestAddress a2;
+   TestAddress a3;
+   TestAddress a4;
+   TestAddress a5;
+   TestAddress a6;
+   TestAddress a7;
+   TestAddress a8;
+   TestAddress a9;
+   private TestAddress[] staticAddresses;
+
+
+   /**
+    * Creates a fresh {@link TopologyInfo} and consistent hash for every test and binds ten
+    * {@link TestAddress} instances to the fields {@code a0}..{@code a9}.
+    * <p>
+    * The addresses are first handed to a throw-away hash and then read back through
+    * {@code getCaches()}, so the field index follows the order in which the hash reports them
+    * (presumably ring order -- TODO confirm against TopologyAwareConsistentHash).
+    */
+   @BeforeMethod
+   public void setUp() {
+      ti = new TopologyInfo();
+      ch = new TopologyAwareConsistentHash();
+      addresses = new ArrayList<Address>();
+      for (int i = 0; i < 10; i++) {
+          addresses.add(new TestAddress(i * 100));
+      }
+      ch.setCaches(addresses);
+      // typed copy instead of a raw ArrayList, so no unchecked conversion is needed
+      addresses = new ArrayList<Address>(ch.getCaches());
+      for (int i = 0; i < addresses.size(); i++) {
+         // reflectively assigns the i-th reported address to the field named "a" + i
+         TestingUtil.replaceField(addresses.get(i), "a"+i, this, TopologyAwareConsistentHashTest.class);
+      }
+
+      // each test registers its own nodes through addNode(), so start from an empty list and a new hash
+      addresses.clear();
+      ch = new TopologyAwareConsistentHash();
+      ch.setTopologyInfo(ti);
+   }
+
+   public void testDifferentMachines() { // four nodes alternating between machines m0 and m1, no rack or site ids
+      addNode(a0, "m0", null, null);
+      addNode(a1, "m1", null, null);
+      addNode(a2, "m0", null, null);
+      addNode(a3, "m1", null, null);
+      setAddresses();
+
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+
+      assertLocation(ch.getStateProvidersOnLeave(a0, 1), false); // no expected addresses: the returned list must be empty
+      assertLocation(ch.getStateProvidersOnLeave(a1, 1), false);
+      assertLocation(ch.getStateProvidersOnLeave(a2, 1), false);
+      assertLocation(ch.getStateProvidersOnLeave(a3, 1), false);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a1); // two owners: the expected backup is the next address, wrapping from a3 to a0
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a3);
+      assertLocation(ch.locate(a3, 2), true, a3, a0);
+      
+      assertLocation(ch.getStateProvidersOnLeave(a0, 2), false, a1, a3); // false: only membership is checked, not the order
+      assertLocation(ch.getStateProvidersOnLeave(a1, 2), false, a0, a2);
+      assertLocation(ch.getStateProvidersOnLeave(a2, 2), false, a3, a1);
+      assertLocation(ch.getStateProvidersOnLeave(a3, 2), false, a0, a2);
+      
+
+
+      assertLocation(ch.locate(a0, 3), true, a0, a1, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a0);
+      assertLocation(ch.locate(a2, 3), true, a2, a3, a1);
+      assertLocation(ch.locate(a3, 3), true, a3, a0, a2);
+
+      assertLocation(ch.getStateProvidersOnLeave(a0, 3), false, a1, a3); // same expected providers as with two owners
+      assertLocation(ch.getStateProvidersOnLeave(a1, 3), false, a0, a2);
+      assertLocation(ch.getStateProvidersOnLeave(a2, 3), false, a3, a1);
+      assertLocation(ch.getStateProvidersOnLeave(a3, 3), false, a0, a2);
+   }
+   
+   public void testDifferentMachines2() { // six nodes, two consecutive nodes per machine (m0, m1, m2), no rack or site ids
+      addNode(a0, "m0", null, null);
+      addNode(a1, "m0", null, null);
+      addNode(a2, "m1", null, null);
+      addNode(a3, "m1", null, null);
+      addNode(a4, "m2", null, null);
+      addNode(a5, "m2", null, null);
+      setAddresses();
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+      assertLocation(ch.locate(a4, 1), true, a4);
+      assertLocation(ch.locate(a5, 1), true, a5);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a2); // two owners: the expected backup is on a different machine than the primary
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a4);
+      assertLocation(ch.locate(a3, 2), true, a3, a4);
+      assertLocation(ch.locate(a4, 2), true, a4, a0);
+      assertLocation(ch.locate(a5, 2), true, a5, a0);
+
+      assertLocation(ch.locate(a0, 3), true, a0, a2, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a4, a5);
+      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
+      assertLocation(ch.locate(a4, 3), true, a4, a0, a1);
+      assertLocation(ch.locate(a5, 3), true, a5, a0, a1);
+   }
+
+   public void testDifferentRacksAndMachines() { // six nodes spread over machines m0..m2 and racks r0..r3, no site ids
+      addNode(a0, "m0", "r0", null);
+      addNode(a1, "m0", "r0", null);
+      addNode(a2, "m1", "r1", null);
+      addNode(a3, "m2", "r2", null);
+      addNode(a4, "m1", "r1", null);
+      addNode(a5, "m2", "r3", null);
+      setAddresses();
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+      assertLocation(ch.locate(a4, 1), true, a4);
+      assertLocation(ch.locate(a5, 1), true, a5);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a2); // two owners: the expected backup is in a different rack than the primary
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a3);
+      assertLocation(ch.locate(a3, 2), true, a3, a4);
+      assertLocation(ch.locate(a4, 2), true, a4, a5);
+      assertLocation(ch.locate(a5, 2), true, a5, a0);
+
+      assertLocation(ch.locate(a0, 3), true, a0, a2, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a3, a5);
+      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
+      assertLocation(ch.locate(a4, 3), true, a4, a5, a0);
+      assertLocation(ch.locate(a5, 3), true, a5, a0, a1);
+   }
+
+   public void testAllSameMachine() { // six nodes that all report machine m0 and no rack or site
+      addNode(a0, "m0", null, null);
+      addNode(a1, "m0", null, null);
+      addNode(a2, "m0", null, null);
+      addNode(a3, "m0", null, null);
+      addNode(a4, "m0", null, null);
+      addNode(a5, "m0", null, null);
+      setAddresses();
+
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+      assertLocation(ch.locate(a4, 1), true, a4);
+      assertLocation(ch.locate(a5, 1), true, a5);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a1); // two owners: expected backup is simply the next address, wrapping from a5 to a0
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a3);
+      assertLocation(ch.locate(a3, 2), true, a3, a4);
+      assertLocation(ch.locate(a4, 2), true, a4, a5);
+      assertLocation(ch.locate(a5, 2), true, a5, a0);
+
+      assertLocation(ch.locate(a0, 3), true, a0, a1, a2); // three owners: expected owners are three consecutive addresses
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
+      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
+      assertLocation(ch.locate(a4, 3), true, a4, a5, a0);
+      assertLocation(ch.locate(a5, 3), true, a5, a0, a1);
+   }
+
+   public void testDifferentSites() { // four nodes on distinct machines, two in site s0 and two in site s1, no rack ids
+      addNode(a0, "m0", null, "s0");
+      addNode(a1, "m1", null, "s0");
+      addNode(a2, "m2", null, "s1");
+      addNode(a3, "m3", null, "s1");
+      setAddresses();
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a2); // two owners: the expected backup is in the other site
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a0);
+      assertLocation(ch.locate(a3, 2), true, a3, a0);
+      
+      assertLocation(ch.locate(a0, 3), true, a0, a2, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a0, a1);
+      assertLocation(ch.locate(a3, 3), true, a3, a0, a1);
+   }
+
+   public void testSitesMachines2() { // six nodes on distinct machines m0..m5, interleaved over sites s0, s1 and s2, no rack ids
+      addNode(a0, "m0", null, "s0");
+      addNode(a1, "m1", null, "s1");
+      addNode(a2, "m2", null, "s0");
+      addNode(a3, "m3", null, "s2");
+      addNode(a4, "m4", null, "s1");
+      addNode(a5, "m5", null, "s1");
+
+      setAddresses();
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+      assertLocation(ch.locate(a4, 1), true, a4);
+      assertLocation(ch.locate(a5, 1), true, a5);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a1); // two owners: the expected backup is in a different site than the primary
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a3);
+      assertLocation(ch.locate(a3, 2), true, a3, a4);
+      assertLocation(ch.locate(a4, 2), true, a4, a0);
+      assertLocation(ch.locate(a5, 2), true, a5, a0);
+
+      assertLocation(ch.locate(a0, 3), true, a0, a1, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
+      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
+      assertLocation(ch.locate(a4, 3), true, a4, a0, a2);
+      assertLocation(ch.locate(a5, 3), true, a5, a0, a2);
+   }
+
+   public void testSitesMachinesSameMachineName() { // same site layout and expectations as testSitesMachines2, but every node reports machine m0
+      addNode(a0, "m0", null, "r0"); // note: "r0".."r2" are passed in the siteId position here, so they act as site ids
+      addNode(a1, "m0", null, "r1");
+      addNode(a2, "m0", null, "r0");
+      addNode(a3, "m0", null, "r2");
+      addNode(a4, "m0", null, "r1");
+      addNode(a5, "m0", null, "r1");
+
+      setAddresses();
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+      assertLocation(ch.locate(a4, 1), true, a4);
+      assertLocation(ch.locate(a5, 1), true, a5);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a1); // two owners: the expected backup is in a different site than the primary
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a3);
+      assertLocation(ch.locate(a3, 2), true, a3, a4);
+      assertLocation(ch.locate(a4, 2), true, a4, a0);
+      assertLocation(ch.locate(a5, 2), true, a5, a0);
+
+      assertLocation(ch.locate(a0, 3), true, a0, a1, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
+      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
+      assertLocation(ch.locate(a4, 3), true, a4, a0, a2);
+      assertLocation(ch.locate(a5, 3), true, a5, a0, a2);
+   }
+
+   public void testDifferentRacks() { // four nodes on distinct machines, two in rack r0 and two in rack r1, no site ids
+      addNode(a0, "m0", "r0", null);
+      addNode(a1, "m1", "r0", null);
+      addNode(a2, "m2", "r1", null);
+      addNode(a3, "m3", "r1", null);
+      setAddresses();
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a2); // two owners: the expected backup is in the other rack
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a0);
+      assertLocation(ch.locate(a3, 2), true, a3, a0);
+
+      assertLocation(ch.locate(a0, 3), true, a0, a2, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a0, a1);
+      assertLocation(ch.locate(a3, 3), true, a3, a0, a1);
+   }
+
+   public void testRacksMachines2() { // six nodes on distinct machines m0..m5, interleaved over racks r0, r1 and r2, no site ids
+      addNode(a0, "m0", "r0", null);
+      addNode(a1, "m1", "r1", null);
+      addNode(a2, "m2", "r0", null);
+      addNode(a3, "m3", "r2", null);
+      addNode(a4, "m4", "r1", null);
+      addNode(a5, "m5", "r1", null);
+
+      setAddresses();
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+      assertLocation(ch.locate(a4, 1), true, a4);
+      assertLocation(ch.locate(a5, 1), true, a5);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a1); // two owners: the expected backup is in a different rack than the primary
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a3);
+      assertLocation(ch.locate(a3, 2), true, a3, a4);
+      assertLocation(ch.locate(a4, 2), true, a4, a0);
+      assertLocation(ch.locate(a5, 2), true, a5, a0);
+
+      assertLocation(ch.locate(a0, 3), true, a0, a1, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
+      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
+      assertLocation(ch.locate(a4, 3), true, a4, a0, a2);
+      assertLocation(ch.locate(a5, 3), true, a5, a0, a2);
+   }
+
+   public void testRacksMachinesSameMachineName() { // same rack layout and expectations as testRacksMachines2, but every node reports machine m0
+      addNode(a0, "m0", "r0", null);
+      addNode(a1, "m0", "r1", null);
+      addNode(a2, "m0", "r0", null);
+      addNode(a3, "m0", "r2", null);
+      addNode(a4, "m0", "r1", null);
+      addNode(a5, "m0", "r1", null);
+
+      setAddresses();
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+      assertLocation(ch.locate(a4, 1), true, a4);
+      assertLocation(ch.locate(a5, 1), true, a5);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a1); // two owners: the expected backup is in a different rack than the primary
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a3);
+      assertLocation(ch.locate(a3, 2), true, a3, a4);
+      assertLocation(ch.locate(a4, 2), true, a4, a0);
+      assertLocation(ch.locate(a5, 2), true, a5, a0);
+
+      assertLocation(ch.locate(a0, 3), true, a0, a1, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a3);
+      assertLocation(ch.locate(a2, 3), true, a2, a3, a4);
+      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
+      assertLocation(ch.locate(a4, 3), true, a4, a0, a2);
+      assertLocation(ch.locate(a5, 3), true, a5, a0, a2);
+   }
+
+   public void testComplexScenario() { // ten nodes with mixed machine, rack and site ids; checks owners and state providers
+      addNode(a0, "m2", "r0", "s1");
+      addNode(a1, "m1", "r0", "s0");
+      addNode(a2, "m1", "r0", "s1");
+      addNode(a3, "m1", "r1", "s0");
+      addNode(a4, "m0", "r0", "s1");
+      addNode(a5, "m0", "r1", "s1");
+      addNode(a6, "m0", "r1", "s0");
+      addNode(a7, "m0", "r0", "s3");
+      addNode(a8, "m0", "r0", "s2");
+      addNode(a9, "m0", "r0", "s0");
+      setAddresses();
+      
+      assertLocation(ch.locate(a0, 1), true, a0); // one owner: every address used as a key is expected to map to itself
+      assertLocation(ch.locate(a1, 1), true, a1);
+      assertLocation(ch.locate(a2, 1), true, a2);
+      assertLocation(ch.locate(a3, 1), true, a3);
+      assertLocation(ch.locate(a4, 1), true, a4);
+      assertLocation(ch.locate(a5, 1), true, a5);
+      assertLocation(ch.locate(a6, 1), true, a6);
+      assertLocation(ch.locate(a7, 1), true, a7);
+      assertLocation(ch.locate(a8, 1), true, a8);
+      assertLocation(ch.locate(a9, 1), true, a9);
+
+      assertLocation(ch.locate(a0, 2), true, a0, a1); // two owners: the expected backup is in a different site than the primary
+      assertLocation(ch.locate(a1, 2), true, a1, a2);
+      assertLocation(ch.locate(a2, 2), true, a2, a3);
+      assertLocation(ch.locate(a3, 2), true, a3, a4);
+      assertLocation(ch.locate(a4, 2), true, a4, a6);
+      assertLocation(ch.locate(a5, 2), true, a5, a6);
+      assertLocation(ch.locate(a6, 2), true, a6, a7);
+      assertLocation(ch.locate(a7, 2), true, a7, a8);
+      assertLocation(ch.locate(a8, 2), true, a8, a9);
+      assertLocation(ch.locate(a9, 2), true, a9, a0);
+
+
+      assertLocation(ch.getStateProvidersOnLeave(a0, 2), false, a1, a9); // state providers when a node leaves; order is not checked
+      assertLocation(ch.getStateProvidersOnLeave(a1, 2), false, a0, a2);
+      assertLocation(ch.getStateProvidersOnLeave(a2, 2), false, a3, a1);
+      assertLocation(ch.getStateProvidersOnLeave(a3, 2), false, a4, a2);
+      assertLocation(ch.getStateProvidersOnLeave(a4, 2), false, a6, a3);
+      assertLocation(ch.getStateProvidersOnLeave(a5, 2), false, a6); // a5 is nobody's expected backup in the two-owner table above
+      assertLocation(ch.getStateProvidersOnLeave(a6, 2), false, a4, a7, a5); // a6 is the expected backup of both a4 and a5
+      assertLocation(ch.getStateProvidersOnLeave(a7, 2), false, a8, a6);
+      assertLocation(ch.getStateProvidersOnLeave(a8, 2), false, a9, a7);
+      assertLocation(ch.getStateProvidersOnLeave(a9, 2), false, a0, a8);
+
+      assertLocation(ch.getStateProvidersOnJoin(a0, 2), false, a1, a9); // join expectations are identical to the leave expectations
+      assertLocation(ch.getStateProvidersOnJoin(a1, 2), false, a0, a2);
+      assertLocation(ch.getStateProvidersOnJoin(a2, 2), false, a3, a1);
+      assertLocation(ch.getStateProvidersOnJoin(a3, 2), false, a4, a2);
+      assertLocation(ch.getStateProvidersOnJoin(a4, 2), false, a6, a3);
+      assertLocation(ch.getStateProvidersOnJoin(a5, 2), false, a6);
+      assertLocation(ch.getStateProvidersOnJoin(a6, 2), false, a4, a7, a5);
+      assertLocation(ch.getStateProvidersOnJoin(a7, 2), false, a8, a6);
+      assertLocation(ch.getStateProvidersOnJoin(a8, 2), false, a9, a7);
+      assertLocation(ch.getStateProvidersOnJoin(a9, 2), false, a0, a8);
+
+      
+      assertLocation(ch.locate(a0, 3), true, a0, a1, a3); // three owners requested
+      assertLocation(ch.locate(a1, 3), true, a1, a2, a4);
+      assertLocation(ch.locate(a2, 3), true, a2, a3, a6);
+      assertLocation(ch.locate(a3, 3), true, a3, a4, a5);
+      assertLocation(ch.locate(a4, 3), true, a4, a6, a7);
+      assertLocation(ch.locate(a5, 3), true, a5, a6, a7);
+      assertLocation(ch.locate(a6, 3), true, a6, a7, a8);
+      assertLocation(ch.locate(a7, 3), true, a7, a8, a9);
+      assertLocation(ch.locate(a8, 3), true, a8, a9, a0);
+      assertLocation(ch.locate(a9, 3), true, a9, a0, a2);
+   }
+
+   public void testConsistencyWhenNodeLeaves() {
+      addNode(a0, "m2", "r0", "s1");
+      addNode(a1, "m1", "r0", "s0");
+      addNode(a2, "m1", "r0", "s1");
+      addNode(a3, "m1", "r1", "s0");
+      addNode(a4, "m0", "r0", "s1");
+      addNode(a5, "m0", "r1", "s1");
+      addNode(a6, "m0", "r1", "s0");
+      addNode(a7, "m0", "r0", "s3");
+      addNode(a8, "m0", "r0", "s2");
+      addNode(a9, "m0", "r0", "s0");
+      setAddresses();
+
+      List<Address> a0List = ch.locate(a0, 3);
+      List<Address> a1List = ch.locate(a1, 3);
+      List<Address> a2List = ch.locate(a2, 3);
+      List<Address> a3List = ch.locate(a3, 3);
+      List<Address> a4List = ch.locate(a4, 3);
+      List<Address> a5List = ch.locate(a5, 3);
+      List<Address> a6List = ch.locate(a6, 3);
+      List<Address> a7List = ch.locate(a7, 3);
+      List<Address> a8List = ch.locate(a8, 3);
+      List<Address> a9List = ch.locate(a9, 3);
+
+      for (Address addr: addresses) {
+         System.out.println("addr = " + addr);
+         List<Address> addressCopy = (List<Address>) addresses.clone();
+         addressCopy.remove(addr);
+         ch.setCaches(addressCopy);
+         checkConsistency(a0List, a0, addr, 3);
+         checkConsistency(a1List, a1, addr, 3);
+         checkConsistency(a2List, a2, addr, 3);
+         checkConsistency(a3List, a3, addr, 3);
+         checkConsistency(a4List, a4, addr, 3);
+         checkConsistency(a5List, a5, addr, 3);
+         checkConsistency(a6List, a6, addr, 3);
+         checkConsistency(a7List, a7, addr, 3);
+         checkConsistency(a8List, a8, addr, 3);
+         checkConsistency(a9List, a9, addr, 3);
+      }
+   }
+
+   /**
+    * Verifies that, after {@code addr} has been removed from the hash, the key {@code a0} still
+    * has {@code replCount} owners and that all of its previous owners other than {@code addr}
+    * are among them.
+    *
+    * @param a0List    owners of the key before the node left
+    * @param a0        the key being checked; nothing is checked when it is the node that left
+    * @param addr      the node that was removed
+    * @param replCount number of owners to request from the hash
+    */
+   private void checkConsistency(List<Address> a0List, TestAddress a0, Address addr, int replCount) {
+      if (a0.equals(addr)) return;
+      // typed copy, so the caller's recorded owner list is never modified
+      List<Address> survivingOwners = new ArrayList<Address>(a0List);
+      survivingOwners.remove(addr);
+      List<Address> currentBackupList = ch.locate(a0, replCount);
+      // TestNG's assertEquals takes (actual, expected, message)
+      assertEquals(currentBackupList.size(), replCount, currentBackupList.toString());
+      assert currentBackupList.containsAll(survivingOwners) : "Current backups are: " + currentBackupList + "Previous: " + survivingOwners;
+   }
+
+
+   /**
+    * Asserts that a list returned by the consistent hash matches the expected addresses.
+    *
+    * @param received        the list produced by the hash
+    * @param enforceSequence {@code true} to require the exact order of {@code expected},
+    *                        {@code false} to require only the same size and membership
+    * @param expected        the expected addresses; an empty or {@code null} array means the
+    *                        received list must be empty
+    */
+   private void assertLocation(List<Address> received, boolean enforceSequence, TestAddress... expected) {
+      if (expected == null) {
+         assert received.isEmpty();
+         // nothing more to compare; falling through would dereference the null array
+         return;
+      }
+      // TestNG's assertEquals takes (actual, expected)
+      assertEquals(received.size(), expected.length);
+      if (enforceSequence) {
+         assert received.equals(Arrays.asList(expected)) : "Received: " + received + " Expected: " + Arrays.toString(expected);
+      } else {
+         assert received.containsAll(Arrays.asList(expected)) : "Received: " + received + " Expected: " + Arrays.toString(expected);
+      }
+   }
+
+   /**
+    * Registers a node for the current test: appends it to the member list and stores its
+    * topology coordinates in the shared {@link TopologyInfo}.
+    *
+    * @param address   the node to add
+    * @param machineId machine id of the node, may be {@code null}
+    * @param rackId    rack id of the node, may be {@code null}
+    * @param siteId    site id of the node, may be {@code null}
+    */
+   private void addNode(TestAddress address, String machineId, String rackId, String siteId) {
+      addresses.add(address);
+      ti.addNodeTopologyInfo(address, new NodeTopologyInfo(machineId, rackId, siteId, null));
+   }
+
+   /**
+    * Hands the nodes registered through {@code addNode} to the hash under test and names the
+    * address fields a0..a9 after their field index, so they are easy to recognise in output.
+    */
+   private void setAddresses() {
+      ch.setCaches(addresses);
+      staticAddresses = new TestAddress[]{a0, a1, a2, a3, a4, a5, a6, a7, a8, a9};
+      int fieldIndex = 0;
+      for (TestAddress staticAddress : staticAddresses) {
+         if (staticAddress != null) {
+            staticAddress.setName("a" + fieldIndex);
+         }
+         fieldIndex++;
+      }
+      log.info("Static addresses: " + Arrays.toString(staticAddresses));
+   }
+
+   public TestAddress address(int hashCode) {
+      return new TestAddress(hashCode);
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareDistAsyncFuncTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareDistAsyncFuncTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareDistAsyncFuncTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,53 @@
+package org.infinispan.distribution.topologyaware;
+
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.distribution.DistAsyncFuncTest;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test (groups = "functional", testName = "topologyaware.TopologyAwareDistAsyncFuncTest")
+public class TopologyAwareDistAsyncFuncTest extends DistAsyncFuncTest {
+
+   /**
+    * Creates a clustered cache manager and tags it with the rack and machine id that belong to
+    * its position, i.e. the number of managers registered before it.
+    *
+    * @return the new manager, already appended to {@code cacheManagers}
+    * @throws RuntimeException if four managers have already been registered
+    */
+   @Override
+   protected EmbeddedCacheManager addClusterEnabledCacheManager() {
+      // topology ids, indexed by the position of the manager being added
+      final String[] rackIds = {"r0", "r1", "r1", "r1"};
+      final String[] machineIds = {"m0", "m0", "m0", "m1"};
+
+      EmbeddedCacheManager manager = TestCacheManagerFactory.createClusteredCacheManager();
+      int position = cacheManagers.size();
+      if (position < 0 || position >= rackIds.length) {
+         throw new RuntimeException("Bad!");
+      }
+      GlobalConfiguration topologyConfig = manager.getGlobalConfiguration();
+      topologyConfig.setRackId(rackIds[position]);
+      topologyConfig.setMachineId(machineIds[position]);
+      cacheManagers.add(manager);
+      return manager;
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareDistSyncUnsafeFuncTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareDistSyncUnsafeFuncTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareDistSyncUnsafeFuncTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,54 @@
+package org.infinispan.distribution.topologyaware;
+
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.distribution.DistSyncUnsafeFuncTest;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.Test;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(testName="topologyaware.TopologyAwareDistSyncUnsafeFuncTest", groups = "functional")
+public class TopologyAwareDistSyncUnsafeFuncTest extends DistSyncUnsafeFuncTest {
+
+   /**
+    * Creates a clustered cache manager and tags it with the rack and machine id that belong to
+    * its position, i.e. the number of managers registered before it.
+    *
+    * @return the new manager, already appended to {@code cacheManagers}
+    * @throws RuntimeException if four managers have already been registered
+    */
+   @Override
+   protected EmbeddedCacheManager addClusterEnabledCacheManager() {
+      // topology ids, indexed by the position of the manager being added
+      final String[] rackIds = {"r0", "r0", "r1", "r1"};
+      final String[] machineIds = {"m0", "m0", "m0", "m0"};
+
+      EmbeddedCacheManager manager = TestCacheManagerFactory.createClusteredCacheManager();
+      int position = cacheManagers.size();
+      if (position < 0 || position >= rackIds.length) {
+         throw new RuntimeException("Bad!");
+      }
+      GlobalConfiguration topologyConfig = manager.getGlobalConfiguration();
+      topologyConfig.setRackId(rackIds[position]);
+      topologyConfig.setMachineId(machineIds[position]);
+      cacheManagers.add(manager);
+      return manager;
+   }
+
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareStateTransferTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareStateTransferTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyAwareStateTransferTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,220 @@
+package org.infinispan.distribution.topologyaware;
+
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.distribution.ch.ConsistentHash;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Address;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "topologyaware.TopologyAwareStateTransferTest")
+public class TopologyAwareStateTransferTest extends MultipleCacheManagersTest {
+
+   private Address a0;
+   private Address a1;
+   private Address a2;
+   private Address a3;
+   private Address a4;
+
+   /**
+    * Starts five DIST_SYNC caches with L1 disabled, waits for the initial rehash and stores the
+    * member addresses in {@code a0}..{@code a4}, in the order reported by the consistent hash
+    * of the first cache. The rack and machine of each address are printed for debugging.
+    */
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration distConfig = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+      System.out.println("defaultConfig = " + distConfig.getNumOwners());
+      distConfig.setL1CacheEnabled(false);
+      createClusteredCaches(5, distConfig);
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2), cache(3), cache(4));
+
+      ConsistentHash firstCacheHash = cache(0).getAdvancedCache().getDistributionManager().getConsistentHash();
+      TopologyAwareConsistentHash topologyHash = (TopologyAwareConsistentHash) firstCacheHash;
+      List<Address> hashMembers = topologyHash.getCaches();
+      System.out.println("addressList = " + hashMembers);
+      a0 = hashMembers.get(0);
+      a1 = hashMembers.get(1);
+      a2 = hashMembers.get(2);
+      a3 = hashMembers.get(3);
+      a4 = hashMembers.get(4);
+
+      Address[] recorded = {a0, a1, a2, a3, a4};
+      for (int i = 0; i < recorded.length; i++) {
+         printTopologyInfo("a" + i, cache(recorded[i]).getConfiguration().getGlobalConfiguration());
+      }
+   }
+
+   /**
+    * Prints one line of the form {@code <label>: <rackId>-><machineId>} to standard out.
+    *
+    * @param str label to print in front of the ids
+    * @param gc  global configuration the rack and machine ids are read from
+    */
+   private void printTopologyInfo(String str, GlobalConfiguration gc) {
+      StringBuilder line = new StringBuilder();
+      line.append(str).append(": ");
+      line.append(gc.getRackId()).append("->").append(gc.getMachineId());
+      System.out.println(line.toString());
+   }
+
+   @AfterMethod
+   @Override
+   protected void clearContent() throws Throwable { // deliberately empty; presumably keeps the inherited per-method cleanup from wiping the keys the dependent test methods re-check -- confirm against MultipleCacheManagersTest
+   }
+
+   /**
+    * Looks up the cache whose RPC manager reports the given address.
+    *
+    * @param addr address of the wanted node
+    * @return the matching cache
+    * @throws RuntimeException if none of the current caches has that address
+    */
+   Cache cache(Address addr) {
+      for (Cache candidate : caches()) {
+         Address candidateAddress = candidate.getAdvancedCache().getRpcManager().getAddress();
+         if (candidateAddress.equals(addr)) {
+            return candidate;
+         }
+      }
+      throw new RuntimeException("Address: " + addr);
+   }
+
+   public void testInitialState() {
+      cache(0).put(a0,"v0");
+      cache(0).put(a1,"v0");
+      cache(0).put(a2,"v0");
+      cache(0).put(a3,"v0");
+      cache(0).put(a4,"v0");
+      assertExistence(a0);
+      assertExistence(a1);
+      assertExistence(a2);
+      assertExistence(a3);
+      assertExistence(a4);
+   }
+
+   /**
+    * Kills the node at {@code a4}, waits for the four remaining caches to finish rehashing and
+    * checks that all five keys are still stored where the hash expects them.
+    */
+   @Test (dependsOnMethods = "testInitialState")
+   public void testNodeDown() {
+      EmbeddedCacheManager victim = (EmbeddedCacheManager) cache(a4).getCacheManager();
+      log.info("Here is where ST starts");
+      TestingUtil.killCacheManagers(victim);
+      cacheManagers.remove(victim);
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(a0), cache(a1), cache(a2), cache(a3));
+      log.info("Here is where ST ends");
+      List<Address> remainingMembers = cache(a0).getAdvancedCache().getDistributionManager().getConsistentHash().getCaches();
+      System.out.println("After shutting down " + a4 + " caches are " +  remainingMembers);
+
+
+      // dump the content of every surviving cache for debugging
+      for (Address survivor : new Address[]{a0, a1, a2, a3}) {
+         System.out.println(TestingUtil.printCache(cache(survivor)));
+      }
+
+      for (Address key : new Address[]{a0, a1, a2, a3, a4}) {
+         assertExistence(key);
+      }
+   }
+
+   /**
+    * Kills the node at {@code a2} as well, waits for the three remaining caches to finish
+    * rehashing and re-checks the placement of all five keys.
+    */
+   @Test (dependsOnMethods = "testNodeDown")
+   public void testNodeDown2() {
+      EmbeddedCacheManager victim = (EmbeddedCacheManager) cache(a2).getCacheManager();
+      TestingUtil.killCacheManagers(victim);
+      cacheManagers.remove(victim);
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(a0), cache(a1), cache(a3));
+      for (Address key : new Address[]{a0, a1, a2, a3, a4}) {
+         assertExistence(key);
+      }
+   }
+
+   /**
+    * Kills the node at {@code a1} as well, leaving only {@code a0} and {@code a3}, waits for
+    * them to finish rehashing and re-checks the placement of all five keys.
+    */
+   @Test (dependsOnMethods = "testNodeDown2")
+   public void testNodeDown3() {
+      EmbeddedCacheManager victim = (EmbeddedCacheManager) cache(a1).getCacheManager();
+      TestingUtil.killCacheManagers(victim);
+      cacheManagers.remove(victim);
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(a0), cache(a3));
+      for (Address key : new Address[]{a0, a1, a2, a3, a4}) {
+         assertExistence(key);
+      }
+   }
+
+
+   private void assertExistence(final Object key) { // checks that key is stored on exactly the nodes the hash locates for it
+      ConsistentHash hash = cache(a0).getAdvancedCache().getDistributionManager().getConsistentHash(); // always uses the hash as seen by the node at a0
+      final List<Address> addresses = hash.locate(key, 2); // expected owners, with an owner count of two
+      System.out.println(key + " should be present on = " + addresses);
+      log.info(key + " should be present on = " + addresses);
+
+      eventually(new Condition() { // first condition: exactly two caches hold the key; presumably eventually() re-evaluates until satisfied or a timeout -- confirm
+         @Override
+         public boolean isSatisfied() throws Exception {
+            int count = 0;
+            for (Cache c : caches()) {
+               if (c.getAdvancedCache().getDataContainer().containsKey(key)) { // looks at the local data container of each cache
+                  System.out.println("It is here = " + address(c));
+                  count++;
+               }
+            }
+            System.out.println("count = " + count);
+            return count == 2; // same owner count as passed to locate above
+         }         
+      });
+
+      eventually(new Condition() { // second condition: the key is on every located owner and on no other cache
+         @Override
+         public boolean isSatisfied() throws Exception {
+            for (Cache c : caches()) {
+               if (addresses.contains(address(c))) {
+                  if (!c.getAdvancedCache().getDataContainer().containsKey(key)) { // an expected owner is missing the key
+                     System.out.println(key + " not present on " + c.getAdvancedCache().getRpcManager().getAddress());
+                     return false;
+                  }
+               } else {
+                  if (c.getAdvancedCache().getDataContainer().containsKey(key)) { // a non-owner still holds the key
+                     System.out.println(key + " present on " + c.getAdvancedCache().getRpcManager().getAddress());
+                     return false;
+                  }
+               }
+            }
+            return true;
+         }
+      });
+   }
+
+   @Override
+   protected EmbeddedCacheManager addClusterEnabledCacheManager(Configuration deConfiguration) {
+      EmbeddedCacheManager cm = TestCacheManagerFactory.createClusteredCacheManager(deConfiguration);
+      int index = cacheManagers.size();
+      String rack;
+      String machine;
+      switch (index) {
+         case 0 : {
+            rack = "r0";
+            machine = "m0";
+            break;
+         }
+         case 1 : {
+            rack = "r0";
+            machine = "m1";
+            break;
+         }
+         case 2 : {
+            rack = "r1";
+            machine = "m0";
+            break;
+         }
+         case 3 : {
+            rack = "r2";
+            machine = "m0";
+            break;
+         }
+         case 4 : {
+            rack = "r2";
+            machine = "m0";
+            break;
+         }
+         default : {
+            throw new RuntimeException("Bad!");
+         }
+      }
+      GlobalConfiguration globalConfiguration = cm.getGlobalConfiguration();
+      globalConfiguration.setRackId(rack);
+      globalConfiguration.setMachineId(machine);
+      cacheManagers.add(cm);
+      return cm;
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoBroadcastNoRehashTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoBroadcastNoRehashTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoBroadcastNoRehashTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,19 @@
+package org.infinispan.distribution.topologyaware;
+
+import org.infinispan.config.Configuration;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the scenarios inherited from {@link TopologyInfoBroadcastTest} on a cluster whose configuration
+ * has rehashing disabled.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "distribution.TopologyInfoBroadcastNoRehashTest")
+public class TopologyInfoBroadcastNoRehashTest extends TopologyInfoBroadcastTest {
+
+   /**
+    * @return the parent's cluster configuration with {@code rehashEnabled} set to {@code false}
+    */
+   @Override
+   protected Configuration getClusterConfig() {
+      Configuration configuration = super.getClusterConfig();
+      configuration.setRehashEnabled(false);
+      return configuration;
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoBroadcastTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoBroadcastTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoBroadcastTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,92 @@
+package org.infinispan.distribution.topologyaware;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.distribution.BaseDistFunctionalTest;
+import org.infinispan.distribution.DistributionManagerImpl;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
+import org.infinispan.distribution.ch.TopologyInfo;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Verifies that the site, rack and machine ids configured on each node are visible in the topology info
+ * held by every node of a three-node cluster, both after the initial join and after one node has been
+ * killed.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "distribution.TopologyInfoBroadcastTest")
+public class TopologyInfoBroadcastTest extends MultipleCacheManagersTest {
+
+   /**
+    * Adds three cache managers built from {@link #getClusterConfig()}, tags each one with its own
+    * site/rack/machine ids and waits for the initial rehash to complete on all three caches.
+    */
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      addClusterEnabledCacheManagers(getClusterConfig(), 3);
+      updatedSiteInfo(manager(0), "s0", "r0", "m0");
+      updatedSiteInfo(manager(1), "s1", "r1", "m1");
+      updatedSiteInfo(manager(2), "s2", "r2", "m2");
+      log.info("Here it starts");
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
+      log.info("Here it ends");
+   }
+
+   /**
+    * @return the configuration used for every node; subclasses override this to vary the cluster setup
+    */
+   protected Configuration getClusterConfig() {
+      return getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
+   }
+
+   /**
+    * Sets the site, rack and machine ids on the global configuration of the given cache manager.
+    *
+    * @param embeddedCacheManager manager whose global configuration is updated
+    * @param s site id
+    * @param r rack id
+    * @param m machine id
+    */
+   private void updatedSiteInfo(EmbeddedCacheManager embeddedCacheManager, String s, String r, String m) {
+      GlobalConfiguration gc = embeddedCacheManager.getGlobalConfiguration();
+      gc.setSiteId(s);
+      gc.setRackId(r);
+      gc.setMachineId(m);
+   }
+
+   /**
+    * Checks that every node uses a {@link TopologyAwareConsistentHash}, that each node's distribution
+    * manager knows the topology info of all three nodes, and that the topology info exposed by each
+    * consistent hash equals the one held by a distribution manager.
+    */
+   public void testIsReplicated() {
+      assert advancedCache(0).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+      assert advancedCache(1).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+      assert advancedCache(2).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+
+      DistributionManagerImpl dmi = (DistributionManagerImpl) advancedCache(0).getDistributionManager();
+      // Use the test logger, like createCacheManagers() does, rather than writing to stdout.
+      log.info("dmi.getTopologyInfo() = " + dmi.getTopologyInfo());
+      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+      dmi = (DistributionManagerImpl) advancedCache(1).getDistributionManager();
+      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+      dmi = (DistributionManagerImpl) advancedCache(2).getDistributionManager();
+      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+
+      // NOTE(review): from here on dmi is node 2's distribution manager, so every consistent hash is
+      // compared against node 2's topology info - confirm that a per-node comparison was not intended.
+      TopologyAwareConsistentHash tach = (TopologyAwareConsistentHash) advancedCache(0).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+      tach = (TopologyAwareConsistentHash) advancedCache(1).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+      tach = (TopologyAwareConsistentHash) advancedCache(2).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+   }
+
+   /**
+    * Kills the second cache manager, waits for the rehash on the two remaining caches and checks that
+    * both still hold the topology info of nodes 0 and 2.
+    */
+   @Test(dependsOnMethods = "testIsReplicated")
+   public void testNodeLeaves() {
+      TestingUtil.killCacheManagers(manager(1));
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(2));
+
+      DistributionManagerImpl dmi = (DistributionManagerImpl) advancedCache(0).getDistributionManager();
+      assertTopologyInfo2Nodes(dmi.getTopologyInfo());
+      dmi = (DistributionManagerImpl) advancedCache(2).getDistributionManager();
+      assertTopologyInfo2Nodes(dmi.getTopologyInfo());
+
+      // As in testIsReplicated(), dmi refers to node 2's distribution manager for both comparisons.
+      TopologyAwareConsistentHash tach = (TopologyAwareConsistentHash) advancedCache(0).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+      tach = (TopologyAwareConsistentHash) advancedCache(2).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+   }
+
+   /**
+    * Asserts that the given topology info holds the expected entries for nodes 0, 1 and 2; the expected
+    * values mirror the ids assigned in {@link #createCacheManagers()}.
+    */
+   private void assertTopologyInfo3Nodes(TopologyInfo topologyInfo) {
+      assertTopologyInfo2Nodes(topologyInfo);
+      assertEquals(topologyInfo.getNodeTopologyInfo(address(1)), new NodeTopologyInfo("m1","r1", "s1", address(1)));
+   }
+
+   /**
+    * Asserts that the given topology info holds the expected entries for nodes 0 and 2, i.e. the nodes
+    * that remain once node 1 has been killed.
+    */
+   private void assertTopologyInfo2Nodes(TopologyInfo topologyInfo) {
+      assertEquals(topologyInfo.getNodeTopologyInfo(address(0)), new NodeTopologyInfo("m0","r0", "s0", address(0)));
+      assertEquals(topologyInfo.getNodeTopologyInfo(address(2)), new NodeTopologyInfo("m2","r2", "s2", address(2)));
+   }
+}

Added: branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoStateTransferTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoStateTransferTest.java	                        (rev 0)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/topologyaware/TopologyInfoStateTransferTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -0,0 +1,19 @@
+package org.infinispan.distribution.topologyaware;
+
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.Test;
+
+/**
+ * Empty placeholder test class: it currently creates no cache managers and declares no test methods.
+ * <p/>
+ * TODO: add the state transfer scenarios this class is named after.
+ *
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(testName = "topologyaware.TopologyInfoStateTransferTest", groups = "functional")
+public class TopologyInfoStateTransferTest extends MultipleCacheManagersTest {
+
+   /**
+    * Intentionally creates nothing for now; see the class-level TODO.
+    */
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      // TODO: Customise this generated block
+   }
+}

Modified: branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java	2010-11-03 09:13:35 UTC (rev 2657)
+++ branches/4.2.x/core/src/test/java/org/infinispan/test/MultipleCacheManagersTest.java	2010-11-03 12:29:58 UTC (rev 2658)
@@ -290,4 +290,21 @@
    public AdvancedCache advancedCache(int i, String cacheName) {
       return cache(i, cacheName).getAdvancedCache();
    }
+
+   public List<Cache> caches(String name) {
+      List<Cache> result = new ArrayList<Cache>();
+      for (EmbeddedCacheManager ecm : cacheManagers) {
+         result.add(name == null? ecm.getCache() : ecm.getCache(name));
+      }
+      return result;
+   }
+
+   /**
+    * Shorthand for {@link #caches(String)} called with a {@code null} cache name.
+    *
+    * @return one cache per registered cache manager
+    */
+   public List<Cache> caches() {
+      final String noCacheName = null;
+      return caches(noCacheName);
+   }
+
+   /**
+    * Looks up the address reported by the RPC manager of the given cache.
+    *
+    * @param c cache whose address is wanted
+    * @return the address returned by the cache's RPC manager
+    */
+   protected Address address(Cache c) {
+      AdvancedCache advanced = c.getAdvancedCache();
+      return advanced.getRpcManager().getAddress();
+   }
+
 }



More information about the infinispan-commits mailing list