[infinispan-commits] Infinispan SVN: r2630 - in trunk/core/src: main/java/org/infinispan/config and 12 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Oct 28 15:50:06 EDT 2010


Author: mircea.markus
Date: 2010-10-28 15:50:05 -0400 (Thu, 28 Oct 2010)
New Revision: 2630

Added:
   trunk/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java
Modified:
   trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
   trunk/core/src/main/java/org/infinispan/config/Configuration.java
   trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
   trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
   trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
   trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
   trunk/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java
   trunk/core/src/main/java/org/infinispan/distribution/ch/NodeTopologyInfo.java
   trunk/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java
   trunk/core/src/main/java/org/infinispan/marshall/Ids.java
   trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
   trunk/core/src/main/resources/config-samples/all.xml
   trunk/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java
   trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java
   trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java
   trunk/core/src/test/java/org/infinispan/manager/CacheManagerXmlConfigurationTest.java
   trunk/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java
   trunk/core/src/test/resources/configs/named-cache-test.xml
Log:
[ISPN-180]-ongoing work

Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -13,6 +13,7 @@
 import org.infinispan.context.InvocationContext;
 import org.infinispan.distribution.ch.ConsistentHash;
 import org.infinispan.distribution.DistributionManager;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.marshall.Ids;
@@ -63,6 +64,7 @@
    List<WriteCommand> txLogCommands;
    List<PrepareCommand> pendingPrepares;
    CommandsFactory commandsFactory;
+   NodeTopologyInfo nodeTopologyInfo;
    private static final Log log = LogFactory.getLog(RehashControlCommand.class);
 
    public RehashControlCommand() {
@@ -115,10 +117,9 @@
          case JOIN_REQ:
             return distributionManager.requestPermissionToJoin(sender);
          case JOIN_REHASH_START:
-            distributionManager.informRehashOnJoin(sender, true);
-            return null;
+            return distributionManager.informRehashOnJoin(sender, true, nodeTopologyInfo);
          case JOIN_REHASH_END:
-            distributionManager.informRehashOnJoin(sender, false);
+            distributionManager.informRehashOnJoin(sender, false, nodeTopologyInfo);
             return null;
          case PULL_STATE_JOIN:
             return pullStateForJoin();             
@@ -230,9 +231,13 @@
    }
 
    public Object[] getParameters() {
-      return new Object[]{cacheName, (byte) type.ordinal(), sender, state, oldCH, nodesLeft, newCH, txLogCommands, pendingPrepares};
+      return new Object[]{cacheName, (byte) type.ordinal(), sender, state, oldCH, nodesLeft, newCH, txLogCommands, pendingPrepares, nodeTopologyInfo};
    }
 
+   public void setNodeTopologyInfo(NodeTopologyInfo nodeTopologyInfo) {
+      this.nodeTopologyInfo = nodeTopologyInfo;
+   }
+
    @SuppressWarnings("unchecked")
    public void setParameters(int commandId, Object[] parameters) {
       int i = 0;
@@ -245,6 +250,7 @@
       newCH = (ConsistentHash) parameters[i++];
       txLogCommands = (List<WriteCommand>) parameters[i++];
       pendingPrepares = (List<PrepareCommand>) parameters[i++];
+      nodeTopologyInfo = (NodeTopologyInfo) parameters[i++];
    }
 
    @Override

Modified: trunk/core/src/main/java/org/infinispan/config/Configuration.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/config/Configuration.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/config/Configuration.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -22,6 +22,7 @@
 package org.infinispan.config;
 
 import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
 import org.infinispan.eviction.EvictionStrategy;
 import org.infinispan.eviction.EvictionThreadPolicy;
 import org.infinispan.factories.ComponentRegistry;
@@ -60,6 +61,7 @@
  * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
  * @author Vladimir Blagojevic
  * @author Galder Zamarreño
+ * @author Mircea.Markus at jboss.com
  * @see <a href="../../../config.html#ce_infinispan_default">Configuration reference</a>
  * @since 4.0
  */
@@ -154,10 +156,7 @@
    //   SETTERS - MAKE SURE ALL SETTERS PERFORM testImmutability()!!!
    // ------------------------------------------------------------------------------------------------------------
 
-   /**
-    * will be removed, please use {@link org.infinispan.manager.EmbeddedCacheManager#getGlobalConfiguration()}
-    */
-   @Deprecated
+
    public GlobalConfiguration getGlobalConfiguration() {
       return globalConfiguration;
    }
@@ -395,8 +394,6 @@
    /**
     * Cache mode. For distribution, set mode to either 'd', 'dist' or 'distribution'. For replication, use either 'r',
     * 'repl' or 'replication'. Finally, for invalidation, 'i', 'inv' or 'invalidation'.
-    *
-    * @param cacheMode
     */
    public void setCacheMode(CacheMode cacheModeInt) {
       clustering.setMode(cacheModeInt);
@@ -943,6 +940,9 @@
    }
 
    public String getConsistentHashClass() {
+      if (clustering.hash.consistentHashClass == null) {
+         clustering.hash.consistentHashClass = globalConfiguration == null || globalConfiguration.hasTopologyInfo() ? TopologyAwareConsistentHash.class.getName() : DefaultConsistentHash.class.getName();
+      }
       return clustering.hash.consistentHashClass;
    }
 
@@ -1912,7 +1912,7 @@
       private static final long serialVersionUID = 752218766840948822L;
 
       @ConfigurationDocRef(name = "class", bean = Configuration.class, targetElement = "setConsistentHashClass")
-      protected String consistentHashClass = DefaultConsistentHash.class.getName();
+      protected String consistentHashClass;
 
       @ConfigurationDocRef(bean = Configuration.class, targetElement = "setNumOwners")
       protected Integer numOwners = 2;

