[infinispan-commits] Infinispan SVN: r1733 - in trunk: server/core/src/main/scala/org/infinispan/server/core/transport/netty and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon May 3 05:20:24 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-05-03 05:20:23 -0400 (Mon, 03 May 2010)
New Revision: 1733

Modified:
   trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/ExperimentalDefaultConsistentHash.java
   trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
Log:
[ISPN-384] (Implement topology and hash distribution headers in Hot Rod) Implemented hash distribution headers and added methods to ConsistentHash interface to enable clients to retrieve an address' hash id and the hash space.

Modified: trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/core/src/main/java/org/infinispan/distribution/ConsistentHash.java	2010-05-03 09:20:23 UTC (rev 1733)
@@ -81,4 +81,20 @@
     * @return true if the key is mapped to the address; false otherwise
     */
    boolean isKeyLocalToAddress(Address a, Object key, int replCount);
+
+   /**
+    * Returns the value between 0 and the hash space limit, or hash id, for a particular address. If there's no such
+    * value for an address, this method will return -1.
+    *
+    * @return An int between 0 and hash space if the address is present in the hash wheel, otherwise it returns -1.
+    */
+   int getHashId(Address a);
+
+   /**
+    * Returns the hash space constant for this consistent hash algorithm class. This integer is often used as modulus
+    * for arithmetic operations within the algorithm, for example, limiting the range of possible hash values.
+    * 
+    * @return A positive integer containing the hash space constant or 0 is not supported by implementation. 
+    */
+   int getHashSpace();
 }

Modified: trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/core/src/main/java/org/infinispan/distribution/DefaultConsistentHash.java	2010-05-03 09:20:23 UTC (rev 1733)
@@ -3,12 +3,16 @@
 import org.infinispan.marshall.Ids;
 import org.infinispan.marshall.Marshallable;
 import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -20,6 +24,8 @@
    // make sure all threads see the current list
    ArrayList<Address> addresses;
    SortedMap<Integer, Address> positions;
+   // TODO: Maybe address and addressToHashIds can be combined in a LinkedHashMap?
+   Map<Address, Integer> addressToHashIds;
 
    final static int HASH_SPACE = 10240; // no more than 10k nodes?
 
@@ -43,12 +49,17 @@
       addresses.trimToSize();
 
       positions = new TreeMap<Integer, Address>();
+      addressToHashIds = new HashMap<Address, Integer>();
 
       for (Address a : addresses) {
          int positionIndex = Math.abs(hash(a.hashCode())) % HASH_SPACE;
          // this is deterministic since the address list is ordered and the order is consistent across the grid
          while (positions.containsKey(positionIndex)) positionIndex = positionIndex + 1 % HASH_SPACE;
          positions.put(positionIndex, a);
+         // If address appears several times, take the lowest value to guarantee that
+         // at least the initial value and subsequent +1 values would end up in the same node
+         if (!addressToHashIds.containsKey(a))
+            addressToHashIds.put(a, positionIndex);
       }
 
       addresses.clear();
@@ -153,6 +164,20 @@
    }
 
    @Override
+   public int getHashId(Address a) {
+      Integer hashId = addressToHashIds.get(a);
+      if (hashId == null)
+         return -1;
+      else
+         return hashId.intValue();
+   }
+
+   @Override
+   public int getHashSpace() {
+      return HASH_SPACE;
+   }
+
+   @Override
    public boolean equals(Object o) {
       if (this == o) return true;
       if (o == null || getClass() != o.getClass()) return false;

Modified: trunk/core/src/main/java/org/infinispan/distribution/ExperimentalDefaultConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/ExperimentalDefaultConsistentHash.java	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/core/src/main/java/org/infinispan/distribution/ExperimentalDefaultConsistentHash.java	2010-05-03 09:20:23 UTC (rev 1733)
@@ -262,6 +262,16 @@
       return hash;
    }
 
+   @Override
+   public int getHashId(Address a) {
+      throw new RuntimeException("Not yet implemented");
+   }
+
+   @Override
+   public int getHashSpace() {
+      return Integer.MAX_VALUE; // Entire positive integer range
+   }
+
    /**
     * @return A String representing the object pool.
     */

Modified: trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/core/src/main/java/org/infinispan/distribution/UnionConsistentHash.java	2010-05-03 09:20:23 UTC (rev 1733)
@@ -56,6 +56,19 @@
       throw new UnsupportedOperationException("Unsupported!");
    }
 
+   @Override
+   public int getHashId(Address a) {
+      throw new UnsupportedOperationException("Unsupported!");
+   }
+
+   @Override
+   public int getHashSpace() {
+      int oldHashSpace = oldCH.getHashSpace();
+      int newHashSpace = newCH.getHashSpace();
+      // In a union, the hash space is the biggest of the hash spaces.
+      return oldHashSpace > newHashSpace ? oldHashSpace : newHashSpace;
+   }
+
    public ConsistentHash getNewConsistentHash() {
       return newCH;
    }

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -56,8 +56,15 @@
       ThreadRenamingRunnable.setThreadNameDeterminer(new ThreadNameDeterminer {
          override def determineThreadName(currentThreadName: String, proposedThreadName: String): String = {
             val index = proposedThreadName.findIndexOf(_ == '#')
-            val typeInFix = if (proposedThreadName.contains("boss")) "Master-" else "Worker-"
-            threadNamePrefix + typeInFix + proposedThreadName.substring(index + 1, proposedThreadName.length)
+            val typeInFix =
+               if (proposedThreadName contains "server worker") "ServerWorker-"
+               else if (proposedThreadName contains "server boss") "ServerMaster-"
+               else if (proposedThreadName contains "client worker") "ClientWorker-"
+               else "ClientMaster-"
+            val name = threadNamePrefix + typeInFix + proposedThreadName.substring(index + 1, proposedThreadName.length)
+            trace("Thread name will be {0}, with current thread name being {1} and proposed name being '{2}'",
+               name, currentThread, proposedThreadName)
+            name
          }
       })
       val bootstrap = new ServerBootstrap(factory);

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -4,6 +4,7 @@
 import java.util.Arrays
 import org.infinispan.marshall.Marshallable
 import java.io.{ObjectInput, ObjectOutput}
