[infinispan-commits] Infinispan SVN: r1693 - in trunk: core/src/main/java/org/infinispan/marshall/jboss and 7 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Apr 15 10:48:27 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-04-15 10:48:24 -0400 (Thu, 15 Apr 2010)
New Revision: 1693

Added:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
Modified:
   trunk/core/src/main/java/org/infinispan/marshall/Ids.java
   trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
   trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
Log:
[ISPN-384] (Implement topology and hash distribution headers in Hot Rod) Topology-aware headers now returned upon detecting view id discrepancies between client and server.

Modified: trunk/core/src/main/java/org/infinispan/marshall/Ids.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/Ids.java	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/core/src/main/java/org/infinispan/marshall/Ids.java	2010-04-15 14:48:24 UTC (rev 1693)
@@ -115,4 +115,7 @@
    static final byte SERVER_CACHE_VALUE = 55;
    static final byte MEMCACHED_CACHE_VALUE = 56;
    static final byte HOTROD_CACHE_KEY = 57;
+   static final byte TOPOLOGY_ADDRESS = 58;
+   static final byte TOPOLOGY_VIEW = 59;
+
 }

Modified: trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/core/src/main/java/org/infinispan/marshall/jboss/ConstantObjectTable.java	2010-04-15 14:48:24 UTC (rev 1693)
@@ -172,6 +172,8 @@
       MARSHALLABLES.add("org.infinispan.server.core.CacheValue");
       MARSHALLABLES.add("org.infinispan.server.memcached.MemcachedValue");
       MARSHALLABLES.add("org.infinispan.server.hotrod.CacheKey");
+      MARSHALLABLES.add("org.infinispan.server.hotrod.TopologyAddress");
+      MARSHALLABLES.add("org.infinispan.server.hotrod.TopologyView");
    }
 
    /**

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/CacheValue.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -1,8 +1,8 @@
 package org.infinispan.server.core
 
 import org.infinispan.util.Util
-import java.io.{Serializable, ObjectOutput, ObjectInput, Externalizable}
-import org.infinispan.marshall.{Externalizer, Ids, Marshallable}
+import java.io.{ObjectOutput, ObjectInput}
+import org.infinispan.marshall.Marshallable
 
 /**
  * // TODO: Document this
@@ -10,7 +10,7 @@
  * @since 4.1
  */
 // TODO: putting Ids.SERVER_CACHE_VALUE fails compilation in 2.8 - https://lampsvn.epfl.ch/trac/scala/ticket/2764