Modified: trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/config/GlobalConfiguration.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -35,6 +35,7 @@
  *
  * @author Manik Surtani
  * @author Vladimir Blagojevic
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  * 
  * @see <a href="../../../config.html#ce_infinispan_global">Configuration reference</a>
@@ -163,6 +164,10 @@
       transport.setStrictPeerToPeer(strictPeerToPeer);
    }
 
+   public boolean hasTopologyInfo() {
+      return getSiteId() != null || getRackId() != null || getMachineId() != null;
+   }
+
    /**
     * Behavior of the JVM shutdown hook registered by the cache
     */
@@ -301,6 +306,52 @@
       transport.setClusterName(clusterName);
    }
 
+   /**
+    * The id of the machine where this node runs. Used for <a href="http://community.jboss.org/wiki/DesigningServerHinting">server
+    * hinting</a> .
+    */
+   public void setMachineId(String machineId) {
+      transport.setMachineId(machineId);
+   }
+
+   /**
+    * @see #setMachineId(String)
+    */
+   public String getMachineId() {
+      return transport.getMachineId();
+   }
+
+   /**
+    * The id of the rack where this node runs. Used for <a href="http://community.jboss.org/wiki/DesigningServerHinting">server
+    * hinting</a> .
+    */
+   public void setRackId(String rackId) {
+      transport.setRackId(rackId);
+   }
+
+   /**
+    * @see #setRackId(String)
+    */
+   public String getRackId() {
+      return transport.getRackId();
+   }
+
+   /**
+    * The id of the site where this node runs. Used for <a href="http://community.jboss.org/wiki/DesigningServerHinting">server
+    * hinting</a> .
+    */
+   public void setSiteId(String siteId) {
+      transport.setSiteId(siteId);
+   }
+
+   /**
+    * @see #setSiteId(String) 
+    */
+   public String getSiteId() {
+      return transport.getSiteId();
+   }
+
+
    public ShutdownHookBehavior getShutdownHookBehavior() {
       return shutdown.hookBehavior;
    }
@@ -661,7 +712,16 @@
      
       @ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setClusterName")
       protected String clusterName = "Infinispan-Cluster";
-     
+
+      @ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setMachineId")
+      protected String machineId;
+
+      @ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setRackId")
+      protected String rackId;
+
+      @ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setSiteId")
+      protected String siteId;
+
       @ConfigurationDocRef(bean=GlobalConfiguration.class,targetElement="setStrictPeerToPeer")
       protected Boolean strictPeerToPeer = true;      
       
@@ -699,6 +759,36 @@
       }
 
       @XmlAttribute