+import org.infinispan.server.core.Logging
 
 /**
  * // TODO: Document this
@@ -23,7 +24,9 @@
       }
    }
 
-   override def hashCode: Int = 41 + Arrays.hashCode(data)
+   override def hashCode: Int = {
+      41 + Arrays.hashCode(data)
+   }
 
    override def toString = {
       new StringBuilder().append("CacheKey").append("{")
@@ -33,8 +36,8 @@
 
 }
 
-object CacheKey {
-   class Externalizer extends org.infinispan.marshall.Externalizer {      
+object CacheKey extends Logging {
+   class Externalizer extends org.infinispan.marshall.Externalizer {
       override def writeObject(output: ObjectOutput, obj: AnyRef) {
          val cacheKey = obj.asInstanceOf[CacheKey]
          output.write(cacheKey.data.length)

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -12,6 +12,7 @@
 import collection.immutable
 import org.infinispan.util.concurrent.TimeoutException
 import java.io.IOException
+import org.infinispan.distribution.DefaultConsistentHash
 
 /**
  * // TODO: Document this
@@ -27,6 +28,10 @@
    import HotRodServer._
    type SuitableHeader = HotRodHeader
 
+   private lazy val isClustered: Boolean = cacheManager.getGlobalConfiguration.getTransportClass != null
+   private lazy val topologyCache: Cache[String, TopologyView] =
+      if (isClustered) cacheManager.getCache(TopologyCacheName) else null
+
    override def readHeader(buffer: ChannelBuffer, messageId: Long): HotRodHeader = {
       val streamOp = buffer.readUnsignedByte
       val op = toRequest(streamOp)
@@ -82,25 +87,27 @@
    private def createResponse(h: HotRodHeader, op: OperationResponse, st: OperationStatus, prev: CacheValue): AnyRef = {
       val topologyResponse = getTopologyResponse(h)
       if (h.flag == ForceReturnPreviousValue)
-         new ResponseWithPrevious(h.messageId, op, st, topologyResponse, if (prev == null) None else Some(prev.data))
+         new ResponseWithPrevious(h.messageId, h.cacheName, h.clientIntel, op, st, topologyResponse,
+            if (prev == null) None else Some(prev.data))
       else
-         new Response(h.messageId, op, st, topologyResponse)
+         new Response(h.messageId, h.cacheName, h.clientIntel, op, st, topologyResponse)
    }
 
    override def createGetResponse(h: HotRodHeader, v: CacheValue, op: Enumeration#Value): AnyRef = {
       val topologyResponse = getTopologyResponse(h)
       if (v != null && op == GetRequest)
-         new GetResponse(h.messageId, GetResponse, Success, topologyResponse, Some(v.data))
+         new GetResponse(h.messageId, h.cacheName, h.clientIntel, GetResponse, Success, topologyResponse, Some(v.data))
       else if (v != null && op == GetWithVersionRequest)
-         new GetWithVersionResponse(h.messageId, GetWithVersionResponse, Success, topologyResponse, Some(v.data), v.version)
+         new GetWithVersionResponse(h.messageId, h.cacheName, h.clientIntel, GetWithVersionResponse, Success,
+            topologyResponse, Some(v.data), v.version)
       else if (op == GetRequest)
-         new GetResponse(h.messageId, GetResponse, KeyDoesNotExist, topologyResponse, None)
+         new GetResponse(h.messageId, h.cacheName, h.clientIntel, GetResponse, KeyDoesNotExist, topologyResponse, None)
       else
-         new GetWithVersionResponse(h.messageId, GetWithVersionResponse, KeyDoesNotExist, topologyResponse, None, 0)
+         new GetWithVersionResponse(h.messageId, h.cacheName, h.clientIntel, GetWithVersionResponse, KeyDoesNotExist,
+            topologyResponse, None, 0)
    }
 
    override def handleCustomRequest(h: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef = {
-      val messageId = h.messageId
       h.op match {
          case RemoveIfUnmodifiedRequest => {
             val k = readKey(buffer)
@@ -124,18 +131,18 @@
             val topologyResponse = getTopologyResponse(h)
             val k = readKey(buffer)
             if (cache.containsKey(k))
-               new Response(messageId, ContainsKeyResponse, Success, topologyResponse)
+               new Response(h.messageId, h.cacheName, h.clientIntel, ContainsKeyResponse, Success, topologyResponse)
             else
-               new Response(messageId, ContainsKeyResponse, KeyDoesNotExist, topologyResponse)
+               new Response(h.messageId, h.cacheName, h.clientIntel, ContainsKeyResponse, KeyDoesNotExist, topologyResponse)
          }
          case ClearRequest => {
             val topologyResponse = getTopologyResponse(h)
             cache.clear
-            new Response(messageId, ClearResponse, Success, topologyResponse)
+            new Response(h.messageId, h.cacheName, h.clientIntel, ClearResponse, Success, topologyResponse)
          }
          case PingRequest => {
             val topologyResponse = getTopologyResponse(h)
-            new Response(messageId, PingResponse, Success, topologyResponse) 
+            new Response(h.messageId, h.cacheName, h.clientIntel, PingResponse, Success, topologyResponse)
          }
       }
    }
@@ -152,33 +159,36 @@
       stats += ("removeHits" -> cacheStats.getRemoveHits.toString)
       stats += ("removeMisses" -> cacheStats.getRemoveMisses.toString)
       val topologyResponse = getTopologyResponse(h)
-      new StatsResponse(h.messageId, immutable.Map[String, String]() ++ stats, topologyResponse)
+      new StatsResponse(h.messageId, h.cacheName, h.clientIntel, immutable.Map[String, String]() ++ stats, topologyResponse)
    }
 
    override def createErrorResponse(h: HotRodHeader, t: Throwable): AnyRef = {
       t match {
          case i: IOException =>
-            new ErrorResponse(h.messageId, ParseError, getTopologyResponse(h), i.toString)
+            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, ParseError, getTopologyResponse(h), i.toString)
          case t: TimeoutException =>
-            new ErrorResponse(h.messageId, OperationTimedOut, getTopologyResponse(h), t.toString)
+            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, OperationTimedOut, getTopologyResponse(h), t.toString)
          case t: Throwable =>
-            new ErrorResponse(h.messageId, ServerError, getTopologyResponse(h), t.toString)
+            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, ServerError, getTopologyResponse(h), t.toString)
       }
    }
 
    private def getTopologyResponse(h: HotRodHeader): Option[AbstractTopologyResponse] = {
       // If clustered, set up a cache for topology information
-      if (cacheManager.getGlobalConfiguration.getTransportClass != null) {
-         val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
-         h.clientIntelligence match {
+      if (isClustered) {
+         h.clientIntel match {
             case 2 | 3 => {
                val currentTopologyView = topologyCache.get("view")
                if (h.topologyId != currentTopologyView.topologyId) {
-                  if (h.clientIntelligence == 2) {
+                  val cache = cacheManager.getCache(h.cacheName)
+                  val config = cache.getConfiguration
+                  if (h.clientIntel == 2 || !config.getCacheMode.isDistributed) {
                      Some(TopologyAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members)))
-                  } else { // Must be 3
-                     // TODO: Implement hash-distribution-aware reply
-                     None
+                  } else { // Must be 3 and distributed
+                     // TODO: Retrieve hash function when we have specified functions
+                     val hashSpace = cache.getAdvancedCache.getDistributionManager.getConsistentHash.getHashSpace
+                     Some(HashDistAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members),
+                           config.getNumOwners, 1, hashSpace))
                   }
                } else None
             }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -108,14 +108,14 @@
          case se: ServerException => {
             val h = se.header.asInstanceOf[HotRodHeader]
             se.getCause match {
-               case i: InvalidMagicIdException => new ErrorResponse(0, InvalidMagicOrMsgId, None, i.toString)
-               case u: UnknownOperationException => new ErrorResponse(h.messageId, UnknownOperation, None, u.toString)
-               case u: UnknownVersionException => new ErrorResponse(h.messageId, UnknownVersion, None, u.toString)
+               case i: InvalidMagicIdException => new ErrorResponse(0, "", 1, InvalidMagicOrMsgId, None, i.toString)
+               case u: UnknownOperationException => new ErrorResponse(h.messageId, "", 1, UnknownOperation, None, u.toString)
+               case u: UnknownVersionException => new ErrorResponse(h.messageId, "", 1, UnknownVersion, None, u.toString)
                case t: Throwable => h.decoder.createErrorResponse(h, t)
             }
          }
          case c: ClosedChannelException => null
-         case t: Throwable => new ErrorResponse(0, ServerError, None, t.toString)
+         case t: Throwable => new ErrorResponse(0, "", 1, ServerError, None, t.toString)
       }
    }
 
@@ -131,7 +131,7 @@
 class InvalidMagicIdException(reason: String) extends StreamCorruptedException(reason)
 
 class HotRodHeader(override val op: Enumeration#Value, val messageId: Long, val cacheName: String,
-                   val flag: ProtocolFlag, val clientIntelligence: Short, val topologyId: Int,
+                   val flag: ProtocolFlag, val clientIntel: Short, val topologyId: Int,
                    val decoder: AbstractVersionedDecoder) extends RequestHeader(op) {
    override def toString = {
       new StringBuilder().append("HotRodHeader").append("{")
@@ -139,7 +139,7 @@
          .append(", messageId=").append(messageId)
          .append(", cacheName=").append(cacheName)
          .append(", flag=").append(flag)
-         .append(", clientIntelligence=").append(clientIntelligence)
+         .append(", clientIntelligence=").append(clientIntel)
          .append(", topologyId=").append(topologyId)
          .append("}").toString
    }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -4,6 +4,9 @@
 import org.infinispan.server.core.transport.{ChannelBuffer, ChannelHandlerContext, Channel, Encoder}
 import OperationStatus._
 import org.infinispan.server.core.transport.ChannelBuffers._
+import org.infinispan.manager.CacheManager
+import org.infinispan.Cache
+import collection.mutable.ListBuffer
 
 /**
  * // TODO: Document this
@@ -11,8 +14,10 @@
  * @since
  */
 