- at Marshallable(externalizer = classOf[CacheValueExternalizer], id = 55)
+ at Marshallable(externalizer = classOf[CacheValue.Externalizer], id = 55)
 class CacheValue(val data: Array[Byte], val version: Long) {
 
    override def toString = {
@@ -22,18 +22,20 @@
 
 }
 
-private class CacheValueExternalizer extends Externalizer {
-   override def writeObject(output: ObjectOutput, obj: AnyRef) {
-      val cacheValue = obj.asInstanceOf[CacheValue]
-      output.write(cacheValue.data.length)
-      output.write(cacheValue.data)
-      output.writeLong(cacheValue.version)
-   }
+object CacheValue {
+   class Externalizer extends org.infinispan.marshall.Externalizer {
+      override def writeObject(output: ObjectOutput, obj: AnyRef) {
+         val cacheValue = obj.asInstanceOf[CacheValue]
+         output.write(cacheValue.data.length)
+         output.write(cacheValue.data)
+         output.writeLong(cacheValue.version)
+      }
 
-   override def readObject(input: ObjectInput): AnyRef = {
-      val data = new Array[Byte](input.read())
-      input.readFully(data)
-      val version = input.readLong
-      new CacheValue(data, version)
+      override def readObject(input: ObjectInput): AnyRef = {
+         val data = new Array[Byte](input.read())
+         input.readFully(data)
+         val version = input.readLong
+         new CacheValue(data, version)
+      }
    }
 }

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -16,6 +16,7 @@
  */
 object VersionGenerator {
 
+   // TODO: Possibly seed version counter on capped System.currentTimeMillis, to avoid issues with clients holding to versions in between restarts
    private val versionCounter = new AtomicInteger
 
    private val versionPrefix = new AtomicLong

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -11,6 +11,7 @@
    def readUnsignedByte: Short
    def readUnsignedInt: Int
    def readUnsignedLong: Long
+   def readUnsignedShort: Int
    def readBytes(length: Int): ChannelBuffer
    def readerIndex: Int
    def readBytes(dst: Array[Byte]): Unit
@@ -31,6 +32,7 @@
    def writeRangedBytes(src: Array[Byte])
    def writeUnsignedInt(i: Int)
    def writeUnsignedLong(l: Long)
+   def writeUnsignedShort(i: Int)
    def writerIndex: Int
 
    /**

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -16,6 +16,7 @@
    override def readUnsignedByte: Short = buffer.readUnsignedByte
    override def readUnsignedInt: Int = VInt.read(this)
    override def readUnsignedLong: Long = VLong.read(this)
+   override def readUnsignedShort: Int = buffer.readUnsignedShort
    override def readBytes(length: Int): ChannelBuffer = new ChannelBufferAdapter(buffer.readBytes(length))
    override def readerIndex: Int = readerIndex
    override def readBytes(dst: Array[Byte]) = buffer.readBytes(dst) 
@@ -43,6 +44,7 @@
    }
    override def writeUnsignedInt(i: Int) = VInt.write(this, i)
    override def writeUnsignedLong(l: Long) = VLong.write(this, l)
+   override def writeUnsignedShort(i: Int) = buffer.writeShort(i)
    override def writerIndex: Int = buffer.writerIndex
 
    /**

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -29,10 +29,12 @@
 
    def createNotExistResponse(header: HotRodHeader): AnyRef
 
-   def createGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef
+   def createGetResponse(header: HotRodHeader, v: CacheValue, op: Enumeration#Value): AnyRef
 
    def handleCustomRequest(header: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef
 
    def createStatsResponse(header: HotRodHeader, stats: Stats): AnyRef
 
+   def createErrorResponse(header: HotRodHeader, t: Throwable): AnyRef
+
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CacheKey.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -2,9 +2,8 @@
 
 import org.infinispan.util.Util
 import java.util.Arrays
-import org.infinispan.marshall.{Externalizer, Marshallable}
+import org.infinispan.marshall.Marshallable
 import java.io.{ObjectInput, ObjectOutput}
-import org.infinispan.server.core.Logging
 
 /**
  * // TODO: Document this
@@ -12,7 +11,7 @@
  * @since
  */
 // TODO: putting Ids.HOTROD_CACHE_KEY fails compilation in 2.8 - https://lampsvn.epfl.ch/trac/scala/ticket/2764
- at Marshallable(externalizer = classOf[CacheKeyExternalizer], id = 57)
+ at Marshallable(externalizer = classOf[CacheKey.Externalizer], id = 57)
 final class CacheKey(val data: Array[Byte]) {
 
    override def equals(obj: Any) = {
@@ -34,16 +33,18 @@
 
 }
 
-private class CacheKeyExternalizer extends Externalizer {
-   override def writeObject(output: ObjectOutput, obj: AnyRef) {
-      val cacheKey = obj.asInstanceOf[CacheKey]
-      output.write(cacheKey.data.length)
-      output.write(cacheKey.data)
-   }
+object CacheKey {
+   class Externalizer extends org.infinispan.marshall.Externalizer {      
+      override def writeObject(output: ObjectOutput, obj: AnyRef) {
+         val cacheKey = obj.asInstanceOf[CacheKey]
+         output.write(cacheKey.data.length)
+         output.write(cacheKey.data)
+      }
 
-   override def readObject(input: ObjectInput): AnyRef = {
-      val data = new Array[Byte](input.read())
-      input.readFully(data)
-      new CacheKey(data)
+      override def readObject(input: ObjectInput): AnyRef = {
+         val data = new Array[Byte](input.read())
+         input.readFully(data)
+         new CacheKey(data)
+      }
    }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -10,6 +10,8 @@
 import org.infinispan.server.core._
 import collection.mutable
 import collection.immutable
+import org.infinispan.util.concurrent.TimeoutException
+import java.io.IOException
 
 /**
  * // TODO: Document this
@@ -22,6 +24,7 @@
    import ResponseResolver._
    import OperationResponse._
    import ProtocolFlag._
+   import HotRodServer._
    type SuitableHeader = HotRodHeader
 
    override def readHeader(buffer: ChannelBuffer, messageId: Long): HotRodHeader = {
@@ -77,64 +80,67 @@
       createResponse(header, toResponse(header.op), KeyDoesNotExist, null)
 
    private def createResponse(h: HotRodHeader, op: OperationResponse, st: OperationStatus, prev: CacheValue): AnyRef = {
+      val topologyResponse = getTopologyResponse(h)
       if (h.flag == ForceReturnPreviousValue)
-         new ResponseWithPrevious(h.messageId, op, st, if (prev == null) None else Some(prev.data))
+         new ResponseWithPrevious(h.messageId, op, st, topologyResponse, if (prev == null) None else Some(prev.data))
       else
-         new Response(h.messageId, op, st)
+         new Response(h.messageId, op, st, topologyResponse)
    }
 
-   override def createGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef = {
+   override def createGetResponse(h: HotRodHeader, v: CacheValue, op: Enumeration#Value): AnyRef = {
+      val topologyResponse = getTopologyResponse(h)
       if (v != null && op == GetRequest)
-         new GetResponse(messageId, GetResponse, Success, Some(v.data))
+         new GetResponse(h.messageId, GetResponse, Success, topologyResponse, Some(v.data))
       else if (v != null && op == GetWithVersionRequest)
-         new GetWithVersionResponse(messageId, GetWithVersionResponse, Success, Some(v.data), v.version)
+         new GetWithVersionResponse(h.messageId, GetWithVersionResponse, Success, topologyResponse, Some(v.data), v.version)
       else if (op == GetRequest)
-         new GetResponse(messageId, GetResponse, KeyDoesNotExist, None)
+         new GetResponse(h.messageId, GetResponse, KeyDoesNotExist, topologyResponse, None)
       else
-         new GetWithVersionResponse(messageId, GetWithVersionResponse, KeyDoesNotExist, None, 0)
+         new GetWithVersionResponse(h.messageId, GetWithVersionResponse, KeyDoesNotExist, topologyResponse, None, 0)
    }
 
-   override def handleCustomRequest(header: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef = {
-      val messageId = header.messageId
-      header.op match {
+   override def handleCustomRequest(h: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef = {
+      val messageId = h.messageId
+      h.op match {
          case RemoveIfUnmodifiedRequest => {
             val k = readKey(buffer)
-            val params = readParameters(header, buffer)
+            val params = readParameters(h, buffer)
             val prev = cache.get(k)
             if (prev != null) {
                if (prev.version == params.get.streamVersion) {
                   val removed = cache.remove(k, prev);
                   if (removed)
-                     // new Response(messageId, RemoveIfUnmodifiedResponse, Success)
-                     createResponse(header, RemoveIfUnmodifiedResponse, Success, prev)
+                     createResponse(h, RemoveIfUnmodifiedResponse, Success, prev)
                   else
-                     // new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
-                     createResponse(header, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
+                     createResponse(h, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
                } else {
-                  // new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
-                  createResponse(header, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
+                  createResponse(h, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
                }
             } else {
-               // new Response(messageId, RemoveIfUnmodifiedResponse, KeyDoesNotExist)
-               createResponse(header, RemoveIfUnmodifiedResponse, KeyDoesNotExist, prev)
+               createResponse(h, RemoveIfUnmodifiedResponse, KeyDoesNotExist, prev)
             }
          }
          case ContainsKeyRequest => {
+            val topologyResponse = getTopologyResponse(h)
             val k = readKey(buffer)
             if (cache.containsKey(k))
-               new Response(messageId, ContainsKeyResponse, Success)
+               new Response(messageId, ContainsKeyResponse, Success, topologyResponse)
             else
-               new Response(messageId, ContainsKeyResponse, KeyDoesNotExist)
+               new Response(messageId, ContainsKeyResponse, KeyDoesNotExist, topologyResponse)
          }
          case ClearRequest => {
+            val topologyResponse = getTopologyResponse(h)
             cache.clear
-            new Response(messageId, ClearResponse, Success)
+            new Response(messageId, ClearResponse, Success, topologyResponse)
          }
-         case PingRequest => new Response(messageId, PingResponse, Success) 
+         case PingRequest => {
+            val topologyResponse = getTopologyResponse(h)
+            new Response(messageId, PingResponse, Success, topologyResponse) 
+         }
       }
    }
 
-   override def createStatsResponse(header: HotRodHeader, cacheStats: Stats): AnyRef = {
+   override def createStatsResponse(h: HotRodHeader, cacheStats: Stats): AnyRef = {
       val stats = mutable.Map.empty[String, String]
       stats += ("timeSinceStart" -> cacheStats.getTimeSinceStart.toString)
       stats += ("currentNumberOfEntries" -> cacheStats.getCurrentNumberOfEntries.toString)
@@ -145,9 +151,42 @@
       stats += ("misses" -> cacheStats.getMisses.toString)
       stats += ("removeHits" -> cacheStats.getRemoveHits.toString)
       stats += ("removeMisses" -> cacheStats.getRemoveMisses.toString)
-      new StatsResponse(header.messageId, immutable.Map[String, String]() ++ stats)
+      val topologyResponse = getTopologyResponse(h)
+      new StatsResponse(h.messageId, immutable.Map[String, String]() ++ stats, topologyResponse)
    }
 
+   override def createErrorResponse(h: HotRodHeader, t: Throwable): AnyRef = {
+      t match {
+         case i: IOException =>
+            new ErrorResponse(h.messageId, ParseError, getTopologyResponse(h), i.toString)
+         case t: TimeoutException =>
+            new ErrorResponse(h.messageId, OperationTimedOut, getTopologyResponse(h), t.toString)
+         case t: Throwable =>
+            new ErrorResponse(h.messageId, ServerError, getTopologyResponse(h), t.toString)
+      }
+   }
+
+   private def getTopologyResponse(h: HotRodHeader): Option[AbstractTopologyResponse] = {
+      // If clustered, set up a cache for topology information
+      if (cacheManager.getGlobalConfiguration.getTransportClass != null) {
+         val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
+         h.clientIntelligence match {
+            case 2 | 3 => {
+               val currentTopologyView = topologyCache.get("view")
+               if (h.topologyId != currentTopologyView.topologyId) {
+                  if (h.clientIntelligence == 2) {
+                     Some(TopologyAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members)))
+                  } else { // Must be 3
+                     // TODO: Implement hash-distribution-aware reply
+                     None
+                  }
+               } else None
+            }
+            case 1 => None
+         }
+      } else None
+   }
+
 }
 
 object RequestResolver extends Logging {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -6,8 +6,7 @@
 import transport._
 import OperationStatus._
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
-import java.io.{IOException, StreamCorruptedException}
-import org.infinispan.util.concurrent.TimeoutException
+import java.io.StreamCorruptedException
 import org.infinispan.server.hotrod.ProtocolFlag._
 import org.infinispan.server.hotrod.OperationResponse._
 import java.nio.channels.ClosedChannelException
@@ -24,7 +23,8 @@
    type SuitableHeader = HotRodHeader
    type SuitableParameters = RequestParameters
 
-   @volatile private var isError = false
+   private var isError = false
+   private var joined = false
 
    override def readHeader(buffer: ChannelBuffer): HotRodHeader = {
       try {
@@ -65,6 +65,7 @@
    }
 
    override def getCache(header: HotRodHeader): Cache[CacheKey, CacheValue] = {
+      // TODO: Document this in wiki
       if (header.cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME) cacheManager.getCache[CacheKey, CacheValue]
       else cacheManager.getCache(header.cacheName)
    }
@@ -91,7 +92,7 @@
       h.decoder.createNotExistResponse(h)
 
    override def createGetResponse(h: HotRodHeader, k: CacheKey, v: CacheValue): AnyRef =
-      h.decoder.createGetResponse(h.messageId, v, h.op)
+      h.decoder.createGetResponse(h, v, h.op)
 
    override def createMultiGetResponse(h: HotRodHeader, pairs: Map[CacheKey, CacheValue]): AnyRef =
       null // Unsupported
@@ -105,18 +106,16 @@
    override def createErrorResponse(t: Throwable): AnyRef = {
       t match {
          case se: ServerException => {
-            val messageId = se.header.asInstanceOf[HotRodHeader].messageId
+            val h = se.header.asInstanceOf[HotRodHeader]
             se.getCause match {
-               case i: InvalidMagicIdException => new ErrorResponse(0, InvalidMagicOrMsgId, i.toString)
-               case u: UnknownOperationException => new ErrorResponse(messageId, UnknownOperation, u.toString)
-               case u: UnknownVersionException => new ErrorResponse(messageId, UnknownVersion, u.toString)
-               case i: IOException => new ErrorResponse(messageId, ParseError, i.toString)
-               case t: TimeoutException => new ErrorResponse(messageId, OperationTimedOut, t.toString)
-               case e: Exception => new ErrorResponse(messageId, ServerError, e.toString)
+               case i: InvalidMagicIdException => new ErrorResponse(0, InvalidMagicOrMsgId, None, i.toString)
+               case u: UnknownOperationException => new ErrorResponse(h.messageId, UnknownOperation, None, u.toString)
+               case u: UnknownVersionException => new ErrorResponse(h.messageId, UnknownVersion, None, u.toString)
+               case t: Throwable => h.decoder.createErrorResponse(h, t)
             }
          }
          case c: ClosedChannelException => null
-         case t: Throwable => new ErrorResponse(0, ServerError, t.toString)
+         case t: Throwable => new ErrorResponse(0, ServerError, None, t.toString)
       }
    }
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -16,7 +16,7 @@
 
    override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
       trace("Encode msg {0}", msg)
-      val buffer: ChannelBuffer = msg match {
+      val buffer: ChannelBuffer = msg match { 
          case r: Response => writeHeader(r)
       }
       msg match {
@@ -52,7 +52,24 @@
       buffer.writeUnsignedLong(r.messageId)
       buffer.writeByte(r.operation.id.byteValue)
       buffer.writeByte(r.status.id.byteValue)
-      buffer.writeByte(0) // TODO: topology change marker, implemented later
+      if (r.topologyResponse != None) {
+         buffer.writeByte(1) // Topology changed
+         r.topologyResponse.get match {
+            case t: TopologyAwareResponse => {
+               buffer.writeUnsignedInt(t.view.topologyId)
+               buffer.writeUnsignedInt(t.view.members.size)
+               t.view.members.foreach{address =>
+                  buffer.writeString(address.host)
+                  buffer.writeUnsignedShort(address.port)
+               }
+            }
+            case h: HashDistAwareResponse => {
+               // TODO: Implement reply to hash dist responses
+            }
+         }
+      } else {
+         buffer.writeByte(0) // No topology change
+      }
       buffer
    }
    

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -1,8 +1,12 @@
 package org.infinispan.server.hotrod
 
 import org.infinispan.manager.CacheManager
-import org.infinispan.server.core.AbstractProtocolServer
 import org.infinispan.server.core.transport.{Decoder, Encoder}
+import org.jgroups.blocks.RequestOptions
+import org.infinispan.server.core.{Logging, AbstractProtocolServer}
+import org.infinispan.config.Configuration
+import org.infinispan.config.Configuration.CacheMode
+import org.infinispan.Cache
 
 /**
  * // TODO: Document this
@@ -12,8 +16,49 @@
 
 class HotRodServer extends AbstractProtocolServer("HotRod") {
 
+   import HotRodServer._
+
    override def getEncoder: Encoder = new HotRodEncoder
 
    override def getDecoder: Decoder = new HotRodDecoder(getCacheManager)
 
+   override def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int) {
+      super.start(host, port, cacheManager, masterThreads, workerThreads)
+      // If clustered, set up a cache for topology information
+      if (cacheManager.getGlobalConfiguration.getTransportClass != null) {
+         defineTopologyCacheConfig(cacheManager)
+         val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)
+         val currentView = topologyCache.get("view")
+         if (currentView != null) {
+            // TODO: If distribution configured, add hashcode of this address
+            val newMembers = currentView.members ::: List(TopologyAddress(host, port, 0))
+            val newView = TopologyView(currentView.topologyId + 1, newMembers)
+            val replaced = topologyCache.replace("view", currentView, newView)
+            if (!replaced) {
+               // TODO: There was a concurrent view modification, get and try to install new view again.
+            }
+         } else {
+            // TODO add check for distribution and if so, put the right hashcode
+            val newMembers = List(TopologyAddress(host, port, 0))
+            val newView = TopologyView(1, newMembers)
+            val prev = topologyCache.putIfAbsent("view", newView)
+            if (prev != null) {
+               // TODO: There was a concurrent view modification, get and try to install new view again.
+            }
+         }
+      }
+   }
+
+   protected def defineTopologyCacheConfig(cacheManager: CacheManager) {
+      val topologyCacheConfig = new Configuration
+      topologyCacheConfig.setCacheMode(CacheMode.REPL_SYNC)
+      topologyCacheConfig.setSyncReplTimeout(10000) // Milliseconds
+      topologyCacheConfig.setFetchInMemoryState(true) // State transfer required
+      cacheManager.defineConfiguration(TopologyCacheName, topologyCacheConfig)
+   }
+
+}
+
+object HotRodServer {
+   val TopologyCacheName = "___hotRodTopologyCache"
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -9,7 +9,9 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-class Response(val messageId: Long, val operation: OperationResponse, val status: OperationStatus) {
+// TODO: Maybe add clientIntelligence to response to decide what information to send back
+class Response(val messageId: Long, val operation: OperationResponse, val status: OperationStatus,
+               val topologyResponse: Option[AbstractTopologyResponse]) {
    override def toString = {
       new StringBuilder().append("Response").append("{")
          .append("messageId=").append(messageId)
@@ -20,8 +22,10 @@
 }
 
 class ResponseWithPrevious(override val messageId: Long, override val operation: OperationResponse,
-                                override val status: OperationStatus, val previous: Option[Array[Byte]])
-      extends Response(messageId, operation, status) {
+                           override val status: OperationStatus,
+                           override val topologyResponse: Option[AbstractTopologyResponse],
+                           val previous: Option[Array[Byte]])
+      extends Response(messageId, operation, status, topologyResponse) {
    override def toString = {
       new StringBuilder().append("ResponseWithPrevious").append("{")
          .append("messageId=").append(messageId)
@@ -33,8 +37,9 @@
 }
 
 class GetResponse(override val messageId: Long, override val operation: OperationResponse,
-                  override val status: OperationStatus, val data: Option[Array[Byte]])
-      extends Response(messageId, operation, status) {
+                  override val status: OperationStatus, override val topologyResponse: Option[AbstractTopologyResponse],
+                  val data: Option[Array[Byte]])
+      extends Response(messageId, operation, status, topologyResponse) {
    override def toString = {
       new StringBuilder().append("GetResponse").append("{")
          .append("messageId=").append(messageId)
@@ -46,9 +51,10 @@
 }
 
 class GetWithVersionResponse(override val messageId: Long, override val operation: OperationResponse,
-                             override val status: OperationStatus, override val data: Option[Array[Byte]],
-                             val version: Long)
-      extends GetResponse(messageId, operation, status, data) {
+                             override val status: OperationStatus,
+                             override val topologyResponse: Option[AbstractTopologyResponse],
+                             override val data: Option[Array[Byte]], val version: Long)
+      extends GetResponse(messageId, operation, status, topologyResponse, data) {
    override def toString = {
       new StringBuilder().append("GetWithVersionResponse").append("{")
          .append("messageId=").append(messageId)
@@ -61,7 +67,8 @@
 }
 
 class ErrorResponse(override val messageId: Long, override val status: OperationStatus,
-                    val msg: String) extends Response(messageId, ErrorResponse, status) {
+                    override val topologyResponse: Option[AbstractTopologyResponse], val msg: String)
+      extends Response(messageId, ErrorResponse, status, topologyResponse) {
    override def toString = {
       new StringBuilder().append("ErrorResponse").append("{")
          .append("messageId=").append(messageId)
@@ -72,11 +79,21 @@
    }
 }
 
-class StatsResponse(override val messageId: Long, val stats: Map[String, String]) extends Response(messageId, StatsResponse, Success) {
+class StatsResponse(override val messageId: Long, val stats: Map[String, String],
+                    override val topologyResponse: Option[AbstractTopologyResponse])
+      extends Response(messageId, StatsResponse, Success, topologyResponse) {
    override def toString = {
       new StringBuilder().append("StatsResponse").append("{")
          .append("messageId=").append(messageId)
          .append(", stats=").append(stats)
          .append("}").toString
    }
-}
\ No newline at end of file
+}
+
+abstract class AbstractTopologyResponse(val view: TopologyView)
+
+case class TopologyAwareResponse(override val view: TopologyView)
+      extends AbstractTopologyResponse(view)
+
+case class HashDistAwareResponse(override val view: TopologyView, numKeyOwners: Short, hashFunction: Byte, hashSpaceSize: Int)
+      extends AbstractTopologyResponse(view)
\ No newline at end of file

Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala	                        (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyAddress.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -0,0 +1,30 @@
+package org.infinispan.server.hotrod
+
+import java.io.{ObjectInput, ObjectOutput}
+import org.infinispan.marshall.Marshallable
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+ at Marshallable(externalizer = classOf[TopologyAddress.Externalizer], id = 58)
+case class TopologyAddress(val host: String, val port: Int, val hostHashCode: Int)
+
+object TopologyAddress {
+   class Externalizer extends org.infinispan.marshall.Externalizer {
+      override def writeObject(output: ObjectOutput, obj: AnyRef) {
+         val topologyAddress = obj.asInstanceOf[TopologyAddress]
+         output.writeObject(topologyAddress.host)
+         output.writeInt(topologyAddress.port)
+         output.writeInt(topologyAddress.hostHashCode)
+      }
+
+      override def readObject(input: ObjectInput): AnyRef = {
+         val host = input.readObject.asInstanceOf[String]
+         val port = input.readInt
+         val hostHashCode = input.readInt
+         TopologyAddress(host, port, hostHashCode)
+      }
+   }
+}
\ No newline at end of file

Added: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala	                        (rev 0)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/TopologyView.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -0,0 +1,28 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.marshall.Marshallable
+import java.io.{ObjectInput, ObjectOutput}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+ at Marshallable(externalizer = classOf[TopologyView.Externalizer], id = 59)
+case class TopologyView(val topologyId: Int, val members: List[TopologyAddress])
+
+object TopologyView {
+   class Externalizer extends org.infinispan.marshall.Externalizer {
+      override def writeObject(output: ObjectOutput, obj: AnyRef) {
+         val topologyView = obj.asInstanceOf[TopologyView]
+         output.writeInt(topologyView.topologyId)
+         output.writeObject(topologyView.members.toArray) // Write arrays instead since writing Lists causes issues
+      }
+
+      override def readObject(input: ObjectInput): AnyRef = {
+         val topologyId = input.readInt
+         val members = input.readObject.asInstanceOf[Array[TopologyAddress]]
+         TopologyView(topologyId, members.toList)
+      }
+   }
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -29,15 +29,15 @@
    override def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
    
    def testUnknownCommand(m: Method) {
-      val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0)
-      assertTrue(status == UnknownOperation,
+      val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0, 1, 0).status
+      assertEquals(status, UnknownOperation,
          "Status should have been 'UnknownOperation' but instead was: " + status)
    }
 
    def testUnknownMagic(m: Method) {
       client.assertPut(m) // Do a put to make sure decoder gets back to reading properly
-      val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0)
-      assertTrue(status == InvalidMagicOrMsgId,
+      val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0).status
+      assertEquals(status, InvalidMagicOrMsgId,
          "Status should have been 'InvalidMagicOrMsgId' but instead was: " + status)
    }
 
@@ -48,7 +48,7 @@
    }
 
    def testPutOnDefaultCache(m: Method) {
-      val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0)
+      val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0, 1, 0).status
       assertStatus(status, Success)
       val cache = cacheManager.getCache[CacheKey, CacheValue]
       val value = cache.get(new CacheKey(k(m)))
@@ -58,280 +58,254 @@
    def testPutWithLifespan(m: Method) {
       client.assertPut(m, 1, 0)
       Thread.sleep(1100)
-      val (getSt, actual) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testPutWithMaxIdle(m: Method) {
       client.assertPut(m, 0, 1)
       Thread.sleep(1100)
-      val (getSt, actual) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testPutWithPreviousValue(m: Method) {
-      val (status, previous) = client.put(k(m) , 0, 0, v(m), 1)
-      assertSuccess(status, Array(), previous)
-      val (status2, previous2) = client.put(k(m) , 0, 0, v(m, "v2-"), 1)
-      assertSuccess(status2, v(m), previous2)
+      var resp = client.put(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, Success)
+      assertEquals(resp.previous, None)
+      resp = client.put(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+      assertSuccess(resp, v(m))
    }
 
    def testGetBasic(m: Method) {
       client.assertPut(m)
-      val (getSt, actual) = client.assertGet(m)
-      assertSuccess(getSt, v(m), actual)
+      assertSuccess(client.assertGet(m), v(m))
    }
 
    def testGetDoesNotExist(m: Method) {
-      val (getSt, actual) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testPutIfAbsentNotExist(m: Method) {
-      val status = client.putIfAbsent(k(m) , 0, 0, v(m))
+      val status = client.putIfAbsent(k(m) , 0, 0, v(m)).status
       assertStatus(status, Success)
    }
 
    def testPutIfAbsentExist(m: Method) {
       client.assertPut(m)
-      val status = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"))
+      val status = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-")).status
       assertStatus(status, OperationNotExecuted)
    }
 
    def testPutIfAbsentWithLifespan(m: Method) {
-      val status = client.putIfAbsent(k(m) , 1, 0, v(m))
+      val status = client.putIfAbsent(k(m) , 1, 0, v(m)).status
       assertStatus(status, Success)
       Thread.sleep(1100)
-      val (getSt, actual) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testPutIfAbsentWithMaxIdle(m: Method) {
-      val status = client.putIfAbsent(k(m) , 0, 1, v(m))
+      val status = client.putIfAbsent(k(m) , 0, 1, v(m)).status
       assertStatus(status, Success)
       Thread.sleep(1100)
-      val (getSt, actual) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testPutIfAbsentWithPreviousValue(m: Method) {
-      val (status, previous) = client.putIfAbsent(k(m) , 0, 0, v(m), 1)
-      assertSuccess(status, Array(), previous)
-      val (status2, previous2) = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"), 1)
-      assertStatus(status2, OperationNotExecuted)
-      assertTrue(Arrays.equals(v(m), previous2))
+      var resp = client.putIfAbsent(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, Success)
+      assertEquals(resp.previous, None)
+      resp = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, OperationNotExecuted)
+      assertTrue(Arrays.equals(v(m), resp.previous.get))
    }
 
    def testReplaceBasic(m: Method) {
       client.assertPut(m)
-      val status = client.replace(k(m), 0, 0, v(m, "v1-"))
+      val status = client.replace(k(m), 0, 0, v(m, "v1-")).status
       assertStatus(status, Success)
-      val (getSt, actual) = client.assertGet(m)
-      assertSuccess(getSt, v(m, "v1-"), actual)
+      assertSuccess(client.assertGet(m), v(m, "v1-"))
    }
 
    def testNotReplaceIfNotPresent(m: Method) {
-      val status = client.replace(k(m), 0, 0, v(m))
+      val status = client.replace(k(m), 0, 0, v(m)).status
       assertStatus(status, OperationNotExecuted)
    }
 
    def testReplaceWithLifespan(m: Method) {
       client.assertPut(m)
-      val status = client.replace(k(m), 1, 0, v(m, "v1-"))
+      val status = client.replace(k(m), 1, 0, v(m, "v1-")).status
       assertStatus(status, Success)
       Thread.sleep(1100)
-      val (getSt, actual) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testReplaceWithMaxIdle(m: Method) {
       client.assertPut(m)
-      val status = client.replace(k(m), 0, 1, v(m, "v1-"))
+      val status = client.replace(k(m), 0, 1, v(m, "v1-")).status
       assertStatus(status, Success)
       Thread.sleep(1100)
-      val (getSt, actual) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testReplaceWithPreviousValue(m: Method) {
-      val (status, previous) = client.replace(k(m) , 0, 0, v(m), 1)
-      assertStatus(status, OperationNotExecuted)
-      assertEquals(previous.length, 0)
-      val (status2, previous2) = client.put(k(m) , 0, 0, v(m, "v2-"), 1)
-      assertSuccess(status2, Array(), previous2)
-      val (status3, previous3) = client.replace(k(m) , 0, 0, v(m, "v3-"), 1)
-      assertSuccess(status3, v(m, "v2-"), previous3)
+      var resp = client.replace(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, OperationNotExecuted)
+      assertEquals(resp.previous, None)
+      resp = client.put(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, Success)
+      assertEquals(resp.previous, None)
+      resp = client.replace(k(m) , 0, 0, v(m, "v3-"), 1).asInstanceOf[ResponseWithPrevious]
+      assertSuccess(resp, v(m, "v2-"))
    }
 
    def testGetWithVersionBasic(m: Method) {
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
+      assertSuccess(client.getWithVersion(k(m), 0), v(m), 0)
    }
 
    def testGetWithVersionDoesNotExist(m: Method) {
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertKeyDoesNotExist(getSt, actual)
-      assertTrue(version == 0)
+      val resp = client.getWithVersion(k(m), 0)
+      assertKeyDoesNotExist(resp)
+      assertTrue(resp.version == 0)
    }
 
    def testReplaceIfUnmodifiedBasic(m: Method) {
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
-      val status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
+      val resp = client.getWithVersion(k(m), 0)
+      assertSuccess(resp, v(m), 0)
+      val status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), resp.version).status
       assertStatus(status, Success)
    }
 
    def testReplaceIfUnmodifiedNotFound(m: Method) {
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
-      val status = client.replaceIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), version)
+      val resp = client.getWithVersion(k(m), 0)
+      assertSuccess(resp, v(m), 0)
+      val status = client.replaceIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), resp.version).status
       assertStatus(status, KeyDoesNotExist)
    }
 
    def testReplaceIfUnmodifiedNotExecuted(m: Method) {
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
-      var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
+      var resp = client.getWithVersion(k(m), 0)
+      assertSuccess(resp, v(m), 0)
+      var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), resp.version).status
       assertStatus(status, Success)
-      val (getSt2, actual2, version2) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt2, v(m, "v1-"), actual2)
-      assertTrue(version2 != 0)
-      assertTrue(version != version2)
-      status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), version)
+      val resp2 = client.getWithVersion(k(m), 0)
+      assertSuccess(resp2, v(m, "v1-"), 0)
+      assertTrue(resp.version != resp2.version)
+      status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), resp.version).status
       assertStatus(status, OperationNotExecuted)
-      status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), version2)
+      status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), resp2.version).status
       assertStatus(status, Success)
    }
 
    def testReplaceIfUnmodifiedWithPreviousValue(m: Method) {
-      val (status, previous) = client.replaceIfUnmodified(k(m) , 0, 0, v(m), 999, 1)
-      assertStatus(status, KeyDoesNotExist)
-      assertEquals(previous.length, 0)
+      var resp = client.replaceIfUnmodified(k(m) , 0, 0, v(m), 999, 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, KeyDoesNotExist)
+      assertEquals(resp.previous, None)
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
-      val (status2, previous2)  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1)
-      assertStatus(status2, OperationNotExecuted)
-      assertTrue(Arrays.equals(v(m), previous2))
-      val (status3, previous3)  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v3-"), version, 1)
-      assertStatus(status3, Success)
-      assertTrue(Arrays.equals(v(m), previous3))
+      val getResp = client.getWithVersion(k(m), 0)
+      assertSuccess(getResp, v(m), 0)
+      resp  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, OperationNotExecuted)
+      assertTrue(Arrays.equals(v(m), resp.previous.get))
+      resp  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v3-"), getResp.version, 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, Success)
+      assertTrue(Arrays.equals(v(m), resp.previous.get))
    }
 
    def testRemoveBasic(m: Method) {
       client.assertPut(m)
-      val status = client.remove(k(m))
+      val status = client.remove(k(m)).status
       assertStatus(status, Success)
-      val (getSt, actual) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testRemoveDoesNotExist(m: Method) {
-      val status = client.remove(k(m))
+      val status = client.remove(k(m)).status
       assertStatus(status, KeyDoesNotExist)
    }
 
    def testRemoveWithPreviousValue(m: Method) {
-      val (status, previous) = client.remove(k(m), 1)
-      assertStatus(status, KeyDoesNotExist)
-      assertEquals(previous.length, 0)
+      var resp = client.remove(k(m), 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, KeyDoesNotExist)
+      assertEquals(resp.previous, None)
       client.assertPut(m)
-      val (status2, previous2) = client.remove(k(m), 1)
-      assertSuccess(status2, v(m), previous2)
+      resp = client.remove(k(m), 1).asInstanceOf[ResponseWithPrevious]
+      assertSuccess(resp, v(m))
    }
 
    def testRemoveIfUnmodifiedBasic(m: Method) {
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
-      val status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
+      val resp = client.getWithVersion(k(m), 0)
+      assertSuccess(resp, v(m), 0)
+      assertTrue(resp.version != 0)
+      val status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v1-"), resp.version).status
       assertStatus(status, Success)
-      val (getSt2, actual2) = client.assertGet(m)
-      assertKeyDoesNotExist(getSt2, actual2)
+      assertKeyDoesNotExist(client.assertGet(m))
    }
 
    def testRemoveIfUnmodifiedNotFound(m: Method) {
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
-      val status = client.removeIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), version)
+      var resp = client.getWithVersion(k(m), 0)
+      assertSuccess(resp, v(m), 0)
+      val status = client.removeIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), resp.version).status
       assertStatus(status, KeyDoesNotExist)
-      val (getSt2, actual2) = client.assertGet(m)
-      assertSuccess(getSt2, v(m), actual2)
+      assertSuccess(client.assertGet(m), v(m))
    }
 
    def testRemoveIfUnmodifiedNotExecuted(m: Method) {
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
-      var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
+      val resp = client.getWithVersion(k(m), 0)
+      assertSuccess(resp, v(m), 0)
+      var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), resp.version).status
       assertStatus(status, Success)
-      val (getSt2, actual2, version2) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt2, v(m, "v1-"), actual2)
-      assertTrue(version2 != 0)
-      assertTrue(version != version2)
-      status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), version)
+      val resp2 = client.getWithVersion(k(m), 0)
+      assertSuccess(resp2, v(m, "v1-"), 0)
+      assertTrue(resp.version != resp2.version)
+      status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), resp.version).status
       assertStatus(status, OperationNotExecuted)
-      status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), version2)
+      status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), resp2.version).status
       assertStatus(status, Success)
    }
 
    def testRemoveIfUmodifiedWithPreviousValue(m: Method) {
-      val (status, previous) = client.removeIfUnmodified(k(m) , 0, 0, v(m), 999, 1)
-      assertStatus(status, KeyDoesNotExist)
-      assertEquals(previous.length, 0)
+      var resp = client.removeIfUnmodified(k(m) , 0, 0, v(m), 999, 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, KeyDoesNotExist)
+      assertEquals(resp.previous, None)
       client.assertPut(m)
-      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
-      assertTrue(version != 0)
-      val (status2, previous2)  = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1)
-      assertStatus(status2, OperationNotExecuted)
-      assertTrue(Arrays.equals(v(m), previous2))
-      val (status3, previous3)  = client.removeIfUnmodified(k(m), 0, 0, v(m, "v3-"), version, 1)
-      assertStatus(status3, Success)
-      assertTrue(Arrays.equals(v(m), previous3))
+      val getResp = client.getWithVersion(k(m), 0)
+      assertSuccess(getResp, v(m), 0)
+      resp  = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, OperationNotExecuted)
+      assertTrue(Arrays.equals(v(m), resp.previous.get))
+      resp = client.removeIfUnmodified(k(m), 0, 0, v(m, "v3-"), getResp.version, 1).asInstanceOf[ResponseWithPrevious]
+      assertStatus(resp.status, Success)
+      assertTrue(Arrays.equals(v(m), resp.previous.get))
    }
 
    def testContainsKeyBasic(m: Method) {
       client.assertPut(m)
-      val status = client.containsKey(k(m), 0)
-      assertStatus(status, Success)
+      assertStatus(client.containsKey(k(m), 0).status, Success)
    }
 
    def testContainsKeyDoesNotExist(m: Method) {
-      val status = client.containsKey(k(m), 0)
-      assertStatus(status, KeyDoesNotExist)
+      assertStatus(client.containsKey(k(m), 0).status, KeyDoesNotExist)
    }
 
    def testClear(m: Method) {
       for (i <- 1 to 5) {
          val key = k(m, "k" + i + "-");
          val value = v(m, "v" + i + "-");
-         var status = client.put(key , 0, 0, value)
-         assertStatus(status, Success)
-         status = client.containsKey(key, 0)
-         assertStatus(status, Success)
+         assertStatus(client.put(key , 0, 0, value).status, Success)
+         assertStatus(client.containsKey(key, 0).status, Success)
       }
 
-      val status = client.clear
-      assertStatus(status, Success)
+      assertStatus(client.clear.status, Success)
 
       for (i <- 1 to 5) {
          val key = k(m, "k" + i + "-")
-         val status = client.containsKey(key, 0)
-         assertStatus(status, KeyDoesNotExist)
+         assertStatus(client.containsKey(key, 0).status, KeyDoesNotExist)
       }
    }
 
@@ -349,8 +323,23 @@
    }
 
    def testPing(m: Method) {
-      val status = client.ping
+      val status = client.ping.status
       assertStatus(status, Success)
    }
 
+   def testPingWithTopologyAwareClient(m: Method) {
+      var resp = client.ping
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+      resp = client.ping(1, 0)
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+      resp = client.ping(2, 0)
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+      resp = client.ping(3, 0)
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+   }
+   
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodReplicationTest.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -3,10 +3,12 @@
 import org.infinispan.test.MultipleCacheManagersTest
 import org.infinispan.config.Configuration
 import java.lang.reflect.Method
-import org.testng.annotations.{AfterClass, Test}
 import test.HotRodClient
 import test.HotRodTestingUtil._
 import org.infinispan.server.hotrod.OperationStatus._
+import org.infinispan.config.Configuration.CacheMode
+import org.testng.Assert._
+import org.testng.annotations.{AfterMethod, AfterClass, Test}
 
 /**
  * // TODO: Document this
@@ -16,17 +18,29 @@
 
 @Test(groups = Array("functional"), testName = "server.hotrod.ClusterTest")
 class HotRodReplicationTest extends MultipleCacheManagersTest {
+
+   import HotRodServer._
+
    private val cacheName = "hotRodReplSync"
    private[this] var servers: List[HotRodServer] = List()
    private[this] var clients: List[HotRodClient] = List()
 
    @Test(enabled=false) // Disable explicitly to avoid TestNG thinking this is a test!!
    override def createCacheManagers {
-      var config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+      val config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
       config.setFetchInMemoryState(true)
+
+      val topologyCacheConfig = new Configuration
+      topologyCacheConfig.setCacheMode(CacheMode.REPL_SYNC)
+      topologyCacheConfig.setSyncReplTimeout(10000) // Milliseconds
+      topologyCacheConfig.setFetchInMemoryState(true) // State transfer required
+      topologyCacheConfig.setSyncCommitPhase(true) // Only for testing, so that asserts work fine.
+      topologyCacheConfig.setSyncRollbackPhase(true) // Only for testing, so that asserts work fine.
+
       for (i <- 0 until 2) {
          val cm = addClusterEnabledCacheManager()
          cm.defineConfiguration(cacheName, config)
+         cm.defineConfiguration(TopologyCacheName, topologyCacheConfig)
       }
       servers = startHotRodServer(cacheManagers.get(0)) :: servers
       servers = startHotRodServer(cacheManagers.get(1), servers.head.getPort + 50) :: servers
@@ -43,40 +57,80 @@
       servers.foreach(_.stop)
    }
 
+   @AfterMethod(alwaysRun=true)
+   override def clearContent() {
+      // Do not clear cache between methods so that topology cache does not get cleared
+   }
+
    def testReplicatedPut(m: Method) {
-      val putSt = clients.head.put(k(m) , 0, 0, v(m))
+      val putSt = clients.head.put(k(m) , 0, 0, v(m)).status
       assertStatus(putSt, Success)
-      val (getSt, actual) = clients.tail.head.get(k(m), 0)
-      assertSuccess(getSt, v(m), actual)
+      assertSuccess(clients.tail.head.get(k(m), 0), v(m))
    }
 
    def testReplicatedPutIfAbsent(m: Method) {
-      val (getSt, actual) = clients.head.assertGet(m)
-      assertKeyDoesNotExist(getSt, actual)
-      val (getSt2, actual2) = clients.tail.head.assertGet(m)
-      assertKeyDoesNotExist(getSt2, actual2)
-      val putSt = clients.head.putIfAbsent(k(m) , 0, 0, v(m))
+      assertKeyDoesNotExist(clients.head.assertGet(m))
+      assertKeyDoesNotExist(clients.tail.head.assertGet(m))
+      var putSt = clients.head.putIfAbsent(k(m) , 0, 0, v(m)).status
       assertStatus(putSt, Success)
-      val (getSt3, actual3) = clients.tail.head.get(k(m), 0)
-      assertSuccess(getSt3, v(m), actual3)
-      val putSt2 = clients.tail.head.putIfAbsent(k(m) , 0, 0, v(m, "v2-"))
-      assertStatus(putSt2, OperationNotExecuted)
+      assertSuccess(clients.tail.head.get(k(m), 0), v(m))
+      assertStatus(clients.tail.head.putIfAbsent(k(m) , 0, 0, v(m, "v2-")).status, OperationNotExecuted)
    }
 
    def testReplicatedReplace(m: Method) {
-      val status = clients.head.replace(k(m), 0, 0, v(m))
+      var status = clients.head.replace(k(m), 0, 0, v(m)).status
       assertStatus(status, OperationNotExecuted)
-      val status2 = clients.tail.head.replace(k(m), 0, 0, v(m))
-      assertStatus(status2, OperationNotExecuted)
+      status = clients.tail.head.replace(k(m), 0, 0, v(m)).status
+      assertStatus(status , OperationNotExecuted)
       clients.tail.head.assertPut(m)
-      val status3 = clients.tail.head.replace(k(m), 0, 0, v(m, "v1-"))
-      assertStatus(status3, Success)
-      val (getSt, actual) = clients.head.assertGet(m)
-      assertSuccess(getSt, v(m, "v1-"), actual)
-      val status4 = clients.head.replace(k(m), 0, 0, v(m, "v2-"))
-      assertStatus(status4, Success)
-      val (getSt2, actual2) = clients.tail.head.assertGet(m)
-      assertSuccess(getSt2, v(m, "v2-"), actual2)
+      status = clients.tail.head.replace(k(m), 0, 0, v(m, "v1-")).status
+      assertStatus(status, Success)
+      assertSuccess(clients.head.assertGet(m), v(m, "v1-"))
+      status = clients.head.replace(k(m), 0, 0, v(m, "v2-")).status
+      assertStatus(status, Success)
+      assertSuccess(clients.tail.head.assertGet(m), v(m, "v2-"))
    }
 
+   def testPingWithTopologyAwareClient(m: Method) {
+      var resp = clients.head.ping
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+      resp = clients.tail.head.ping(1, 0)
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+      resp = clients.head.ping(2, 0)
+      assertStatus(resp.status, Success)
+      assertTopologyReceived(resp.topologyResponse.get)
+      resp = clients.tail.head.ping(2, 1)
+      assertStatus(resp.status, Success)
+      assertTopologyReceived(resp.topologyResponse.get)
+      resp = clients.tail.head.ping(2, 2)
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+   }
+
+   private def assertTopologyReceived(topologyResp: AbstractTopologyResponse) {
+      assertEquals(topologyResp.view.topologyId, 2)
+      assertEquals(topologyResp.view.members.size, 2)
+      assertEquals(topologyResp.view.members.head, TopologyAddress("127.0.0.1", 11311, 0))
+      assertEquals(topologyResp.view.members.tail.head, TopologyAddress("127.0.0.1", 11361, 0))
+   }
+
+   def testReplicatedPutWithTopologyAwareClient(m: Method) {
+      var resp = clients.head.put(k(m) , 0, 0, v(m), 1, 0)
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+      assertSuccess(clients.tail.head.get(k(m), 0), v(m))
+      resp = clients.head.put(k(m) , 0, 0, v(m, "v1-"), 2, 0)
+      assertStatus(resp.status, Success)
+      assertTopologyReceived(resp.topologyResponse.get)
+      resp = clients.tail.head.put(k(m) , 0, 0, v(m, "v2-"), 2, 1)
+      assertStatus(resp.status, Success)
+      assertTopologyReceived(resp.topologyResponse.get)
+      resp = clients.head.put(k(m) , 0, 0, v(m, "v3-"), 2, 2)
+      assertStatus(resp.status, Success)
+      assertEquals(resp.topologyResponse, None)
+      assertSuccess(clients.tail.head.get(k(m), 0), v(m, "v3-"))
+   }
+
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -34,8 +34,7 @@
       assertEquals(s.get("currentNumberOfEntries").get, "1")
       assertEquals(s.get("totalNumberOfEntries").get, "1")
       assertEquals(s.get("stores").get, "1")
-      val (getSt, actual) = client.assertGet(m)
-      assertSuccess(getSt, v(m), actual)
+      assertSuccess(client.assertGet(m), v(m))
       s = client.stats
       assertEquals(s.get("hits").get, "1")
       assertEquals(s.get("misses").get, "0")

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -52,87 +52,82 @@
    
    def stop = ch.disconnect
 
-   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
-      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0)
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
 
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], clientIntelligence: Byte, topologyId: Int): Response =
+      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, clientIntelligence, topologyId)
+
    def assertPut(m: Method) {
-      val status = put(k(m) , 0, 0, v(m))
+      val status = put(k(m) , 0, 0, v(m)).status
       assertStatus(status, Success)
    }
 
    def assertPut(m: Method, kPrefix: String, vPrefix: String) {
-      val status = put(k(m, kPrefix) , 0, 0, v(m, vPrefix))
+      val status = put(k(m, kPrefix) , 0, 0, v(m, vPrefix)).status
       assertStatus(status, Success)
    }
 
    def assertPut(m: Method, lifespan: Int, maxIdle: Int) {
-      val status = put(k(m) , lifespan, maxIdle, v(m))
+      val status = put(k(m) , lifespan, maxIdle, v(m)).status
       assertStatus(status, Success)
    }
 
-   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
       execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
 
-   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
-      execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0)
+   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+      execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
 
-   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
       execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
    
-   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
-      execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0)
+   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+      execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
 
-   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
       execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)   
 
-   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
-      execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version)
+   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): Response =
+      execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, 1 ,0)
 
-   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) =
+   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): Response =
       execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
 
-   def remove(k: Array[Byte]): OperationStatus =
-      execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0)
+   def remove(k: Array[Byte]): Response =
+      execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, 1 ,0)
 
-   def remove(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+   def remove(k: Array[Byte], flags: Int): Response =
       execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, flags)
 
-   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
-      execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version)
+   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): Response =
+      execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version, 1 ,0)
 
-   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) =
+   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): Response =
       execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
 
    def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-               v: Array[Byte], version: Long): OperationStatus = {
-      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version)
-      execute(op, op.id)._1
+               v: Array[Byte], version: Long, clientIntelligence: Byte, topologyId: Int): Response = {
+      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version, clientIntelligence, topologyId)
+      execute(op, op.id)
    }
 
    def executeWithBadMagic(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-                           v: Array[Byte], version: Long): OperationStatus = {
-      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version)
-      execute(op, 0)._1
+                           v: Array[Byte], version: Long): ErrorResponse = {
+      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version, 1, 0)
+      execute(op, 0).asInstanceOf[ErrorResponse]
    }
 
    def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-               v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) = {
-      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version)
+               v: Array[Byte], version: Long, flags: Int): Response = {
+      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version, 1, 0)
       execute(op, op.id)
    }
 
-   private def execute(op: Op, expectedResponseMessageId: Long): (OperationStatus, Array[Byte]) = {
+   private def execute(op: Op, expectedResponseMessageId: Long): Response = {
       writeOp(op)
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      if (op.flags == 1) {
-         val respWithPrevious = handler.getResponse(expectedResponseMessageId).asInstanceOf[ResponseWithPrevious]
-         if (respWithPrevious.previous == None)
-            (respWithPrevious.status, Array())
-         else
-            (respWithPrevious.status, respWithPrevious.previous.get)
-      } else {
-         (handler.getResponse(expectedResponseMessageId).status, null)
-      }
+      handler.getResponse(expectedResponseMessageId)
    }
 
    private def writeOp(op: Op) {
@@ -142,45 +137,37 @@
       assertTrue(future.isSuccess)
    }
 
-   def get(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
-      val (getSt, actual, version) = get(0x03, k, 0)
-      (getSt, actual)
+   def get(k: Array[Byte], flags: Int): GetResponse = {
+      get(0x03, k, 0).asInstanceOf[GetResponse]
    }
 
-   def assertGet(m: Method): (OperationStatus, Array[Byte]) = assertGet(m, 0)
+   def assertGet(m: Method): GetResponse = assertGet(m, 0)
 
-   def assertGet(m: Method, flags: Int): (OperationStatus, Array[Byte]) = get(k(m), flags)   
+   def assertGet(m: Method, flags: Int): GetResponse = get(k(m), flags)
 
-   def containsKey(k: Array[Byte], flags: Int): OperationStatus = {
-      val (containsKeySt, actual, version) = get(0x0F, k, 0)
-      containsKeySt
+   def containsKey(k: Array[Byte], flags: Int): Response = {
+      get(0x0F, k, 0)
    }
 
-   def getWithVersion(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) =
-      get(0x11, k, 0)
+   def getWithVersion(k: Array[Byte], flags: Int): GetWithVersionResponse =
+      get(0x11, k, 0).asInstanceOf[GetWithVersionResponse]
 
-   private def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
-      val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0)
+   private def get(code: Byte, k: Array[Byte], flags: Int): Response = {
+      val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0, 1, 0)
       val writeFuture = writeOp(op)
       // Get the handler instance to retrieve the answer.
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      if (code == 0x03) {
-         val resp = handler.getResponse(op.id).asInstanceOf[GetResponse]
-         (resp.status, if (resp.data == None) null else resp.data.get, 0)
-      } else if (code == 0x11) {
-         val resp = handler.getResponse(op.id).asInstanceOf[GetWithVersionResponse]
-         (resp.status, if (resp.data == None) null else resp.data.get, resp.version)
-      } else if (code == 0x0F) {
-         (handler.getResponse(op.id).status, null, 0)
+      if (code == 0x03 || code == 0x11 || code == 0x0F) {
+         handler.getResponse(op.id)
       } else {
-         (OperationNotExecuted, null, 0)
+         null
       }
    }
 
-   def clear: OperationStatus = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0)
+   def clear: Response = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0, 1 ,0)
 
    def stats: Map[String, String] = {
-      val op = new StatsOp(0xA0, 0x15, defaultCacheName, null)
+      val op = new StatsOp(0xA0, 0x15, defaultCacheName, 1, 0, null)
       val writeFuture = writeOp(op)
       // Get the handler instance to retrieve the answer.
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
@@ -188,8 +175,11 @@
       resp.stats
    }
 
-   def ping: OperationStatus = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0)
+   def ping: Response = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, 1 ,0)
 
+   def ping(clientIntelligence: Byte, topologyId: Int): Response =
+      execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, clientIntelligence, topologyId)
+
 }
 
 private class ClientPipelineFactory(client: HotRodClient) extends ChannelPipelineFactory {
@@ -217,8 +207,8 @@
             buffer.writeByte(op.code) // opcode
             buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
             buffer.writeUnsignedInt(op.flags) // flags
-            buffer.writeByte(0) // client intelligence
-            buffer.writeUnsignedInt(0) // topology id
+            buffer.writeByte(op.clientIntelligence) // client intelligence
+            buffer.writeUnsignedInt(op.topologyId) // topology id
             if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
                buffer.writeRangedBytes(op.key) // key length + key
                if (op.value != null) {
@@ -255,6 +245,27 @@
       val opCode = OperationResponse.apply(buf.readUnsignedByte)
       val status = OperationStatus.apply(buf.readUnsignedByte)
       val topologyChangeMarker = buf.readUnsignedByte
+      val op = client.idToOp.get(id)
+      val topologyChangeResponse =
+         if (topologyChangeMarker == 1) {
+            val topologyId = buf.readUnsignedInt
+            if (op.clientIntelligence == 2) {
+               val numberClusterMembers = buf.readUnsignedInt
+               val viewArray = new Array[TopologyAddress](numberClusterMembers)
+               for (i <- 0 until numberClusterMembers) {
+                  val host = buf.readString
+                  val port = buf.readUnsignedShort
+                  viewArray(i) = TopologyAddress(host, port, 0)
+               }
+               Some(TopologyAwareResponse(TopologyView(topologyId, viewArray.toList)))
+            } else if (op.clientIntelligence == 3) {
+               None // TODO: Parse hash distribution aware
+            } else {
+               None // Is it possible?
+            }
+         } else {
+            None
+         }
       val resp: Response = opCode match {
          case StatsResponse => {
             val size = buf.readUnsignedInt
@@ -262,41 +273,40 @@
             for (i <- 1 to size) {
                stats += (buf.readString -> buf.readString)
             }
-            new StatsResponse(id, immutable.Map[String, String]() ++ stats)
+            new StatsResponse(id, immutable.Map[String, String]() ++ stats, topologyChangeResponse)
          }
          case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
               | RemoveResponse | RemoveIfUnmodifiedResponse => {
-            val op = client.idToOp.get(id)
             if (op.flags == 1) {
                val length = buf.readUnsignedInt
                if (length == 0) {
-                  new ResponseWithPrevious(id, opCode, status, None)
+                  new ResponseWithPrevious(id, opCode, status, topologyChangeResponse, None)
                } else {
                   val previous = new Array[Byte](length)
                   buf.readBytes(previous)
-                  new ResponseWithPrevious(id, opCode, status, Some(previous))
+                  new ResponseWithPrevious(id, opCode, status, topologyChangeResponse, Some(previous))
                }
-            } else new Response(id, opCode, status)
+            } else new Response(id, opCode, status, topologyChangeResponse)
          }
-         case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status)
+         case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status, topologyChangeResponse)
          case GetWithVersionResponse  => {
             if (status == Success) {
                val version = buf.readLong
                val data = Some(buf.readRangedBytes)
-               new GetWithVersionResponse(id, opCode, status, data, version)
+               new GetWithVersionResponse(id, opCode, status, topologyChangeResponse, data, version)
             } else{
-               new GetWithVersionResponse(id, opCode, status, None, 0)
+               new GetWithVersionResponse(id, opCode, status, topologyChangeResponse, None, 0)
             }
          }
          case GetResponse => {
             if (status == Success) {
                val data = Some(buf.readRangedBytes)
-               new GetResponse(id, opCode, status, data)
+               new GetResponse(id, opCode, status, topologyChangeResponse, data)
             } else{
-               new GetResponse(id, opCode, status, None)
+               new GetResponse(id, opCode, status, topologyChangeResponse, None)
             }
          }
-         case ErrorResponse => new ErrorResponse(id, status, buf.readString)
+         case ErrorResponse => new ErrorResponse(id, status, topologyChangeResponse, buf.readString)
       }
       trace("Got response from server: {0}", resp)
       resp
@@ -328,7 +338,7 @@
             i += 1
          }
       }
-      while (v == null && i < 100)
+      while (v == null && i < 10000)
       v
    }
 
@@ -342,11 +352,15 @@
                  val maxIdle: Int,
                  val value: Array[Byte],
                  val flags: Int,
-                 val version: Long) {
+                 val version: Long,
+                 val clientIntelligence: Byte,
+                 val topologyId: Int) {
    lazy val id = HotRodClient.idCounter.incrementAndGet
 }
 
 class StatsOp(override val magic: Int,
-                 override val code: Byte,
-                 override val cacheName: String,
-                 val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0)
\ No newline at end of file
+              override val code: Byte,
+              override val cacheName: String,
+              override val clientIntelligence: Byte,
+              override val topologyId: Int,
+              val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntelligence, topologyId)
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -3,11 +3,12 @@
 import java.util.concurrent.atomic.AtomicInteger
 import org.infinispan.manager.CacheManager
 import java.lang.reflect.Method
-import org.infinispan.server.hotrod.{HotRodServer}
 import org.infinispan.server.core.Logging
 import java.util.Arrays
 import org.infinispan.server.hotrod.OperationStatus._
 import org.testng.Assert._
+import org.infinispan.util.Util
+import org.infinispan.server.hotrod.{ResponseWithPrevious, GetWithVersionResponse, GetResponse, HotRodServer}
 
 /**
  * // TODO: Document this
@@ -24,14 +25,18 @@
       startHotRodServer(manager, UniquePortThreadLocal.get.intValue)
 
    def startHotRodServer(manager: CacheManager, port: Int): HotRodServer = {
-      val server = new HotRodServer
+      val server = new HotRodServer {
+         override protected def defineTopologyCacheConfig(cacheManager: CacheManager) {
+            // No-op since topology cache configuration comes defined by the test
+         }
+      }
       server.start(host, port, manager, 0, 0)
       server
    }
 
    def k(m: Method, prefix: String): Array[Byte] = {
       val bytes: Array[Byte] = (prefix + m.getName).getBytes
-      trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)
+      trace("String {0} is converted to {1} bytes", prefix + m.getName, Util.printArray(bytes, true))
       bytes
    }
 
@@ -47,19 +52,32 @@
       isSuccess
    }
 
-   def assertSuccess(status: OperationStatus, expected: Array[Byte], actual: Array[Byte]): Boolean = {
-      assertStatus(status, Success)
-      val isSuccess = Arrays.equals(expected, actual)
+   def assertSuccess(resp: GetResponse, expected: Array[Byte]): Boolean = {
+      assertStatus(resp.status, Success)
+      val isSuccess = Arrays.equals(expected, resp.data.get)
       assertTrue(isSuccess)
       isSuccess
    }
 
-   def assertKeyDoesNotExist(status: OperationStatus, actual: Array[Byte]): Boolean = {
+   def assertSuccess(resp: GetWithVersionResponse, expected: Array[Byte], expectedVersion: Int): Boolean = {
+      assertTrue(resp.version != expectedVersion)
+      assertSuccess(resp, expected)
+   }
+
+   def assertSuccess(resp: ResponseWithPrevious, expected: Array[Byte]): Boolean = {
+      assertStatus(resp.status, Success)
+      val isSuccess = Arrays.equals(expected, resp.previous.get)
+      assertTrue(isSuccess)
+      isSuccess
+   }
+
+   def assertKeyDoesNotExist(resp: GetResponse): Boolean = {
+      val status = resp.status
       assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
-      assertNull(actual)
+      assertEquals(resp.data, None)
       status == KeyDoesNotExist
    }
-   
+
 } 
 
 object UniquePortThreadLocal extends ThreadLocal[Int] {

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala	2010-04-15 13:52:57 UTC (rev 1692)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala	2010-04-15 14:48:24 UTC (rev 1693)
@@ -3,7 +3,7 @@
 import org.infinispan.server.core.CacheValue
 import org.infinispan.util.Util
 import java.io.{ObjectOutput, ObjectInput}
-import org.infinispan.marshall.{Marshallable, Externalizer}
+import org.infinispan.marshall.Marshallable
 
 /**
  * // TODO: Document this
@@ -11,7 +11,7 @@
  * @since
  */
 // TODO: putting Ids.MEMCACHED_CACHE_VALUE fails compilation in 2.8 - https://lampsvn.epfl.ch/trac/scala/ticket/2764
- at Marshallable(externalizer = classOf[MemcachedValueExternalizer], id = 56)
+ at Marshallable(externalizer = classOf[MemcachedValue.Externalizer], id = 56)
 class MemcachedValue(override val data: Array[Byte], override val version: Long, val flags: Int)
       extends CacheValue(data, version) {
 
@@ -25,20 +25,22 @@
 
 }
 
-private class MemcachedValueExternalizer extends Externalizer {
-   override def writeObject(output: ObjectOutput, obj: AnyRef) {
-      val cacheValue = obj.asInstanceOf[MemcachedValue]
-      output.write(cacheValue.data.length)
-      output.write(cacheValue.data)
-      output.writeLong(cacheValue.version)
-      output.writeInt(cacheValue.flags)
-   }
+object MemcachedValue {
+   class Externalizer extends org.infinispan.marshall.Externalizer {
+      override def writeObject(output: ObjectOutput, obj: AnyRef) {
+         val cacheValue = obj.asInstanceOf[MemcachedValue]
+         output.write(cacheValue.data.length)
+         output.write(cacheValue.data)
+         output.writeLong(cacheValue.version)
+         output.writeInt(cacheValue.flags)
+      }
 
-   override def readObject(input: ObjectInput): AnyRef = {
-      val data = new Array[Byte](input.read())
-      input.read(data)
-      val version = input.readLong
-      val flags = input.readInt
-      new MemcachedValue(data, version, flags)
+      override def readObject(input: ObjectInput): AnyRef = {
+         val data = new Array[Byte](input.read())
+         input.readFully(data)
+         val version = input.readLong
+         val flags = input.readInt
+         new MemcachedValue(data, version, flags)
+      }
    }
 }
\ No newline at end of file



More information about the infinispan-commits mailing list