+      public void setMachineId(String machineId) {
+         testImmutability("machineId");
+         this.machineId = machineId;
+      }
+
+      @XmlAttribute
+      public void setRackId(String rackId) {
+         testImmutability("rackId");
+         this.rackId = rackId;
+      }
+
+      @XmlAttribute
+      public void setSiteId(String siteId) {
+         testImmutability("siteId");
+         this.siteId = siteId;
+      }
+
+      public String getMachineId() {
+         return machineId;
+      }
+
+      public String getRackId() {
+         return rackId;
+      }
+
+      public String getSiteId() {
+         return siteId;
+      }
+
+      @XmlAttribute
       public void setDistributedSyncTimeout(Long distributedSyncTimeout) {
          testImmutability("distributedSyncTimeout");
          this.distributedSyncTimeout = distributedSyncTimeout;

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManager.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -4,6 +4,7 @@
 import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.container.entries.InternalCacheValue;
 import org.infinispan.distribution.ch.ConsistentHash;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
 import org.infinispan.factories.scopes.Scope;
 import org.infinispan.factories.scopes.Scopes;
 import org.infinispan.loaders.CacheStore;
@@ -115,9 +116,9 @@
     *
     * @param joiner address of joiner
     * @param starting if true, the joiner is reporting that it is starting the join process.  If false, the joiner is
-    * reporting that it has completed the join process.
+    * @param nodeTopologyInfo
     */
-   void informRehashOnJoin(Address joiner, boolean starting);
+   NodeTopologyInfo informRehashOnJoin(Address joiner, boolean starting, NodeTopologyInfo nodeTopologyInfo);
 
    /**
     * Retrieves a cache store if one is available and set up for use in rehashing.  May return null!

Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -6,6 +6,7 @@
 import org.infinispan.commands.write.PutKeyValueCommand;
 import org.infinispan.commands.write.WriteCommand;
 import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.entries.CacheEntry;
 import org.infinispan.container.entries.InternalCacheEntry;
@@ -18,6 +19,8 @@
 
 import org.infinispan.distribution.ch.ConsistentHash;
 import org.infinispan.distribution.ch.ConsistentHashHelper;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
+import org.infinispan.distribution.ch.TopologyInfo;
 import org.infinispan.distribution.ch.UnionConsistentHash;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
@@ -86,6 +89,8 @@
    private final ExecutorService rehashExecutor;
 
    private final TransactionLogger transactionLogger = new TransactionLoggerImpl();
+
+   TopologyInfo topologyInfo = new TopologyInfo();
    
    /**
     * Rehash flag set by a rehash task associated with this DistributionManager
@@ -155,6 +160,12 @@
       log.trace("Starting distribution manager on " + getMyAddress());            
       listener = new ViewChangeListener();
       notifier.addListener(listener);
+      GlobalConfiguration gc = configuration.getGlobalConfiguration();
+      if (gc.hasTopologyInfo()) {
+         Address address = rpcManager.getTransport().getAddress();
+         NodeTopologyInfo nti = new NodeTopologyInfo(gc.getMachineId(),gc.getRackId(), gc.getSiteId(), address);
+         topologyInfo.addNodeTopologyInfo(address, nti);
+      }
       join();
    }
    
@@ -187,7 +198,7 @@
       setJoinComplete(false);
       Transport t = rpcManager.getTransport();
       List<Address> members = t.getMembers();
-      consistentHash = createConsistentHash(configuration, members);
+      consistentHash = createConsistentHash(configuration, members, topologyInfo);
       self = t.getAddress();
       if (members.size() > 1 && !t.getCoordinator().equals(self)) {
          JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, dataContainer, this);
@@ -218,13 +229,15 @@
          Address leaver = MembershipArithmetic.getMemberLeft(oldMembers, newMembers);
          log.info("This is a LEAVE event!  Node {0} has just left", leaver);
 
+         topologyInfo.removeNodeInfo(leaver);
+
          try {
             if (!(consistentHash instanceof UnionConsistentHash)) {
               oldConsistentHash = consistentHash;
             }  else {
                oldConsistentHash = ((UnionConsistentHash) consistentHash).getNewCH();
             }
-            consistentHash = ConsistentHashHelper.removeAddress(consistentHash, leaver, configuration);
+            consistentHash = ConsistentHashHelper.removeAddress(consistentHash, leaver, configuration, topologyInfo);
          } catch (Exception e) {
             log.fatal("Unable to process leaver!!", e);
             throw new CacheException(e);
@@ -346,7 +359,7 @@
       }
    }
 
-   public void informRehashOnJoin(Address a, boolean starting) {
+   public NodeTopologyInfo informRehashOnJoin(Address a, boolean starting, NodeTopologyInfo nodeTopologyInfo) {
       log.trace("Informed of a JOIN by {0}.  Starting? {1}", a, starting);
       if (!starting) {
          if (consistentHash instanceof UnionConsistentHash) {
@@ -356,18 +369,17 @@
          }
          joiner = null;            
       } else {
+         topologyInfo.addNodeTopologyInfo(a, nodeTopologyInfo);
+         log.trace("Node topology info added({0}).  Topology info is {1}", nodeTopologyInfo, topologyInfo);         
          ConsistentHash chOld = consistentHash;
          if (chOld instanceof UnionConsistentHash) throw new RuntimeException("Not expecting a union CH!");
          oldConsistentHash = chOld;
          joiner = a;        
-         ConsistentHash chNew;
-         chNew = (ConsistentHash) Util.getInstance(configuration.getConsistentHashClass());
-         List<Address> newAddresses = new LinkedList<Address>(chOld.getCaches());
-         newAddresses.add(a);
-         chNew.setCaches(newAddresses);
+         ConsistentHash chNew = ConsistentHashHelper.createConsistentHash(configuration, chOld.getCaches(), topologyInfo, a);
          consistentHash = new UnionConsistentHash(chOld, chNew);
       }
       log.trace("New CH is {0}", consistentHash);
+      return topologyInfo.getNodeTopologyInfo(rpcManager.getAddress());
    }
 
    public void applyState(ConsistentHash consistentHash, Map<Object, InternalCacheValue> state) {
@@ -501,4 +513,8 @@
    public void setConfiguration(Configuration configuration) {
       this.configuration = configuration;
    }
+
+   public TopologyInfo getTopologyInfo() {
+      return topologyInfo;
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/InvertedLeaveTask.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -72,7 +72,7 @@
       long start = trace ? System.currentTimeMillis() : 0;
 
       int replCount = configuration.getNumOwners();
-      ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled);
+      ConsistentHash oldCH = ConsistentHashHelper.createConsistentHash(configuration, dmi.getConsistentHash().getCaches(), leaversHandled, dmi.topologyInfo);
       ConsistentHash newCH = dmi.getConsistentHash();
       try {
          if (log.isDebugEnabled()) {

Modified: trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/JoinTask.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -10,6 +10,7 @@
 import static org.infinispan.distribution.ch.ConsistentHashHelper.createConsistentHash;
 
 import org.infinispan.distribution.ch.ConsistentHash;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
 import org.infinispan.remoting.responses.Response;
 import org.infinispan.remoting.responses.SuccessfulResponse;
 import static org.infinispan.remoting.rpc.ResponseMode.SYNCHRONOUS;
@@ -41,6 +42,7 @@
  * <ul>
  *
  * @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 public class JoinTask extends RehashTask {
@@ -87,9 +89,10 @@
          // 2.  new CH instance
          if (chOld.getCaches().contains(self))
             chNew = chOld;
-         else
-            chNew = createConsistentHash(configuration, chOld.getCaches(), self);
-         
+         else {
+            chNew = createConsistentHash(configuration, chOld.getCaches(), dmi.topologyInfo, self);
+         }
+
          dmi.setConsistentHash(chNew);
          try {
             if (configuration.isRehashEnabled()) {
@@ -97,8 +100,11 @@
                transactionLogger.enable();
    
                // 4.  Broadcast new temp CH
-               rpcManager.broadcastRpcCommand(cf.buildRehashControlCommand(JOIN_REHASH_START, self), true, true);
-   
+               RehashControlCommand rehashControlCommand = cf.buildRehashControlCommand(JOIN_REHASH_START, self);
+               rehashControlCommand.setNodeTopologyInfo(dmi.topologyInfo.getNodeTopologyInfo(rpcManager.getAddress()));
+               List<Response> responseList = rpcManager.invokeRemotely(null, rehashControlCommand, true, true);
+               updateTopologyInfo(responseList);
+
                // 5.  txLogger being enabled will cause ClusteredGetCommands to return uncertain responses.
    
                // 6.  pull state from everyone.
@@ -147,7 +153,18 @@
       }
    }
 
-    private ConsistentHash retrieveOldCH(boolean trace) throws InterruptedException, IllegalAccessException,
+   private void updateTopologyInfo(List<Response> responseList) {
+      for (Response r : responseList) {
+         SuccessfulResponse sr = (SuccessfulResponse) r;
+         NodeTopologyInfo nti = (NodeTopologyInfo) sr.getResponseValue();
+         if (nti != null) {
+            dmi.topologyInfo.addNodeTopologyInfo(nti.getAddress(), nti);
+         }
+      }
+      if (log.isTraceEnabled()) log.trace("Topology after after getting cluster info: " + dmi.topologyInfo);
+   }
+
+   private ConsistentHash retrieveOldCH(boolean trace) throws InterruptedException, IllegalAccessException,
                     InstantiationException, ClassNotFoundException {
         
         // this happens in a loop to ensure we receive the correct CH and not a "union".
@@ -184,7 +201,7 @@
                     log.trace("Sleeping for {0}", Util.prettyPrintTime(time));
                 Thread.sleep(time); // sleep for a while and retry
             } else {
-                result = createConsistentHash(configuration, addresses);
+                result = createConsistentHash(configuration, addresses, dmi.topologyInfo);
             }
         } while (result == null && System.currentTimeMillis() < giveupTime);
 

Modified: trunk/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/ch/AbstractConsistentHash.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -42,4 +42,8 @@
    public void setTopologyInfo(TopologyInfo topologyInfo) {
       this.topologyInfo = topologyInfo;
    }
+
+   public TopologyInfo getTopologyInfo() {
+      return topologyInfo;
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/ch/ConsistentHashHelper.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -14,6 +14,7 @@
  * A helper class that handles the construction of consistent hash instances based on configuration.
  *
  * @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 public class ConsistentHashHelper {
@@ -24,16 +25,18 @@
     * @param ch       consistent hash to start with
     * @param toRemove address to remove
     * @param c        configuration
+    * @param topologyInfo
     * @return a new consistent hash instance of the same type
     */
-   public static ConsistentHash removeAddress(ConsistentHash ch, Address toRemove, Configuration c) {
+   public static ConsistentHash removeAddress(ConsistentHash ch, Address toRemove, Configuration c, TopologyInfo topologyInfo) {
       if (ch instanceof UnionConsistentHash)
-         return removeAddressFromUnionConsistentHash((UnionConsistentHash) ch, toRemove, c);
+         return removeAddressFromUnionConsistentHash((UnionConsistentHash) ch, toRemove, c, topologyInfo);
       else {
          ConsistentHash newCH = (ConsistentHash) Util.getInstance(c.getConsistentHashClass());
          List<Address> caches = new ArrayList<Address>(ch.getCaches());
          caches.remove(toRemove);
          newCH.setCaches(caches);
+         newCH.setTopologyInfo(topologyInfo);
          return newCH;
       }
    }
@@ -45,11 +48,12 @@
     * @param uch      union consistent hash instance
     * @param toRemove address to remove
     * @param c        configuration
+    * @param topologyInfo
     * @return a new UnionConsistentHash instance
     */
-   public static UnionConsistentHash removeAddressFromUnionConsistentHash(UnionConsistentHash uch, Address toRemove, Configuration c) {
-      ConsistentHash newFirstCH = removeAddress(uch.getOldConsistentHash(), toRemove, c);
-      ConsistentHash newSecondCH = removeAddress(uch.getNewConsistentHash(), toRemove, c);
+   public static UnionConsistentHash removeAddressFromUnionConsistentHash(UnionConsistentHash uch, Address toRemove, Configuration c, TopologyInfo topologyInfo) {
+      ConsistentHash newFirstCH = removeAddress(uch.getOldConsistentHash(), toRemove, c, topologyInfo);
+      ConsistentHash newSecondCH = removeAddress(uch.getNewConsistentHash(), toRemove, c, topologyInfo);
       return new UnionConsistentHash(newFirstCH, newSecondCH);
    }
 
@@ -59,11 +63,13 @@
     *
     * @param c         configuration
     * @param addresses with which to populate the consistent hash
+    * @param topologyInfo
     * @return a new consistent hash instance
     */
-   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses) {
+   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, TopologyInfo topologyInfo) {
       ConsistentHash ch = (ConsistentHash) Util.getInstance(c.getConsistentHashClass());
       ch.setCaches(addresses);
+      ch.setTopologyInfo(topologyInfo);
       return ch;
    }
 
@@ -73,13 +79,13 @@
     *
     * @param c             configuration
     * @param addresses     with which to populate the consistent hash
-    * @param moreAddresses to add to the list of addresses
-    * @return a new consistent hash instance
+    * @param topologyInfo
+    *@param moreAddresses to add to the list of addresses  @return a new consistent hash instance
     */
-   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Address... moreAddresses) {
+   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, TopologyInfo topologyInfo, Address... moreAddresses) {
       List<Address> list = new LinkedList<Address>(addresses);
       list.addAll(Arrays.asList(moreAddresses));
-      return createConsistentHash(c, list);
+      return createConsistentHash(c, list, topologyInfo);
    }
 
    /**
@@ -89,12 +95,13 @@
     * @param c             configuration
     * @param addresses     with which to populate the consistent hash
     * @param moreAddresses to add to the list of addresses
+    * @param topologyInfo
     * @return a new consistent hash instance
     */
-   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Collection<Address> moreAddresses) {
+   public static ConsistentHash createConsistentHash(Configuration c, List<Address> addresses, Collection<Address> moreAddresses, TopologyInfo topologyInfo) {
       List<Address> list = new LinkedList<Address>(addresses);
       list.addAll(moreAddresses);
-      return createConsistentHash(c, list);
+      return createConsistentHash(c, list, topologyInfo);
    }
 
    /**
@@ -103,12 +110,14 @@
     *
     * @param clazz     type of the consistent hash to create
     * @param addresses with which to populate the consistent hash
+    * @param topologyInfo
     * @return a new consistent hash instance
     */
-   public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses) {
+   public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, TopologyInfo topologyInfo) {
       ConsistentHash ch;
       ch = Util.getInstance(clazz);
-      if (addresses != null && !addresses.isEmpty()) ch.setCaches(addresses);
+      if (addresses != null && !addresses.isEmpty())  ch.setCaches(addresses);
+      ch.setTopologyInfo(topologyInfo);
       return ch;
    }
 