-class HotRodEncoder extends Encoder {
+class HotRodEncoder(cacheManager: CacheManager) extends Encoder {
    import HotRodEncoder._
+   import HotRodServer._
+   private lazy val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)   
 
    override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
       trace("Encode msg {0}", msg)
@@ -56,23 +61,76 @@
          buffer.writeByte(1) // Topology changed
          r.topologyResponse.get match {
             case t: TopologyAwareResponse => {
-               buffer.writeUnsignedInt(t.view.topologyId)
-               buffer.writeUnsignedInt(t.view.members.size)
-               t.view.members.foreach{address =>
-                  buffer.writeString(address.host)
-                  buffer.writeUnsignedShort(address.port)
-               }
+               if (r.clientIntel == 2)
+                  writeTopologyHeader(t, buffer)
+               else
+                  writeHashTopologyHeader(t, buffer)
             }
-            case h: HashDistAwareResponse => {
-               // TODO: Implement reply to hash dist responses
-            }
+            case h: HashDistAwareResponse => writeHashTopologyHeader(h, buffer, r)
          }
       } else {
          buffer.writeByte(0) // No topology change
       }
       buffer
    }
-   
+
+   private def writeTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer) {
+      buffer.writeUnsignedInt(t.view.topologyId)
+      buffer.writeUnsignedInt(t.view.members.size)
+      t.view.members.foreach{address =>
+         buffer.writeString(address.host)
+         buffer.writeUnsignedShort(address.port)
+      }
+   }
+
+   // TODO: Spec values when client intel is 3 but cache is not configured with distribution
+   private def writeHashTopologyHeader(t: TopologyAwareResponse, buffer: ChannelBuffer) {
+      buffer.writeUnsignedInt(t.view.topologyId)
+      buffer.writeUnsignedShort(0) // Num key owners
+      buffer.writeByte(0) // Hash function
+      buffer.writeUnsignedInt(0) // Hash space
+      buffer.writeUnsignedInt(t.view.members.size)
+      t.view.members.foreach{address =>
+         buffer.writeString(address.host)
+         buffer.writeUnsignedShort(address.port)
+         buffer.writeUnsignedInt(0) // Address' hash id
+      }
+   }
+
+   private def writeHashTopologyHeader(h: HashDistAwareResponse, buffer: ChannelBuffer, r: Response) {
+      buffer.writeUnsignedInt(h.view.topologyId)
+      buffer.writeUnsignedShort(h.numOwners) // Num key owners
+      buffer.writeByte(h.hashFunction) // Hash function
+      buffer.writeUnsignedInt(h.hashSpace) // Hash space
+      buffer.writeUnsignedInt(h.view.members.size)
+      var hashIdUpdateRequired = false
+      // If we reached here, we know for sure that this is a cache configured with distribution
+      val consistentHash = cacheManager.getCache(r.cacheName).getAdvancedCache.getDistributionManager.getConsistentHash
+      val updateMembers = new ListBuffer[TopologyAddress]
+      h.view.members.foreach{address =>
+         buffer.writeString(address.host)
+         buffer.writeUnsignedShort(address.port)
+         val cachedHashId = address.hashIds.get(r.cacheName)
+         val hashId = consistentHash.getHashId(address.clusterAddress)
+         val newAddress =
+             // If distinct or not present, cached hash id needs updating
+             if (cachedHashId == None || cachedHashId.get != hashId) {
+                if (!hashIdUpdateRequired) hashIdUpdateRequired = true
+                val newHashIds = address.hashIds + (r.cacheName -> hashId)
+                address.copy(hashIds = newHashIds)
+             } else {
+                address
+             }
+         updateMembers += newAddress
+         buffer.writeUnsignedInt(hashId) // Address' hash id
+      }
+      // At least a hash id had to be updated in the view. Take the view copy and distribute it around the cluster
+      if (hashIdUpdateRequired) {
+         val viewCopy = h.view.copy(members = updateMembers.toList)
+         topologyCache.replace("view", h.view, viewCopy)
+      }
+   }
+
 }
 
 object HotRodEncoder extends Logging {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -20,7 +20,7 @@
  * @since 4.1
  */
 
-class HotRodServer extends AbstractProtocolServer("HotRod") {
+class HotRodServer extends AbstractProtocolServer("HotRod") with Logging {
    import HotRodServer._
    private var isClustered: Boolean = _
    private var address: TopologyAddress = _
@@ -28,7 +28,7 @@
 
    def getAddress: TopologyAddress = address
 
-   override def getEncoder: Encoder = new HotRodEncoder
+   override def getEncoder: Encoder = new HotRodEncoder(getCacheManager)
 
    override def getDecoder: Decoder = new HotRodDecoder(getCacheManager)
 
@@ -43,10 +43,10 @@
    private def addSelfToTopologyView(host: String, port: Int, cacheManager: CacheManager) {
       defineTopologyCacheConfig(cacheManager)
       topologyCache = cacheManager.getCache(TopologyCacheName)
-      address = TopologyAddress(host, port, 0, cacheManager.getAddress)
+      address = TopologyAddress(host, port, Map.empty, cacheManager.getAddress)
+      debug("Local topology address is {0}", address)
       cacheManager.addListener(new CrashedMemberDetectorListener)
       val currentView = topologyCache.get("view")
-      // TODO: If distribution configured, add hashcode of this address
       if (currentView != null) {
          val newMembers = currentView.members ::: List(address)
          val newView = TopologyView(currentView.topologyId + 1, newMembers)
@@ -77,14 +77,18 @@
    protected def removeSelfFromTopologyView {
       // Graceful shutdown, remove this node as member and install new view
       val currentView = topologyCache.get("view")
-      // TODO: If distribution configured, add hashcode of this address
-      val newMembers = currentView.members.filterNot(_ == address)
-      val newView = TopologyView(currentView.topologyId + 1, newMembers)
-      val replaced = topologyCache.replace("view", currentView, newView)
-      if (!replaced) {
-         // TODO: There was a concurrent view modification. Just give up, logic to deal with crashed/stalled members will deal with this
+      // Comparing cluster address should be enough. Full object comparison could fail if hash id map has changed.
+      val newMembers = currentView.members.filterNot(_.clusterAddress == address.clusterAddress)
+      if (newMembers.length != (currentView.members.length - 1)) {
+         debug("Cluster member {0} was not filtered out of the current view {1}", address, currentView)
       } else {
-         debug("Removed {0} from topology view, new view is {1}", address, newView)
+         val newView = TopologyView(currentView.topologyId + 1, newMembers)
+         val replaced = topologyCache.replace("view", currentView, newView)
+         if (!replaced) {
+            // TODO: There was a concurrent view modification. Just give up, logic to deal with crashed/stalled members will deal with this
+         } else {
+            debug("Removed {0} from topology view, new view is {1}", address, newView)
+         }
       }
    }
 
@@ -97,8 +101,7 @@
    }
 
    @Listener
-   class CrashedMemberDetectorListener {
-      import HotRodServer._
+   private class CrashedMemberDetectorListener {
 
       private val executor = Executors.newCachedThreadPool(new ThreadFactory(){
          val threadCounter = new AtomicInteger
@@ -117,48 +120,12 @@
          // This is to avoid all nodes trying to make the same modification which would be wasteful and lead to deadlocks.
          if (cacheManager.isCoordinator) {
             // Use a separate thread to avoid blocking the view handler thread
-            val callable = new Callable[Void] {
-               override def call = {
-                  try {
-                     val newMembers = e.getNewMembers
-                     val oldMembers = e.getOldMembers
-                     // Someone left the cluster, verify whether it did it gracefully or crashed.
-                     if (oldMembers.size > newMembers.size) {
-                        val newMembersList = asBuffer(newMembers).toList
-                        val oldMembersList = asBuffer(oldMembers).toList
-                        val goneMembers = oldMembersList -- newMembersList
-                        val currentView = topologyCache.get("view")
-                        var tmpMembers = currentView.members
-                        for (goneMember <- goneMembers) {
-                           trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
-                           // If old memmber is in topology, it means that it had an abnormal ending
-                           val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
-                           if (isCrashed) {
-                              trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
-                                    "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
-                              tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
-                              trace("After removal, new Hot Rod topology is {0}", tmpMembers)
-                           }
-                        }
-                        if (tmpMembers.size < currentView.members.size) {
-                           val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
-                           val replaced = topologyCache.replace("view", currentView, newView)
-                           if (!replaced) {
-                              // TODO: How to deal with concurrent failures at this point?
-                           }
-                        }
-                     }
-                  } catch {
-                     case t: Throwable => error("Error detecting crashed member", t)
-                  }
-                  null
-               }
-            }
-            executor.submit(callable);
+            executor.submit(new CrashedMemberDetectorCallable(e));
          }
       }
 
       private def isOldMemberInTopology(oldMember: Address, view: TopologyView): (Boolean, TopologyAddress) = {
+         // TODO: If members was stored as a map, this would be more efficient
          for (member <- view.members) {
             if (member.clusterAddress == oldMember) {
                return (true, member)
@@ -166,11 +133,51 @@
          }
          (false, null)
       }
+
+      private class CrashedMemberDetectorCallable(e: ViewChangedEvent) extends Callable[Void] {
+         override def call = {
+            try {
+               val newMembers = e.getNewMembers
+               val oldMembers = e.getOldMembers
+               // Someone left the cluster, verify whether it did it gracefully or crashed.
+               if (oldMembers.size > newMembers.size) {
+                  val newMembersList = asBuffer(newMembers).toList
+                  val oldMembersList = asBuffer(oldMembers).toList
+                  val goneMembers = oldMembersList.filterNot(newMembersList contains)
+                  val currentView = topologyCache.get("view")
+                  if (currentView != null) {
+                     var tmpMembers = currentView.members
+                     for (goneMember <- goneMembers) {
+                        trace("Old member {0} is not in new view {1}, did it crash?", goneMember, newMembers)
+                        // If old memmber is in topology, it means that it had an abnormal ending
+                        val (isCrashed, crashedTopologyMember) = isOldMemberInTopology(goneMember, currentView)
+                        if (isCrashed) {
+                           trace("Old member {0} with topology address {1} is still present in Hot Rod topology " +
+                                 "{2}, so must have crashed.", goneMember, crashedTopologyMember, currentView)
+                           tmpMembers = tmpMembers.filterNot(_ == crashedTopologyMember)
+                           trace("After removal, new Hot Rod topology is {0}", tmpMembers)
+                        }
+                     }
+                     if (tmpMembers.size < currentView.members.size) {
+                        val newView = TopologyView(currentView.topologyId + 1, tmpMembers)
+                        val replaced = topologyCache.replace("view", currentView, newView)
+                        if (!replaced) {
+                           // TODO: How to deal with concurrent failures at this point?
+                        }
+                     }
+                  }
+               }
+            } catch {
+               case t: Throwable => error("Error detecting crashed member", t)
+            }
+            null
+         }
+      }
    }
 
 }
 
-object HotRodServer extends Logging {
+object HotRodServer {
    val TopologyCacheName = "___hotRodTopologyCache"
 }
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -9,9 +9,8 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-// TODO: Maybe add clientIntelligence to response to decide what information to send back
-class Response(val messageId: Long, val operation: OperationResponse, val status: OperationStatus,
-               val topologyResponse: Option[AbstractTopologyResponse]) {
+class Response(val messageId: Long, val cacheName: String, val clientIntel: Short, val operation: OperationResponse,
+               val status: OperationStatus, val topologyResponse: Option[AbstractTopologyResponse]) {
    override def toString = {
       new StringBuilder().append("Response").append("{")
          .append("messageId=").append(messageId)
@@ -21,11 +20,12 @@
    }
 }
 
-class ResponseWithPrevious(override val messageId: Long, override val operation: OperationResponse,
+class ResponseWithPrevious(override val messageId: Long, override val cacheName: String,
+                           override val clientIntel: Short, override val operation: OperationResponse,
                            override val status: OperationStatus,
                            override val topologyResponse: Option[AbstractTopologyResponse],
                            val previous: Option[Array[Byte]])
-      extends Response(messageId, operation, status, topologyResponse) {
+      extends Response(messageId, cacheName, clientIntel, operation, status, topologyResponse) {
    override def toString = {
       new StringBuilder().append("ResponseWithPrevious").append("{")
          .append("messageId=").append(messageId)
@@ -36,10 +36,11 @@
    }
 }
 
-class GetResponse(override val messageId: Long, override val operation: OperationResponse,
-                  override val status: OperationStatus, override val topologyResponse: Option[AbstractTopologyResponse],
+class GetResponse(override val messageId: Long, override val cacheName: String, override val clientIntel: Short,
+                  override val operation: OperationResponse, override val status: OperationStatus,
+                  override val topologyResponse: Option[AbstractTopologyResponse],
                   val data: Option[Array[Byte]])
-      extends Response(messageId, operation, status, topologyResponse) {
+      extends Response(messageId, cacheName, clientIntel, operation, status, topologyResponse) {
    override def toString = {
       new StringBuilder().append("GetResponse").append("{")
          .append("messageId=").append(messageId)
@@ -50,11 +51,12 @@
    }
 }
 
-class GetWithVersionResponse(override val messageId: Long, override val operation: OperationResponse,
+class GetWithVersionResponse(override val messageId: Long, override val cacheName: String,
+                             override val clientIntel: Short, override val operation: OperationResponse,
                              override val status: OperationStatus,
                              override val topologyResponse: Option[AbstractTopologyResponse],
                              override val data: Option[Array[Byte]], val version: Long)
-      extends GetResponse(messageId, operation, status, topologyResponse, data) {
+      extends GetResponse(messageId, cacheName, clientIntel, operation, status, topologyResponse, data) {
    override def toString = {
       new StringBuilder().append("GetWithVersionResponse").append("{")
          .append("messageId=").append(messageId)
@@ -66,9 +68,10 @@
    }
 }
 
-class ErrorResponse(override val messageId: Long, override val status: OperationStatus,
+class ErrorResponse(override val messageId: Long, override val cacheName: String,
+                    override val clientIntel: Short, override val status: OperationStatus,
                     override val topologyResponse: Option[AbstractTopologyResponse], val msg: String)
-      extends Response(messageId, ErrorResponse, status, topologyResponse) {
+      extends Response(messageId, cacheName, clientIntel, ErrorResponse, status, topologyResponse) {
    override def toString = {
       new StringBuilder().append("ErrorResponse").append("{")
          .append("messageId=").append(messageId)
@@ -79,9 +82,10 @@
    }
 }
 
-class StatsResponse(override val messageId: Long, val stats: Map[String, String],
+class StatsResponse(override val messageId: Long, override val cacheName: String,
+                    override val clientIntel: Short, val stats: Map[String, String],
                     override val topologyResponse: Option[AbstractTopologyResponse])
-      extends Response(messageId, StatsResponse, Success, topologyResponse) {
+      extends Response(messageId, cacheName, clientIntel, StatsResponse, Success, topologyResponse) {
    override def toString = {
       new StringBuilder().append("StatsResponse").append("{")
          .append("messageId=").append(messageId)
@@ -95,5 +99,5 @@
 case class TopologyAwareResponse(override val view: TopologyView)
       extends AbstractTopologyResponse(view)
 
-case class HashDistAwareResponse(override val view: TopologyView, numKeyOwners: Short, hashFunction: Byte, hashSpaceSize: Int)
+case class HashDistAwareResponse(override val view: TopologyView, numOwners: Int, hashFunction: Byte, hashSpace: Int)
       extends AbstractTopologyResponse(view)
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -5,12 +5,16 @@
 import org.infinispan.remoting.transport.Address
 
 /**
- * // TODO: Document this
+ * A Hot Rod topology address represents a Hot Rod endpoint that belongs to a Hot Rod cluster. It contains host/port
+ * information where the Hot Rod endpoint is listening. To be able to detect crashed members in the cluster and update
+ * the Hot Rod topology accordingly, it also contains the corresponding cluster address. Finally, since each cache
+ * could potentially be configured with a different hash algorithm, a topology address also contains per cache hash id.
+ * 
  * @author Galder Zamarreño
  * @since // TODO
  */
 @Marshallable(externalizer = classOf[TopologyAddress.Externalizer], id = 58)
-case class TopologyAddress(val host: String, val port: Int, val hostHashCode: Int, val clusterAddress: Address)
+case class TopologyAddress(val host: String, val port: Int, val hashIds: Map[String, Int], val clusterAddress: Address)
 
 object TopologyAddress {
    class Externalizer extends org.infinispan.marshall.Externalizer {
@@ -18,16 +22,16 @@
          val topologyAddress = obj.asInstanceOf[TopologyAddress]
          output.writeObject(topologyAddress.host)
          output.writeInt(topologyAddress.port)
-         output.writeInt(topologyAddress.hostHashCode)
+         output.writeObject(topologyAddress.hashIds)
          output.writeObject(topologyAddress.clusterAddress)
       }
 
       override def readObject(input: ObjectInput): AnyRef = {
          val host = input.readObject.asInstanceOf[String]
          val port = input.readInt
-         val hostHashCode = input.readInt
+         val hashIds = input.readObject.asInstanceOf[Map[String, Int]]
          val clusterAddress = input.readObject.asInstanceOf[Address]
-         TopologyAddress(host, port, hostHashCode, clusterAddress)
+         TopologyAddress(host, port, hashIds, clusterAddress)
       }
    }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -11,7 +11,10 @@
 @Marshallable(externalizer = classOf[TopologyView.Externalizer], id = 59)
 case class TopologyView(val topologyId: Int, val members: List[TopologyAddress])
 // TODO: TopologyView could maintain a Map[Address, TopologyAddress] rather than pushing Address into each TopologyAddress.
-// TODO: That would make crash detection more efficient at the expense of some extra space. 
+// TODO: That would make crash detection more efficient at the expense of some extra space.
+// TODO: In fact, it might increase more concurrency and make replication more efficient if topology cache stored stuff
+// TODO: in [Address, TopologyAddress] and either keep the topology id as an entry in that same cache or in a separate one.
+// TODO: The downside here is that you'd need to make multiple cache calls atomic via txs or similar.
 
 object TopologyView {
    class Externalizer extends org.infinispan.marshall.Externalizer {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -2,13 +2,11 @@
 
 import org.infinispan.config.Configuration
 import java.lang.reflect.Method
-import test.HotRodClient
 import test.HotRodTestingUtil._
 import org.infinispan.server.hotrod.OperationStatus._
-import org.infinispan.config.Configuration.CacheMode
 import org.testng.Assert._
-import org.testng.annotations.{AfterMethod, AfterClass, Test}
-import org.infinispan.test.{TestingUtil, MultipleCacheManagersTest}
+import org.testng.annotations.Test
+import org.infinispan.test.TestingUtil
 
 /**
  * // TODO: Document this
@@ -17,57 +15,18 @@
  */
 
 @Test(groups = Array("functional"), testName = "server.hotrod.HotRodReplicationTest")
-class HotRodReplicationTest extends MultipleCacheManagersTest {
+class HotRodReplicationTest extends HotRodMultiNodeTest {
 
    import HotRodServer._
 
-   private val cacheName = "hotRodReplSync"
-   private[this] var servers: List[HotRodServer] = List()
-   private[this] var clients: List[HotRodClient] = List()
+   override protected def cacheName: String = "hotRodReplSync"
 
-   @Test(enabled=false) // Disable explicitly to avoid TestNG thinking this is a test!!
-   override def createCacheManagers {
-      for (i <- 0 until 2) {
-         val cm = addClusterEnabledCacheManager()
-         cm.defineConfiguration(cacheName, createCacheConfig)
-         cm.defineConfiguration(TopologyCacheName, createTopologyCacheConfig)
-      }
-      servers = servers ::: List(startHotRodServer(cacheManagers.get(0))) 
-      servers = servers ::: List(startHotRodServer(cacheManagers.get(1), servers.head.getPort + 50))
-      servers.foreach {s =>
-         clients = new HotRodClient("127.0.0.1", s.getPort, cacheName, 60) :: clients
-      }
-   }
-
-   @AfterClass(alwaysRun = true)
-   override def destroy {
-      log.debug("Test finished, close Hot Rod server", null)
-      clients.foreach(_.stop)
-      servers.foreach(_.stop)
-      super.destroy // Stop the caches last so that at stoppage time topology cache can be updated properly
-   }
-
-   @AfterMethod(alwaysRun=true)
-   override def clearContent() {
-      // Do not clear cache between methods so that topology cache does not get cleared
-   }
-
-   private def createCacheConfig: Configuration = {
+   override protected def createCacheConfig: Configuration = {
       val config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
       config.setFetchInMemoryState(true)
       config
    }
 
-   private def createTopologyCacheConfig: Configuration = {
-      val topologyCacheConfig = new Configuration
-      topologyCacheConfig.setCacheMode(CacheMode.REPL_SYNC)
-      topologyCacheConfig.setSyncReplTimeout(10000) // Milliseconds
-      topologyCacheConfig.setFetchInMemoryState(true) // State transfer required
-      topologyCacheConfig.setSyncCommitPhase(true) // Only for testing, so that asserts work fine.
-      topologyCacheConfig.setSyncRollbackPhase(true) // Only for testing, so that asserts work fine.
-      topologyCacheConfig
-   }
-
    def testReplicatedPut(m: Method) {
       val putSt = clients.head.put(k(m) , 0, 0, v(m)).status
       assertStatus(putSt, Success)
@@ -106,22 +65,15 @@
       assertEquals(resp.topologyResponse, None)
       resp = clients.head.ping(2, 0)
       assertStatus(resp.status, Success)
-      assertTopologyReceived(resp.topologyResponse.get)
+      assertTopologyReceived(resp.topologyResponse.get, servers)
       resp = clients.tail.head.ping(2, 1)
       assertStatus(resp.status, Success)
-      assertTopologyReceived(resp.topologyResponse.get)
+      assertTopologyReceived(resp.topologyResponse.get, servers)
       resp = clients.tail.head.ping(2, 2)
       assertStatus(resp.status, Success)
       assertEquals(resp.topologyResponse, None)
    }
 
-   private def assertTopologyReceived(topologyResp: AbstractTopologyResponse) {
-      assertEquals(topologyResp.view.topologyId, 2)
-      assertEquals(topologyResp.view.members.size, 2)
-      assertAddressEquals(topologyResp.view.members.head, servers.head.getAddress)
-      assertAddressEquals(topologyResp.view.members.tail.head, servers.tail.head.getAddress)
-   }
-
    def testReplicatedPutWithTopologyChanges(m: Method) {
       var resp = clients.head.put(k(m) , 0, 0, v(m), 1, 0)
       assertStatus(resp.status, Success)
@@ -129,10 +81,10 @@
       assertSuccess(clients.tail.head.get(k(m), 0), v(m))
       resp = clients.head.put(k(m) , 0, 0, v(m, "v1-"), 2, 0)
       assertStatus(resp.status, Success)
-      assertTopologyReceived(resp.topologyResponse.get)
+      assertTopologyReceived(resp.topologyResponse.get, servers)
       resp = clients.tail.head.put(k(m) , 0, 0, v(m, "v2-"), 2, 1)
       assertStatus(resp.status, Success)
-      assertTopologyReceived(resp.topologyResponse.get)
+      assertTopologyReceived(resp.topologyResponse.get, servers)
       resp = clients.head.put(k(m) , 0, 0, v(m, "v3-"), 2, 2)
       assertStatus(resp.status, Success)
       assertEquals(resp.topologyResponse, None)
@@ -142,21 +94,21 @@
       cm.defineConfiguration(cacheName, createCacheConfig)
       cm.defineConfiguration(TopologyCacheName, createTopologyCacheConfig)
       val newServer = startHotRodServer(cm, servers.tail.head.getPort + 25)
-      servers = servers ::: List(newServer)
 
-      resp = clients.head.put(k(m) , 0, 0, v(m, "v4-"), 2, 2)
-      assertStatus(resp.status, Success)
-      assertEquals(resp.topologyResponse.get.view.topologyId, 3)
-      assertEquals(resp.topologyResponse.get.view.members.size, 3)
-      assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
-      assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
-      assertAddressEquals(resp.topologyResponse.get.view.members.tail.tail.head, servers.tail.tail.head.getAddress)
-      assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v4-"))
+      try {
+         val resp = clients.head.put(k(m) , 0, 0, v(m, "v4-"), 2, 2)
+         assertStatus(resp.status, Success)
+         assertEquals(resp.topologyResponse.get.view.topologyId, 3)
+         assertEquals(resp.topologyResponse.get.view.members.size, 3)
+         assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
+         assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
+         assertAddressEquals(resp.topologyResponse.get.view.members.tail.tail.head, newServer.getAddress)
+         assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v4-"))
+      } finally {
+         newServer.stop
+         cm.stop
+      }
 
-      servers.tail.tail.head.stop
-      servers = servers.filterNot(_ == newServer)
-      cm.stop
-
       resp = clients.head.put(k(m) , 0, 0, v(m, "v5-"), 2, 3)
       assertStatus(resp.status, Success)
       assertEquals(resp.topologyResponse.get.view.topologyId, 4)
@@ -169,20 +121,21 @@
       cm.defineConfiguration(cacheName, createCacheConfig)
       cm.defineConfiguration(TopologyCacheName, createTopologyCacheConfig)
       val crashingServer = startCrashingHotRodServer(cm, servers.tail.head.getPort + 11)
-      servers = servers ::: List(crashingServer)
 
-      resp = clients.head.put(k(m) , 0, 0, v(m, "v6-"), 2, 4)
-      assertStatus(resp.status, Success)
-      assertEquals(resp.topologyResponse.get.view.topologyId, 5)
-      assertEquals(resp.topologyResponse.get.view.members.size, 3)
-      assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
-      assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
-      assertAddressEquals(resp.topologyResponse.get.view.members.tail.tail.head, servers.tail.tail.head.getAddress)
-      assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v6-"))
+      try {
+         val resp = clients.head.put(k(m) , 0, 0, v(m, "v6-"), 2, 4)
+         assertStatus(resp.status, Success)
+         assertEquals(resp.topologyResponse.get.view.topologyId, 5)
+         assertEquals(resp.topologyResponse.get.view.members.size, 3)
+         assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
+         assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
+         assertAddressEquals(resp.topologyResponse.get.view.members.tail.tail.head, crashingServer.getAddress)
+         assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v6-"))
+      } finally {
+         crashingServer.stop
+         cm.stop
+      }
 
-      crashingServer.stop
-      servers = servers.filterNot(_ == crashingServer)
-      cm.stop
       TestingUtil.blockUntilViewsReceived(10000, true, manager(0), manager(1))
 
       resp = clients.head.put(k(m) , 0, 0, v(m, "v7-"), 2, 5)
@@ -192,11 +145,18 @@
       assertAddressEquals(resp.topologyResponse.get.view.members.head, servers.head.getAddress)
       assertAddressEquals(resp.topologyResponse.get.view.members.tail.head, servers.tail.head.getAddress)
       assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v7-"))
-   }
 
-   private def assertAddressEquals(actual: TopologyAddress, expected: TopologyAddress) {
-      assertEquals(actual.host, expected.host)
-      assertEquals(actual.port, expected.port)
-      assertEquals(actual.hostHashCode, expected.hostHashCode)
+      resp = clients.head.put(k(m) , 0, 0, v(m, "v8-"), 3, 1)
+      assertStatus(resp.status, Success)
+      val hashTopologyResp = resp.topologyResponse.get.asInstanceOf[HashDistAwareResponse]
+      assertEquals(hashTopologyResp.view.topologyId, 6)
+      assertEquals(hashTopologyResp.view.members.size, 2)
+      assertAddressEquals(hashTopologyResp.view.members.head, servers.head.getAddress, Map(cacheName -> 0))
+      assertAddressEquals(hashTopologyResp.view.members.tail.head, servers.tail.head.getAddress, Map(cacheName -> 0))
+      assertEquals(hashTopologyResp.numOwners, 0)
+      assertEquals(hashTopologyResp.hashFunction, 0)
+      assertEquals(hashTopologyResp.hashSpace, 0)
+      assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v8-"))
    }
+
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -22,6 +22,7 @@
 import java.util.concurrent.{ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.{AtomicLong}
 import org.infinispan.test.TestingUtil
+import org.infinispan.util.Util
 
 /**
  * A very simply Hot Rod client for testing purpouses
@@ -229,7 +230,7 @@
             buffer.writeByte(op.code) // opcode
             buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
             buffer.writeUnsignedInt(op.flags) // flags
-            buffer.writeByte(op.clientIntelligence) // client intelligence
+            buffer.writeByte(op.clientIntel) // client intelligence
             buffer.writeUnsignedInt(op.topologyId) // topology id
             if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
                buffer.writeRangedBytes(op.key) // key length + key
@@ -271,17 +272,28 @@
       val topologyChangeResponse =
          if (topologyChangeMarker == 1) {
             val topologyId = buf.readUnsignedInt
-            if (op.clientIntelligence == 2) {
+            if (op.clientIntel == 2) {
                val numberClusterMembers = buf.readUnsignedInt
                val viewArray = new Array[TopologyAddress](numberClusterMembers)
                for (i <- 0 until numberClusterMembers) {
                   val host = buf.readString
                   val port = buf.readUnsignedShort
-                  viewArray(i) = TopologyAddress(host, port, 0, null)
+                  viewArray(i) = TopologyAddress(host, port, Map.empty, null)
                }
                Some(TopologyAwareResponse(TopologyView(topologyId, viewArray.toList)))
-            } else if (op.clientIntelligence == 3) {
-               None // TODO: Parse hash distribution aware
+            } else if (op.clientIntel == 3) {
+               val numOwners = buf.readUnsignedShort
+               val hashFunction = buf.readByte
+               val hashSpace = buf.readUnsignedInt
+               val numberClusterMembers = buf.readUnsignedInt
+               val viewArray = new Array[TopologyAddress](numberClusterMembers)
+               for (i <- 0 until numberClusterMembers) {
+                  val host = buf.readString
+                  val port = buf.readUnsignedShort
+                  val hashId = buf.readUnsignedInt
+                  viewArray(i) = TopologyAddress(host, port, Map(op.cacheName -> hashId), null)
+               }
+               Some(HashDistAwareResponse(TopologyView(topologyId, viewArray.toList), numOwners, hashFunction, hashSpace))
             } else {
                None // Is it possible?
             }
@@ -295,40 +307,52 @@
             for (i <- 1 to size) {
                stats += (buf.readString -> buf.readString)
             }
-            new StatsResponse(id, immutable.Map[String, String]() ++ stats, topologyChangeResponse)
+            new StatsResponse(id, op.cacheName, op.clientIntel, immutable.Map[String, String]() ++ stats, 
+               topologyChangeResponse)
          }
          case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
               | RemoveResponse | RemoveIfUnmodifiedResponse => {
             if (op.flags == 1) {
                val length = buf.readUnsignedInt
                if (length == 0) {
-                  new ResponseWithPrevious(id, opCode, status, topologyChangeResponse, None)
+                  new ResponseWithPrevious(id, op.cacheName, op.clientIntel, opCode, status,
+                     topologyChangeResponse, None)
                } else {
                   val previous = new Array[Byte](length)
                   buf.readBytes(previous)
-                  new ResponseWithPrevious(id, opCode, status, topologyChangeResponse, Some(previous))
+                  new ResponseWithPrevious(id, op.cacheName, op.clientIntel, opCode, status,
+                     topologyChangeResponse, Some(previous))
                }
-            } else new Response(id, opCode, status, topologyChangeResponse)
+            } else new Response(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse)
          }
-         case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status, topologyChangeResponse)
+         case ContainsKeyResponse | ClearResponse | PingResponse =>
+            new Response(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse)
          case GetWithVersionResponse  => {
             if (status == Success) {
                val version = buf.readLong
                val data = Some(buf.readRangedBytes)
-               new GetWithVersionResponse(id, opCode, status, topologyChangeResponse, data, version)
+               new GetWithVersionResponse(id, op.cacheName, op.clientIntel, opCode, status,
+                  topologyChangeResponse, data, version)
             } else{
-               new GetWithVersionResponse(id, opCode, status, topologyChangeResponse, None, 0)
+               new GetWithVersionResponse(id, op.cacheName, op.clientIntel, opCode, status,
+                  topologyChangeResponse, None, 0)
             }
          }
          case GetResponse => {
             if (status == Success) {
                val data = Some(buf.readRangedBytes)
-               new GetResponse(id, opCode, status, topologyChangeResponse, data)
+               new GetResponse(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse, data)
             } else{
-               new GetResponse(id, opCode, status, topologyChangeResponse, None)
+               new GetResponse(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse, None)
             }
          }
-         case ErrorResponse => new ErrorResponse(id, status, topologyChangeResponse, buf.readString)
+         case ErrorResponse => {
+            if (op == null)
+               new ErrorResponse(id, "", 0, status, topologyChangeResponse, buf.readString)
+            else
+               new ErrorResponse(id, op.cacheName, op.clientIntel, status, topologyChangeResponse, buf.readString)
+         }
+
       }
       trace("Got response from server: {0}", resp)
       resp
@@ -366,18 +390,35 @@
 
 }
 
-case class Op(val magic: Int,
-                 val code: Byte,
-                 val cacheName: String,
-                 val key: Array[Byte],
-                 val lifespan: Int,
-                 val maxIdle: Int,
-                 val value: Array[Byte],
-                 val flags: Int,
-                 val version: Long,
-                 val clientIntelligence: Byte,
-                 val topologyId: Int) {
+class Op(val magic: Int,
+         val code: Byte,
+         val cacheName: String,
+         val key: Array[Byte],
+         val lifespan: Int,
+         val maxIdle: Int,
+         val value: Array[Byte],
+         val flags: Int,
+         val version: Long,
+         val clientIntel: Byte,
+         val topologyId: Int) {
    lazy val id = HotRodClient.idCounter.incrementAndGet
+   override def toString = {
+      new StringBuilder().append("Op").append("(")
+         .append(id).append(',')
+         .append(magic).append(',')
+         .append(code).append(',')
+         .append(cacheName).append(',')
+         .append(if (key == null) "null" else Util.printArray(key, true)).append(',')
+         .append(maxIdle).append(',')
+         .append(lifespan).append(',')
+         .append(if (value == null) "null" else Util.printArray(value, true)).append(',')
+         .append(flags).append(',')
+         .append(version).append(',')
+         .append(clientIntel).append(',')
+         .append(topologyId).append(')')
+         .toString
+   }
+
 }
 
 class PartialOp(override val magic: Int,
@@ -389,14 +430,14 @@
                 override val value: Array[Byte],
                 override val flags: Int,
                 override val version: Long,
-                override val clientIntelligence: Byte,
+                override val clientIntel: Byte,
                 override val topologyId: Int)
-      extends Op(magic, code, cacheName, key, lifespan, maxIdle, value, flags, version, clientIntelligence, topologyId) {
+      extends Op(magic, code, cacheName, key, lifespan, maxIdle, value, flags, version, clientIntel, topologyId) {
 }
 
 class StatsOp(override val magic: Int,
               override val code: Byte,
               override val cacheName: String,
-              override val clientIntelligence: Byte,
+              override val clientIntel: Byte,
               override val topologyId: Int,
-              val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntelligence, topologyId)
\ No newline at end of file
+              val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntel, topologyId)
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-04-29 16:34:12 UTC (rev 1732)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-05-03 09:20:23 UTC (rev 1733)
@@ -8,7 +8,7 @@
 import org.infinispan.server.hotrod.OperationStatus._
 import org.testng.Assert._
 import org.infinispan.util.Util
-import org.infinispan.server.hotrod.{ResponseWithPrevious, GetWithVersionResponse, GetResponse, HotRodServer}
+import org.infinispan.server.hotrod._
 
 /**
  * // TODO: Document this
@@ -72,9 +72,10 @@
 
    def assertSuccess(resp: GetResponse, expected: Array[Byte]): Boolean = {
       assertStatus(resp.status, Success)
-      val isSuccess = Arrays.equals(expected, resp.data.get)
-      assertTrue(isSuccess)
-      isSuccess
+      val isArrayEquals = Arrays.equals(expected, resp.data.get)
+      assertTrue(isArrayEquals, "Retrieved data should have contained " + Util.printArray(expected, true)
+            + " (" + new String(expected) + "), but instead we received " + Util.printArray(resp.data.get, true) + " (" +  new String(resp.data.get) +")")
+      isArrayEquals
    }
 
    def assertSuccess(resp: GetWithVersionResponse, expected: Array[Byte], expectedVersion: Int): Boolean = {
@@ -96,6 +97,46 @@
       status == KeyDoesNotExist
    }
 
+   def assertTopologyReceived(topoResp: AbstractTopologyResponse, servers: List[HotRodServer]) {
+      assertEquals(topoResp.view.topologyId, 2)
+      assertEquals(topoResp.view.members.size, 2)
+      assertAddressEquals(topoResp.view.members.head, servers.head.getAddress)
+      assertAddressEquals(topoResp.view.members.tail.head, servers.tail.head.getAddress)
+   }
+
+   def assertAddressEquals(actual: TopologyAddress, expected: TopologyAddress) {
+      assertEquals(actual.host, expected.host)
+      assertEquals(actual.port, expected.port)
+   }
+
+   def assertHashTopologyReceived(topoResp: AbstractTopologyResponse, servers: List[HotRodServer], hashIds: List[Map[String, Int]]) {
+      val hashTopologyResp = topoResp.asInstanceOf[HashDistAwareResponse]
+      assertEquals(hashTopologyResp.view.topologyId, 2)
+      assertEquals(hashTopologyResp.view.members.size, 2)
+      assertAddressEquals(hashTopologyResp.view.members.head, servers.head.getAddress, hashIds.head)
+      assertAddressEquals(hashTopologyResp.view.members.tail.head, servers.tail.head.getAddress, hashIds.tail.head)
+      assertEquals(hashTopologyResp.numOwners, 2)
+      assertEquals(hashTopologyResp.hashFunction, 1)
+      assertEquals(hashTopologyResp.hashSpace, 10240)
+   }
+
+   def assertNoHashTopologyReceived(topoResp: AbstractTopologyResponse, servers: List[HotRodServer], hashIds: List[Map[String, Int]]) {
+      val hashTopologyResp = topoResp.asInstanceOf[HashDistAwareResponse]
+      assertEquals(hashTopologyResp.view.topologyId, 2)
+      assertEquals(hashTopologyResp.view.members.size, 2)
+      assertAddressEquals(hashTopologyResp.view.members.head, servers.head.getAddress, hashIds.head)
+      assertAddressEquals(hashTopologyResp.view.members.tail.head, servers.tail.head.getAddress, hashIds.tail.head)
+      assertEquals(hashTopologyResp.numOwners, 0)
+      assertEquals(hashTopologyResp.hashFunction, 0)
+      assertEquals(hashTopologyResp.hashSpace, 0)
+   }
+
+   def assertAddressEquals(actual: TopologyAddress, expected: TopologyAddress, expectedHashIds: Map[String, Int]) {
+      assertEquals(actual.host, expected.host)
+      assertEquals(actual.port, expected.port)
+      assertEquals(actual.hashIds, expectedHashIds)
+   }
+   
 } 
 
 object UniquePortThreadLocal extends ThreadLocal[Int] {



More information about the infinispan-commits mailing list