@@ -118,13 +127,13 @@
     *
     * @param clazz         type of the consistent hash to create
     * @param addresses     with which to populate the consistent hash
-    * @param moreAddresses to add to the list of addresses
-    * @return a new consistent hash instance
+    * @param topologyInfo
+    *@param moreAddresses to add to the list of addresses  @return a new consistent hash instance
     */
-   public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, Address... moreAddresses) {
+   public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, TopologyInfo topologyInfo, Address... moreAddresses) {
       List<Address> list = new LinkedList<Address>(addresses);
       list.addAll(Arrays.asList(moreAddresses));
-      return createConsistentHash(clazz, list);
+      return createConsistentHash(clazz, list, topologyInfo);
    }
 
    /**
@@ -134,11 +143,12 @@
     * @param clazz         type of the consistent hash to create
     * @param addresses     with which to populate the consistent hash
     * @param moreAddresses to add to the list of addresses
+    * @param topologyInfo
     * @return a new consistent hash instance
     */
-   public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, Collection<Address> moreAddresses) {
+   public static ConsistentHash createConsistentHash(Class<? extends ConsistentHash> clazz, List<Address> addresses, Collection<Address> moreAddresses, TopologyInfo topologyInfo) {
       List<Address> list = new LinkedList<Address>(addresses);
       list.addAll(moreAddresses);
-      return createConsistentHash(clazz, list);
+      return createConsistentHash(clazz, list, topologyInfo);
    }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/ch/NodeTopologyInfo.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ch/NodeTopologyInfo.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/ch/NodeTopologyInfo.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -1,21 +1,33 @@
 package org.infinispan.distribution.ch;
 
+import org.infinispan.marshall.Externalizer;
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
+import org.infinispan.remoting.transport.Address;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
 /**
  * Holds topology information about a a node.
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.2
  */
+ at Marshallable(externalizer = NodeTopologyInfo.NodeTopologyInfoExternalizer.class, id = Ids.NODE_TOPOLOGY_INFO)
 public class NodeTopologyInfo {
 
    private final String machineId;
    private final String rackId;
    private final String siteId;
+   private final Address address;
 
-   public NodeTopologyInfo(String machineId, String rackId, String siteId) {
+   public NodeTopologyInfo(String machineId, String rackId, String siteId, Address address) {
       this.machineId = machineId;
       this.rackId = rackId;
       this.siteId = siteId;
+      this.address = address;
    }
 
    public String getMachineId() {
@@ -45,4 +57,63 @@
    private boolean equalObjects(Object first, Object second) {
       return first == null ? second == null : first.equals(second);
    }
+
+   public Address getAddress() {
+      return address;
+   }
+
+   public static class NodeTopologyInfoExternalizer implements Externalizer {
+
+      @Override
+      public void writeObject(ObjectOutput output, Object object) throws IOException {
+         NodeTopologyInfo nti = (NodeTopologyInfo) object;
+         output.writeObject(nti.siteId);
+         output.writeObject(nti.rackId);
+         output.writeObject(nti.machineId);
+         output.writeObject(nti.address);
+      }
+
+      @Override
+      public Object readObject(ObjectInput input) throws IOException, ClassNotFoundException {
+         String siteId = (String) input.readObject();
+         String rackId = (String) input.readObject();
+         String machineId = (String) input.readObject();
+         Address address = (Address) input.readObject();
+         return new NodeTopologyInfo(machineId, rackId, siteId, address);
+      }
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      NodeTopologyInfo that = (NodeTopologyInfo) o;
+
+      if (address != null ? !address.equals(that.address) : that.address != null) return false;
+      if (machineId != null ? !machineId.equals(that.machineId) : that.machineId != null) return false;
+      if (rackId != null ? !rackId.equals(that.rackId) : that.rackId != null) return false;
+      if (siteId != null ? !siteId.equals(that.siteId) : that.siteId != null) return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      int result = machineId != null ? machineId.hashCode() : 0;
+      result = 31 * result + (rackId != null ? rackId.hashCode() : 0);
+      result = 31 * result + (siteId != null ? siteId.hashCode() : 0);
+      result = 31 * result + (address != null ? address.hashCode() : 0);
+      return result;
+   }
+
+   @Override
+   public String toString() {
+      return "NodeTopologyInfo{" +
+            "machineId='" + machineId + '\'' +
+            ", rackId='" + rackId + '\'' +
+            ", siteId='" + siteId + '\'' +
+            ", address=" + address +
+            '}';
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/ch/TopologyAwareConsistentHash.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -1,13 +1,20 @@
 package org.infinispan.distribution.ch;
 
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
 import org.infinispan.remoting.transport.Address;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 
 import static java.lang.Math.min;
 
@@ -15,6 +22,7 @@
  * Consistent hash that is aware of cluster topology.
  * Design described here: http://community.jboss.org/wiki/DesigningServerHinting.
  * <p>
+ * <pre>
  * Algorithm:
  * - place nodes on the hash wheel based address's hash code
  * - For selecting owner nodes:
@@ -24,11 +32,12 @@
  *       - if not enough nodes found repeat walk again and pick nodes that have different site id, rack id and machine id
  *       - Ultimately cycle back to the first node selected, don't discard any nodes, regardless of machine id/rack
  * id/site id match.
-
+ * </pre>
  *
  * @author Mircea.Markus at jboss.com
  * @since 4.2
  */
+@Marshallable(externalizer = TopologyAwareConsistentHash.Externalizer.class, id = Ids.TOPOLOGY_AWARE_CH)
 public class TopologyAwareConsistentHash extends AbstractWheelConsistentHash {
 
    public List<Address> locate(Object key, int replCount) {
@@ -119,4 +128,32 @@
       Integer ownerHash = positions.tailMap(hash).firstKey();
       return positions.get(ownerHash);
    }
+
+   public static class Externalizer implements org.infinispan.marshall.Externalizer {
+      @Override
+      public void writeObject(ObjectOutput output, Object subject) throws IOException {
+         TopologyAwareConsistentHash dch = (TopologyAwareConsistentHash) subject;
+         output.writeObject(dch.addresses);
+         output.writeObject(dch.positions);
+         output.writeObject(dch.addressToHashIds);
+         Collection<NodeTopologyInfo> infoCollection = dch.topologyInfo.getAllTopologyInfo();
+         output.writeInt(infoCollection.size());
+         for (NodeTopologyInfo nti : infoCollection) output.writeObject(nti);
+      }
+
+      @Override
+      public Object readObject(ObjectInput unmarshaller) throws IOException, ClassNotFoundException {
+         TopologyAwareConsistentHash ch = new TopologyAwareConsistentHash();
+         ch.addresses = (ArrayList<Address>) unmarshaller.readObject();
+         ch.positions = (SortedMap<Integer, Address>) unmarshaller.readObject();
+         ch.addressToHashIds = (Map<Address, Integer>) unmarshaller.readObject();
+         ch.topologyInfo = new TopologyInfo();
+         int ntiCount = unmarshaller.readInt();
+         for (int i = 0; i < ntiCount; i++) {
+            NodeTopologyInfo nti = (NodeTopologyInfo) unmarshaller.readObject();
+            ch.topologyInfo.addNodeTopologyInfo(nti.getAddress(), nti);
+         }
+         return ch;
+      }
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/distribution/ch/TopologyInfo.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -1,7 +1,15 @@
 package org.infinispan.distribution.ch;
 
+import org.infinispan.marshall.Externalizer;
+import org.infinispan.marshall.Ids;
+import org.infinispan.marshall.Marshallable;
 import org.infinispan.remoting.transport.Address;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -36,4 +44,41 @@
       NodeTopologyInfo info2 = address2TopologyInfo.get(a2);
       return info1.sameMachine(info2);
    }
+
+   public NodeTopologyInfo getNodeTopologyInfo(Address address) {
+      return address2TopologyInfo.get(address);
+   }
+
+   public void removeNodeInfo(Address leaver) {
+      address2TopologyInfo.remove(leaver);
+   }
+
+   public Collection<NodeTopologyInfo> getAllTopologyInfo() {
+      return Collections.unmodifiableCollection(address2TopologyInfo.values());
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      TopologyInfo that = (TopologyInfo) o;
+
+      if (address2TopologyInfo != null ? !address2TopologyInfo.equals(that.address2TopologyInfo) : that.address2TopologyInfo != null)
+         return false;
+
+      return true;
+   }
+
+   @Override
+   public int hashCode() {
+      return address2TopologyInfo != null ? address2TopologyInfo.hashCode() : 0;
+   }
+
+   @Override
+   public String toString() {
+      return "TopologyInfo{" +
+            "address2TopologyInfo=" + address2TopologyInfo +
+            '}';
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/marshall/Ids.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/Ids.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/marshall/Ids.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -117,4 +117,6 @@
    static final byte BYTE_ARRAY_KEY = 57;
    static final byte TOPOLOGY_ADDRESS = 58;
    static final byte TOPOLOGY_VIEW = 59;
+   static final byte NODE_TOPOLOGY_INFO = 60;
+   static final byte TOPOLOGY_AWARE_CH = 61;
 }

Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -54,6 +54,8 @@
 import org.infinispan.container.entries.TransientMortalCacheEntry;
 import org.infinispan.container.entries.TransientMortalCacheValue;
 import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
 import org.infinispan.distribution.ch.UnionConsistentHash;
 import org.infinispan.loaders.bucket.Bucket;
 import org.infinispan.marshall.Externalizer;
@@ -170,6 +172,8 @@
       MARSHALLABLES.add(ClearOperation.class.getName());
       MARSHALLABLES.add(DefaultConsistentHash.class.getName());
       MARSHALLABLES.add(UnionConsistentHash.class.getName());
+      MARSHALLABLES.add(NodeTopologyInfo.class.getName());
+      MARSHALLABLES.add(TopologyAwareConsistentHash.class.getName());
 
       MARSHALLABLES.add("org.infinispan.server.core.CacheValue");
       MARSHALLABLES.add("org.infinispan.server.memcached.MemcachedValue");

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -37,6 +37,7 @@
  * the registered {@link Transport}.
  *
  * @author Manik Surtani
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 public interface RpcManager {
@@ -156,7 +157,7 @@
     * @param usePriorityQueue if true, a priority queue is used
     * @throws ReplicationException in the event of problems
     */
-   void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException;
+   List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException;
 
    /**
     * The same as {@link #invokeRemotely(java.util.Collection, org.infinispan.commands.ReplicableCommand, boolean)}

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -44,6 +44,7 @@
  *
  * @author Manik Surtani
  * @author Galder Zamarreño
+ * @author Mircea.Markus at jboss.com
  * @since 4.0
  */
 @MBean(objectName = "RpcManager", description = "Manages all remote calls to remote cache instances in the cluster.")
@@ -212,23 +213,24 @@
       invokeRemotely(recipients, rpc, sync, false);
    }
 
-   public final void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
-      invokeRemotely(recipients, rpc, sync, usePriorityQueue, configuration.getSyncReplTimeout());
+   public final List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+      return invokeRemotely(recipients, rpc, sync, usePriorityQueue, configuration.getSyncReplTimeout());
    }
 
-   public final void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout) throws ReplicationException {
+   public final List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue, long timeout) throws ReplicationException {
       if (trace) log.trace("{0} broadcasting call {1} to recipient list {2}", t.getAddress(), rpc, recipients);
 
       if (useReplicationQueue(sync)) {
          replicationQueue.add(rpc);
+         return null;
       } else {
          if (!(rpc instanceof CacheRpcCommand)) {
             rpc = cf.buildSingleRpcCommand(rpc);
          }
-         List rsps;
-         rsps = invokeRemotely(recipients, rpc, getResponseMode(sync), timeout, usePriorityQueue);
+         List<Response> rsps = invokeRemotely(recipients, rpc, getResponseMode(sync), timeout, usePriorityQueue);
          if (trace) log.trace("responses=" + rsps);
          if (sync) checkResponses(rsps);
+         return rsps;
       }
    }
 

Modified: trunk/core/src/main/resources/config-samples/all.xml
===================================================================
--- trunk/core/src/main/resources/config-samples/all.xml	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/main/resources/config-samples/all.xml	2010-10-28 19:50:05 UTC (rev 2630)
@@ -44,7 +44,7 @@
          There is no added cost to defining a transport but not creating a cache that uses one, since the transport
          is created and initialized lazily.
       -->
-      <transport clusterName="infinispan-cluster" distributedSyncTimeout="50000" nodeName="Jalapeno"/>
+      <transport clusterName="infinispan-cluster" machineId="m1" rackId="r1" siteId="s1" distributedSyncTimeout="50000" nodeName="Jalapeno"/>
          <!-- Note that the JGroups transport uses sensible defaults if no configuration property is defined. -->
          <!-- See the JGroupsTransport javadocs for more flags -->
 

Modified: trunk/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/test/java/org/infinispan/config/parsing/XmlFileParsingTest.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -8,6 +8,7 @@
 import org.infinispan.config.GlobalConfiguration.ShutdownHookBehavior;
 import org.infinispan.config.InfinispanConfiguration;
 import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
 import org.infinispan.eviction.EvictionStrategy;
 import org.infinispan.eviction.EvictionThreadPolicy;
 import org.infinispan.loaders.file.FileCacheStoreConfig;
@@ -234,7 +235,7 @@
       assert c.getCacheMode() == Configuration.CacheMode.DIST_SYNC;
       assert c.getL1Lifespan() == 600000;
       assert c.getRehashWaitTime() == 120000;
-      assert c.getConsistentHashClass().equals(DefaultConsistentHash.class.getName());
+      assert c.getConsistentHashClass().equals(TopologyAwareConsistentHash.class.getName());
       assert c.getNumOwners() == 3;
       assert c.isL1CacheEnabled();
 

Modified: trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/test/java/org/infinispan/distribution/BaseDistFunctionalTest.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -11,6 +11,7 @@
 import org.infinispan.distribution.ch.ConsistentHash;
 import org.infinispan.distribution.ch.ConsistentHashHelper;
 import org.infinispan.distribution.ch.DefaultConsistentHash;
+import org.infinispan.distribution.ch.TopologyInfo;
 import org.infinispan.distribution.ch.UnionConsistentHash;
 import org.infinispan.manager.CacheContainer;
 import org.infinispan.manager.EmbeddedCacheManager;
@@ -90,7 +91,7 @@
 
    public static ConsistentHash createNewConsistentHash(List<Address> servers) {
       try {
-         return ConsistentHashHelper.createConsistentHash(DefaultConsistentHash.class, servers);
+         return ConsistentHashHelper.createConsistentHash(DefaultConsistentHash.class, servers, new TopologyInfo());
       } catch (RuntimeException re) {
          throw re;
       } catch (Exception e) {

Modified: trunk/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/test/java/org/infinispan/distribution/TopologyAwareConsistentHashTest.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -509,7 +509,7 @@
 
    private void addNode(TestAddress address, String machineId, String rackId, String siteId) {
       addresses.add(address);
-      NodeTopologyInfo nti = new NodeTopologyInfo(machineId, rackId, siteId);
+      NodeTopologyInfo nti = new NodeTopologyInfo(machineId, rackId, siteId, null);
       ti.addNodeTopologyInfo(address, nti);
    }
 

Copied: trunk/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java (from rev 2629, branches/4.2.x/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java)
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/distribution/TopologyInfoBroadcastTest.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -0,0 +1,83 @@
+package org.infinispan.distribution;
+
+import org.infinispan.config.Configuration;
+import org.infinispan.config.GlobalConfiguration;
+import org.infinispan.distribution.ch.NodeTopologyInfo;
+import org.infinispan.distribution.ch.TopologyAwareConsistentHash;
+import org.infinispan.distribution.ch.TopologyInfo;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * @author Mircea.Markus at jboss.com
+ * @since 4.2
+ */
+@Test(groups = "functional", testName = "distribution.TopologyInfoBroadcastTest")
+public class TopologyInfoBroadcastTest extends MultipleCacheManagersTest {
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      addClusterEnabledCacheManagers(Configuration.CacheMode.DIST_SYNC, 3);
+      updatedSiteInfo(manager(0), "s0", "r0", "m0");
+      updatedSiteInfo(manager(1), "s1", "r1", "m1");
+      updatedSiteInfo(manager(2), "s2", "r2", "m2");
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(1), cache(2));
+   }
+
+   private void updatedSiteInfo(EmbeddedCacheManager embeddedCacheManager, String s, String r, String m) {
+      GlobalConfiguration gc = embeddedCacheManager.getGlobalConfiguration();
+      gc.setSiteId(s);
+      gc.setRackId(r);
+      gc.setMachineId(m);
+   }
+
+   public void testIsReplicated() {
+      assert advancedCache(0).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+      assert advancedCache(1).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+      assert advancedCache(2).getDistributionManager().getConsistentHash() instanceof TopologyAwareConsistentHash;
+
+      DistributionManagerImpl dmi = (DistributionManagerImpl) advancedCache(0).getDistributionManager();
+      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+      dmi = (DistributionManagerImpl) advancedCache(1).getDistributionManager();
+      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+      dmi = (DistributionManagerImpl) advancedCache(2).getDistributionManager();
+      assertTopologyInfo3Nodes(dmi.getTopologyInfo());
+
+      TopologyAwareConsistentHash tach = (TopologyAwareConsistentHash) advancedCache(0).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+      tach = (TopologyAwareConsistentHash) advancedCache(1).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+      tach = (TopologyAwareConsistentHash) advancedCache(2).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+   }
+
+   @Test(dependsOnMethods = "testIsReplicated")
+   public void testNodeLeaves() {
+      TestingUtil.killCacheManagers(manager(1));
+      BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(cache(0), cache(2));
+
+      DistributionManagerImpl dmi = (DistributionManagerImpl) advancedCache(0).getDistributionManager();
+      assertTopologyInfo2Nodes(dmi.getTopologyInfo());
+      dmi = (DistributionManagerImpl) advancedCache(2).getDistributionManager();
+      assertTopologyInfo2Nodes(dmi.getTopologyInfo());
+
+      TopologyAwareConsistentHash tach = (TopologyAwareConsistentHash) advancedCache(0).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+      tach = (TopologyAwareConsistentHash) advancedCache(2).getDistributionManager().getConsistentHash();
+      assertEquals(tach.getTopologyInfo(), dmi.getTopologyInfo());
+   }
+
+   private void assertTopologyInfo3Nodes(TopologyInfo topologyInfo) {
+      assertTopologyInfo2Nodes(topologyInfo);
+      assertEquals(topologyInfo.getNodeTopologyInfo(address(1)), new NodeTopologyInfo("m1","r1", "s1", address(1)));
+   }
+
+   private void assertTopologyInfo2Nodes(TopologyInfo topologyInfo) {
+      assertEquals(topologyInfo.getNodeTopologyInfo(address(0)), new NodeTopologyInfo("m0","r0", "s0", address(0)));
+      assertEquals(topologyInfo.getNodeTopologyInfo(address(2)), new NodeTopologyInfo("m2","r2", "s2", address(2)));
+   }
+}

Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/WorkDuringJoinTest.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -5,6 +5,7 @@
 import org.infinispan.distribution.MagicKey;
 import org.infinispan.distribution.ch.ConsistentHash;
 import org.infinispan.distribution.ch.ConsistentHashHelper;
+import org.infinispan.distribution.ch.TopologyInfo;
 import org.infinispan.manager.EmbeddedCacheManager;
 import org.infinispan.remoting.transport.Address;
 import org.testng.annotations.Test;
@@ -54,7 +55,7 @@
       List<MagicKey> keys = init();
       ConsistentHash chOld = getConsistentHash(c1);
       Address joinerAddress = startNewMember();
-      ConsistentHash chNew = ConsistentHashHelper.createConsistentHash(chOld.getClass(), chOld.getCaches(), joinerAddress);
+      ConsistentHash chNew = ConsistentHashHelper.createConsistentHash(chOld.getClass(), chOld.getCaches(), new TopologyInfo(), joinerAddress);
       // which key should me mapped to the joiner?
       MagicKey keyToTest = null;
       for (MagicKey k: keys) {

Modified: trunk/core/src/test/java/org/infinispan/manager/CacheManagerXmlConfigurationTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/manager/CacheManagerXmlConfigurationTest.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/test/java/org/infinispan/manager/CacheManagerXmlConfigurationTest.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -15,6 +15,7 @@
 
 import static org.infinispan.test.TestingUtil.INFINISPAN_END_TAG;
 import static org.infinispan.test.TestingUtil.INFINISPAN_START_TAG;
+import static org.testng.Assert.assertEquals;
 
 /**
  * @author Manik Surtani
@@ -33,6 +34,10 @@
    public void testNamedCacheXML() throws IOException {
       cm = TestCacheManagerFactory.fromXml("configs/named-cache-test.xml");
 
+      assertEquals("s1", cm.getGlobalConfiguration().getSiteId());
+      assertEquals("r1", cm.getGlobalConfiguration().getRackId());
+      assertEquals("m1", cm.getGlobalConfiguration().getMachineId());
+
       // test default cache
       Cache c = cm.getCache();
       assert c.getConfiguration().getConcurrencyLevel() == 100;

Modified: trunk/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java	2010-10-28 19:50:05 UTC (rev 2630)
@@ -108,9 +108,10 @@
       realOne.invokeRemotely(recipients, rpc, sync);
    }
 
-   public void invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
+   public List<Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpc, boolean sync, boolean usePriorityQueue) throws ReplicationException {
       failIfNeeded();
       realOne.invokeRemotely(recipients, rpc, sync, usePriorityQueue);
+      return null;
    }
 
    public void invokeRemotelyInFuture(Collection<Address> recipients, ReplicableCommand rpc, NotifyingNotifiableFuture<Object> future) {

Modified: trunk/core/src/test/resources/configs/named-cache-test.xml
===================================================================
--- trunk/core/src/test/resources/configs/named-cache-test.xml	2010-10-28 19:40:21 UTC (rev 2629)
+++ trunk/core/src/test/resources/configs/named-cache-test.xml	2010-10-28 19:50:05 UTC (rev 2630)
@@ -1,8 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <infinispan
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-      xsi:schemaLocation="urn:infinispan:config:4.1 http://www.infinispan.org/schemas/infinispan-config-4.1.xsd"
-      xmlns="urn:infinispan:config:4.1">
+      xsi:schemaLocation="urn:infinispan:config:4.2 http://www.infinispan.org/schemas/infinispan-config-4.2.xsd"
+      xmlns="urn:infinispan:config:4.2">
 
    <global>
 
@@ -32,7 +32,7 @@
          </properties>
       </replicationQueueScheduledExecutor>
 
-      <transport clusterName="infinispan-cluster" distributedSyncTimeout="50000" nodeName="Jalapeno">
+      <transport clusterName="infinispan-cluster" distributedSyncTimeout="50000" nodeName="Jalapeno" machineId="m1" rackId="r1" siteId="s1">
          <!-- Note that the JGroups transport uses sensible defaults if no configuration property is defined. -->
          <properties>
          <property name="configurationFile" value="config-samples/jgroups-udp.xml"/>



More information about the infinispan-commits mailing list