[infinispan-commits] Infinispan SVN: r1890 - in branches/4.1.x/server/hotrod/src: main/scala/org/infinispan/server/hotrod, test/scala/org/infinispan/server/hotrod and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jun 3 11:02:21 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-06-03 11:02:20 -0400 (Thu, 03 Jun 2010)
New Revision: 1890

Modified:
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
   branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
   branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
Log:
[ISPN-479] (Reduce use of maps in Hot Rod to increase performance) Done.

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-06-03 07:59:24 UTC (rev 1889)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-06-03 15:02:20 UTC (rev 1890)
@@ -12,7 +12,6 @@
 import org.infinispan.util.concurrent.TimeoutException
 import java.io.IOException
 import org.infinispan.context.Flag.SKIP_REMOTE_LOOKUP
-import org.infinispan.manager.EmbeddedCacheManager
 
 /**
  * HotRod protocol decoder specific for specification version 1.0.
@@ -20,29 +19,38 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-class Decoder10(cacheManager: EmbeddedCacheManager) extends AbstractVersionedDecoder {
-   import RequestResolver._
-   import ResponseResolver._
+object Decoder10 extends AbstractVersionedDecoder with Logging {
    import OperationResponse._
    import ProtocolFlag._
-   import HotRodServer._
    type SuitableHeader = HotRodHeader
 
-   private lazy val isClustered: Boolean = cacheManager.getGlobalConfiguration.getTransportClass != null
-   private lazy val topologyCache: Cache[String, TopologyView] =
-      if (isClustered) cacheManager.getCache(TopologyCacheName) else null
-
    override def readHeader(buffer: ChannelBuffer, messageId: Long): HotRodHeader = {
       val streamOp = buffer.readUnsignedByte
-      val op = toRequest(streamOp)
-      if (op == None) {
-         throw new UnknownOperationException("Unknown operation: " + streamOp);
+      val op = streamOp match {
+         case 0x01 => PutRequest
+         case 0x03 => GetRequest
+         case 0x05 => PutIfAbsentRequest
+         case 0x07 => ReplaceRequest
+         case 0x09 => ReplaceIfUnmodifiedRequest
+         case 0x0B => RemoveRequest
+         case 0x0D => RemoveIfUnmodifiedRequest
+         case 0x0F => ContainsKeyRequest
+         case 0x11 => GetWithVersionRequest
+         case 0x13 => ClearRequest
+         case 0x15 => StatsRequest
+         case 0x17 => PingRequest
+         case _ => throw new UnknownOperationException("Unknown operation: " + streamOp)
       }
+      if (isTraceEnabled) trace("Operation code: {0} has been matched to {1}", streamOp, op)
+      
       val cacheName = buffer.readString
-      val flag = ProtocolFlag.apply(buffer.readUnsignedInt)
+      val flag = buffer.readUnsignedInt match {
+         case 0 => NoFlag
+         case 1 => ForceReturnPreviousValue
+      }
       val clientIntelligence = buffer.readUnsignedByte
       val topologyId = buffer.readUnsignedInt
-      new HotRodHeader(op.get, messageId, cacheName, flag, clientIntelligence, topologyId, this)
+      new HotRodHeader(op, messageId, cacheName, flag, clientIntelligence, topologyId, this)
    }
 
    override def readKey(buffer: ChannelBuffer): CacheKey = new CacheKey(buffer.readRangedBytes)
@@ -83,26 +91,24 @@
       createResponse(header, toResponse(header.op), KeyDoesNotExist, null)
 
    private def createResponse(h: HotRodHeader, op: OperationResponse, st: OperationStatus, prev: CacheValue): AnyRef = {
-      val topologyResponse = getTopologyResponse(h)
       if (h.flag == ForceReturnPreviousValue)
-         new ResponseWithPrevious(h.messageId, h.cacheName, h.clientIntel, op, st, topologyResponse,
+         new ResponseWithPrevious(h.messageId, h.cacheName, h.clientIntel, op, st, h.topologyId,
             if (prev == null) None else Some(prev.data))
       else
-         new Response(h.messageId, h.cacheName, h.clientIntel, op, st, topologyResponse)
+         new Response(h.messageId, h.cacheName, h.clientIntel, op, st, h.topologyId)
    }
 
    override def createGetResponse(h: HotRodHeader, v: CacheValue, op: Enumeration#Value): AnyRef = {
-      val topologyResponse = getTopologyResponse(h)
       if (v != null && op == GetRequest)
-         new GetResponse(h.messageId, h.cacheName, h.clientIntel, GetResponse, Success, topologyResponse, Some(v.data))
+         new GetResponse(h.messageId, h.cacheName, h.clientIntel, GetResponse, Success, h.topologyId, Some(v.data))
       else if (v != null && op == GetWithVersionRequest)
          new GetWithVersionResponse(h.messageId, h.cacheName, h.clientIntel, GetWithVersionResponse, Success,
-            topologyResponse, Some(v.data), v.version)
+            h.topologyId, Some(v.data), v.version)
       else if (op == GetRequest)
-         new GetResponse(h.messageId, h.cacheName, h.clientIntel, GetResponse, KeyDoesNotExist, topologyResponse, None)
+         new GetResponse(h.messageId, h.cacheName, h.clientIntel, GetResponse, KeyDoesNotExist, h.topologyId, None)
       else
          new GetWithVersionResponse(h.messageId, h.cacheName, h.clientIntel, GetWithVersionResponse, KeyDoesNotExist,
-            topologyResponse, None, 0)
+            h.topologyId, None, 0)
    }
 
    override def handleCustomRequest(h: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef = {
@@ -126,23 +132,18 @@
             }
          }
          case ContainsKeyRequest => {
-            val topologyResponse = getTopologyResponse(h)
             val k = readKey(buffer)
             if (cache.containsKey(k))
-               new Response(h.messageId, h.cacheName, h.clientIntel, ContainsKeyResponse, Success, topologyResponse)
+               new Response(h.messageId, h.cacheName, h.clientIntel, ContainsKeyResponse, Success, h.topologyId)
             else
-               new Response(h.messageId, h.cacheName, h.clientIntel, ContainsKeyResponse, KeyDoesNotExist, topologyResponse)
+               new Response(h.messageId, h.cacheName, h.clientIntel, ContainsKeyResponse, KeyDoesNotExist, h.topologyId)
          }
          case ClearRequest => {
-            val topologyResponse = getTopologyResponse(h)
             // Get an optimised cache in case we can make the operation more efficient
             getOptimizedCache(h, cache).clear
-            new Response(h.messageId, h.cacheName, h.clientIntel, ClearResponse, Success, topologyResponse)
+            new Response(h.messageId, h.cacheName, h.clientIntel, ClearResponse, Success, h.topologyId)
          }
-         case PingRequest => {
-            val topologyResponse = getTopologyResponse(h)
-            new Response(h.messageId, h.cacheName, h.clientIntel, PingResponse, Success, topologyResponse)
-         }
+         case PingRequest => new Response(h.messageId, h.cacheName, h.clientIntel, PingResponse, Success, h.topologyId)
       }
    }
 
@@ -157,45 +158,20 @@
       stats += ("misses" -> cacheStats.getMisses.toString)
       stats += ("removeHits" -> cacheStats.getRemoveHits.toString)
       stats += ("removeMisses" -> cacheStats.getRemoveMisses.toString)
-      val topologyResponse = getTopologyResponse(h)
-      new StatsResponse(h.messageId, h.cacheName, h.clientIntel, immutable.Map[String, String]() ++ stats, topologyResponse)
+      new StatsResponse(h.messageId, h.cacheName, h.clientIntel, immutable.Map[String, String]() ++ stats, h.topologyId)
    }
 
    override def createErrorResponse(h: HotRodHeader, t: Throwable): AnyRef = {
       t match {
          case i: IOException =>
-            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, ParseError, getTopologyResponse(h), i.toString)
+            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, ParseError, h.topologyId, i.toString)
          case t: TimeoutException =>
-            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, OperationTimedOut, getTopologyResponse(h), t.toString)
+            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, OperationTimedOut, h.topologyId, t.toString)
          case t: Throwable =>
-            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, ServerError, getTopologyResponse(h), t.toString)
+            new ErrorResponse(h.messageId, h.cacheName, h.clientIntel, ServerError, h.topologyId, t.toString)
       }
    }
 
-   private def getTopologyResponse(h: HotRodHeader): Option[AbstractTopologyResponse] = {
-      // If clustered, set up a cache for topology information
-      if (isClustered) {
-         h.clientIntel match {
-            case 2 | 3 => {
-               val currentTopologyView = topologyCache.get("view")
-               if (h.topologyId != currentTopologyView.topologyId) {
-                  val cache = cacheManager.getCache(h.cacheName)
-                  val config = cache.getConfiguration
-                  if (h.clientIntel == 2 || !config.getCacheMode.isDistributed) {
-                     Some(TopologyAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members)))
-                  } else { // Must be 3 and distributed
-                     // TODO: Retrieve hash function when we have specified functions
-                     val hashSpace = cache.getAdvancedCache.getDistributionManager.getConsistentHash.getHashSpace
-                     Some(HashDistAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members),
-                           config.getNumOwners, 1, hashSpace))
-                  }
-               } else None
-            }
-            case 1 => None
-         }
-      } else None
-   }
-
    override def getOptimizedCache(h: HotRodHeader, c: Cache[CacheKey, CacheValue]): Cache[CacheKey, CacheValue] = {
       if (c.getConfiguration.getCacheMode.isDistributed && h.flag == ForceReturnPreviousValue) {
          c.getAdvancedCache.withFlags(SKIP_REMOTE_LOOKUP)
@@ -203,31 +179,22 @@
          c
       }
    }
-}
 
-object RequestResolver extends Logging {
-   private val requests = Map[Int, Enumeration#Value](
-      0x01 -> PutRequest,
-      0x03 -> GetRequest,
-      0x05 -> PutIfAbsentRequest,
-      0x07 -> ReplaceRequest,
-      0x09 -> ReplaceIfUnmodifiedRequest,
-      0x0B -> RemoveRequest,
-      0x0D -> RemoveIfUnmodifiedRequest,
-      0x0F -> ContainsKeyRequest,
-      0x11 -> GetWithVersionRequest,
-      0x13 -> ClearRequest,
-      0x15 -> StatsRequest,
-      0x17 -> PingRequest 
-   )
-
-   def toRequest(streamOp: Short): Option[Enumeration#Value] = {
-      val op = requests.get(streamOp)
-      if (op == None)
-         if (isTraceEnabled) trace("Operation code: {0} was unmatched", streamOp)
-      else
-         if (isTraceEnabled) trace("Operation code: {0} has been matched to {1}", streamOp, op)
-      op
+   def toResponse(request: Enumeration#Value): OperationResponse = {
+      request match {
+         case PutRequest => PutResponse
+         case GetRequest => GetResponse
+         case PutIfAbsentRequest => PutIfAbsentResponse
+         case ReplaceRequest => ReplaceResponse
+         case ReplaceIfUnmodifiedRequest => ReplaceIfUnmodifiedResponse
+         case RemoveRequest => RemoveResponse
+         case RemoveIfUnmodifiedRequest => RemoveIfUnmodifiedResponse
+         case ContainsKeyRequest => ContainsKeyResponse
+         case GetWithVersionRequest => GetWithVersionResponse
+         case ClearRequest => ClearResponse
+         case StatsRequest => StatsResponse
+         case PingRequest => PingResponse
+      }
    }
 
 }
@@ -249,30 +216,8 @@
    val ErrorResponse = Value(0x50)
 }
 
-object ResponseResolver {
-   import OperationResponse._
-   private val responses = Map[Enumeration#Value, OperationResponse](
-      PutRequest -> PutResponse,
-      GetRequest -> GetResponse,
-      PutIfAbsentRequest -> PutIfAbsentResponse,
-      ReplaceRequest -> ReplaceResponse,
-      ReplaceIfUnmodifiedRequest -> ReplaceIfUnmodifiedResponse,
-      RemoveRequest -> RemoveResponse,
-      RemoveIfUnmodifiedRequest -> RemoveIfUnmodifiedResponse,
-      ContainsKeyRequest -> ContainsKeyResponse,
-      GetWithVersionRequest -> GetWithVersionResponse,
-      ClearRequest -> ClearResponse,
-      StatsRequest -> StatsResponse,
-      PingRequest -> PingResponse
-   )
-
-   def toResponse(request: Enumeration#Value): OperationResponse = {
-      responses.get(request).get
-   }
-}
-
 object ProtocolFlag extends Enumeration {
    type ProtocolFlag = Enumeration#Value
-   val NoFlag = Value(0)
-   val ForceReturnPreviousValue = Value(1)
+   val NoFlag = Value
+   val ForceReturnPreviousValue = Value
 }
\ No newline at end of file

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-06-03 07:59:24 UTC (rev 1889)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-06-03 15:02:20 UTC (rev 1890)
@@ -10,7 +10,6 @@
 import org.infinispan.server.hotrod.OperationResponse._
 import java.nio.channels.ClosedChannelException
 import org.infinispan.{CacheException, Cache}
-import collection.mutable.HashMap
 
 /**
  * // TODO: Document this
@@ -27,7 +26,6 @@
    private var isError = false
    private var joined = false
    private val isTrace = isTraceEnabled
-   private var decoders = new HashMap[Short, AbstractVersionedDecoder]()
 
    override def readHeader(buffer: ChannelBuffer): HotRodHeader = {
       try {
@@ -51,7 +49,10 @@
       
       try {
          val version = buffer.readUnsignedByte
-         val decoder = getDecoder(version)
+         val decoder = version match {
+            case Version10 => Decoder10
+            case _ => throw new UnknownVersionException("Unknown version:" + version)
+         }
          val header = decoder.readHeader(buffer, messageId)
          if (isTrace) trace("Decoded header {0}", header)
          isError = false
@@ -64,20 +65,6 @@
       }
    }
 
-   private def getDecoder(version: Short): AbstractVersionedDecoder = {
-      var option = decoders.get(version)
-      if (option == None) {
-         val decoder = version match {
-            case Version10 => new Decoder10(cacheManager)
-            case _ => throw new UnknownVersionException("Unknown version:" + version)
-         }
-         decoders += (version -> decoder)
-         decoder
-      } else {
-         option.get
-      }
-   }
-
    override def getCache(header: HotRodHeader): Cache[CacheKey, CacheValue] = {
       val cacheName = header.cacheName
       if (cacheName == TopologyCacheName)
@@ -126,14 +113,14 @@
          case se: ServerException => {
             val h = se.header.asInstanceOf[HotRodHeader]
             se.getCause match {
-               case i: InvalidMagicIdException => new ErrorResponse(0, "", 1, InvalidMagicOrMsgId, None, i.toString)
-               case u: UnknownOperationException => new ErrorResponse(h.messageId, "", 1, UnknownOperation, None, u.toString)
-               case u: UnknownVersionException => new ErrorResponse(h.messageId, "", 1, UnknownVersion, None, u.toString)
+               case i: InvalidMagicIdException => new ErrorResponse(0, "", 1, InvalidMagicOrMsgId, 0, i.toString)
+               case u: UnknownOperationException => new ErrorResponse(h.messageId, "", 1, UnknownOperation, 0, u.toString)
+               case u: UnknownVersionException => new ErrorResponse(h.messageId, "", 1, UnknownVersion, 0, u.toString)
                case t: Throwable => h.decoder.createErrorResponse(h, t)
             }
          }
          case c: ClosedChannelException => null
-         case t: Throwable => new ErrorResponse(0, "", 1, ServerError, None, t.toString)
+         case t: Throwable => new ErrorResponse(0, "", 1, ServerError, 0, t.toString)
       }
    }
 

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-06-03 07:59:24 UTC (rev 1889)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-06-03 15:02:20 UTC (rev 1890)
@@ -4,9 +4,9 @@
 import org.infinispan.server.core.transport.{ChannelBuffer, ChannelHandlerContext, Channel, Encoder}
 import OperationStatus._
 import org.infinispan.server.core.transport.ChannelBuffers._
-import org.infinispan.manager.CacheManager
 import org.infinispan.Cache
 import collection.mutable.ListBuffer
+import org.infinispan.manager.EmbeddedCacheManager
 
 /**
  * // TODO: Document this
@@ -14,17 +14,20 @@
  * @since
  */
 
-class HotRodEncoder(cacheManager: CacheManager) extends Encoder {
+class HotRodEncoder(cacheManager: EmbeddedCacheManager) extends Encoder {
    import HotRodEncoder._
    import HotRodServer._
-   private lazy val topologyCache: Cache[String, TopologyView] = cacheManager.getCache(TopologyCacheName)   
 
+   private lazy val isClustered: Boolean = cacheManager.getGlobalConfiguration.getTransportClass != null
+   private lazy val topologyCache: Cache[String, TopologyView] =
+      if (isClustered) cacheManager.getCache(TopologyCacheName) else null
+
    override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
       val isTrace = isTraceEnabled
 
       if (isTrace) trace("Encode msg {0}", msg)
       val buffer: ChannelBuffer = msg match { 
-         case r: Response => writeHeader(r, isTrace)
+         case r: Response => writeHeader(r, isTrace, getTopologyResponse(r))
       }
       msg match {
          case r: ResponseWithPrevious => {
@@ -53,15 +56,39 @@
       buffer
    }
 
-   private def writeHeader(r: Response, isTrace: Boolean): ChannelBuffer = {
+   private def getTopologyResponse(r: Response): AbstractTopologyResponse = {
+      // If clustered, set up a cache for topology information
+      if (isClustered) {
+         r.clientIntel match {
+            case 2 | 3 => {
+               val currentTopologyView = topologyCache.get("view")
+               if (r.topologyId != currentTopologyView.topologyId) {
+                  val cache = cacheManager.getCache(r.cacheName)
+                  val config = cache.getConfiguration
+                  if (r.clientIntel == 2 || !config.getCacheMode.isDistributed) {
+                     TopologyAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members))
+                  } else { // Must be 3 and distributed
+                     // TODO: Retrieve hash function when we have specified functions
+                     val hashSpace = cache.getAdvancedCache.getDistributionManager.getConsistentHash.getHashSpace
+                     HashDistAwareResponse(TopologyView(currentTopologyView.topologyId, currentTopologyView.members),
+                           config.getNumOwners, 1, hashSpace)
+                  }
+               } else null
+            }
+            case 1 => null
+         }
+      } else null
+   }
+
+   private def writeHeader(r: Response, isTrace: Boolean, topologyResp: AbstractTopologyResponse): ChannelBuffer = {
       val buffer = dynamicBuffer
       buffer.writeByte(Magic.byteValue)
       buffer.writeUnsignedLong(r.messageId)
       buffer.writeByte(r.operation.id.byteValue)
       buffer.writeByte(r.status.id.byteValue)
-      if (r.topologyResponse != None) {
+      if (topologyResp != null) {
          buffer.writeByte(1) // Topology changed
-         r.topologyResponse.get match {
+         topologyResp match {
             case t: TopologyAwareResponse => {
                if (r.clientIntel == 2)
                   writeTopologyHeader(t, buffer, isTrace)

Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-06-03 07:59:24 UTC (rev 1889)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-06-03 15:02:20 UTC (rev 1890)
@@ -10,7 +10,7 @@
  * @since 4.1
  */
 class Response(val messageId: Long, val cacheName: String, val clientIntel: Short, val operation: OperationResponse,
-               val status: OperationStatus, val topologyResponse: Option[AbstractTopologyResponse]) {
+               val status: OperationStatus, val topologyId: Int) {
    override def toString = {
       new StringBuilder().append("Response").append("{")
          .append("messageId=").append(messageId)
@@ -23,9 +23,9 @@
 class ResponseWithPrevious(override val messageId: Long, override val cacheName: String,
                            override val clientIntel: Short, override val operation: OperationResponse,
                            override val status: OperationStatus,
-                           override val topologyResponse: Option[AbstractTopologyResponse],
+                           override val topologyId: Int,
                            val previous: Option[Array[Byte]])
-      extends Response(messageId, cacheName, clientIntel, operation, status, topologyResponse) {
+      extends Response(messageId, cacheName, clientIntel, operation, status, topologyId) {
    override def toString = {
       new StringBuilder().append("ResponseWithPrevious").append("{")
          .append("messageId=").append(messageId)
@@ -38,9 +38,9 @@
 
 class GetResponse(override val messageId: Long, override val cacheName: String, override val clientIntel: Short,
                   override val operation: OperationResponse, override val status: OperationStatus,
-                  override val topologyResponse: Option[AbstractTopologyResponse],
+                  override val topologyId: Int,
                   val data: Option[Array[Byte]])
-      extends Response(messageId, cacheName, clientIntel, operation, status, topologyResponse) {
+      extends Response(messageId, cacheName, clientIntel, operation, status, topologyId) {
    override def toString = {
       new StringBuilder().append("GetResponse").append("{")
          .append("messageId=").append(messageId)
@@ -54,9 +54,9 @@
 class GetWithVersionResponse(override val messageId: Long, override val cacheName: String,
                              override val clientIntel: Short, override val operation: OperationResponse,
                              override val status: OperationStatus,
-                             override val topologyResponse: Option[AbstractTopologyResponse],
+                             override val topologyId: Int,
                              override val data: Option[Array[Byte]], val version: Long)
-      extends GetResponse(messageId, cacheName, clientIntel, operation, status, topologyResponse, data) {
+      extends GetResponse(messageId, cacheName, clientIntel, operation, status, topologyId, data) {
    override def toString = {
       new StringBuilder().append("GetWithVersionResponse").append("{")
          .append("messageId=").append(messageId)
@@ -70,8 +70,8 @@
 
 class ErrorResponse(override val messageId: Long, override val cacheName: String,
                     override val clientIntel: Short, override val status: OperationStatus,
-                    override val topologyResponse: Option[AbstractTopologyResponse], val msg: String)
-      extends Response(messageId, cacheName, clientIntel, ErrorResponse, status, topologyResponse) {
+                    override val topologyId: Int, val msg: String)
+      extends Response(messageId, cacheName, clientIntel, ErrorResponse, status, topologyId) {
    override def toString = {
       new StringBuilder().append("ErrorResponse").append("{")
          .append("messageId=").append(messageId)
@@ -84,8 +84,8 @@
 
 class StatsResponse(override val messageId: Long, override val cacheName: String,
                     override val clientIntel: Short, val stats: Map[String, String],
-                    override val topologyResponse: Option[AbstractTopologyResponse])
-      extends Response(messageId, cacheName, clientIntel, StatsResponse, Success, topologyResponse) {
+                    override val topologyId: Int)
+      extends Response(messageId, cacheName, clientIntel, StatsResponse, Success, topologyId) {
    override def toString = {
       new StringBuilder().append("StatsResponse").append("{")
          .append("messageId=").append(messageId)

Modified: branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-06-03 07:59:24 UTC (rev 1889)
+++ branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-06-03 15:02:20 UTC (rev 1890)
@@ -8,6 +8,7 @@
 import org.infinispan.manager.DefaultCacheManager
 import org.infinispan.server.core.CacheValue
 import org.infinispan.server.hotrod.OperationStatus._
+import org.infinispan.server.hotrod.test._
 
 /**
  * Hot Rod server functional test.
@@ -55,14 +56,14 @@
    }
 
    def testPutOnUndefinedCache(m: Method) {
-      var resp = client.execute(0xA0, 0x01, "boomooo", k(m), 0, 0, v(m), 0, 1, 0).asInstanceOf[ErrorResponse]
+      var resp = client.execute(0xA0, 0x01, "boomooo", k(m), 0, 0, v(m), 0, 1, 0).asInstanceOf[TestErrorResponse]
       assertTrue(resp.msg.contains("CacheNotFoundException"))
       assertEquals(resp.status, ServerError, "Status should have been 'ServerError' but instead was: " + resp.status)
       client.assertPut(m)
    }
 
    def testPutOnTopologyCache(m: Method) {
-      val resp = client.execute(0xA0, 0x01, TopologyCacheName, k(m), 0, 0, v(m), 0, 1, 0).asInstanceOf[ErrorResponse]
+      val resp = client.execute(0xA0, 0x01, TopologyCacheName, k(m), 0, 0, v(m), 0, 1, 0).asInstanceOf[TestErrorResponse]
       assertTrue(resp.msg.contains("Remote requests are not allowed to topology cache."))
       assertEquals(resp.status, ServerError, "Status should have been 'ServerError' but instead was: " + resp.status)
       client.assertPut(m)
@@ -81,10 +82,10 @@
    }
 
    def testPutWithPreviousValue(m: Method) {
-      var resp = client.put(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+      var resp = client.put(k(m) , 0, 0, v(m), 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, Success)
       assertEquals(resp.previous, None)
-      resp = client.put(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+      resp = client.put(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[TestResponseWithPrevious]
       assertSuccess(resp, v(m))
    }
 
@@ -123,10 +124,10 @@
    }
 
    def testPutIfAbsentWithPreviousValue(m: Method) {
-      var resp = client.putIfAbsent(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+      var resp = client.putIfAbsent(k(m) , 0, 0, v(m), 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, Success)
       assertEquals(resp.previous, None)
-      resp = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+      resp = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, OperationNotExecuted)
       assertTrue(Arrays.equals(v(m), resp.previous.get))
    }
@@ -160,13 +161,13 @@
    }
 
    def testReplaceWithPreviousValue(m: Method) {
-      var resp = client.replace(k(m) , 0, 0, v(m), 1).asInstanceOf[ResponseWithPrevious]
+      var resp = client.replace(k(m) , 0, 0, v(m), 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, OperationNotExecuted)
       assertEquals(resp.previous, None)
-      resp = client.put(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[ResponseWithPrevious]
+      resp = client.put(k(m) , 0, 0, v(m, "v2-"), 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, Success)
       assertEquals(resp.previous, None)
-      resp = client.replace(k(m) , 0, 0, v(m, "v3-"), 1).asInstanceOf[ResponseWithPrevious]
+      resp = client.replace(k(m) , 0, 0, v(m, "v3-"), 1).asInstanceOf[TestResponseWithPrevious]
       assertSuccess(resp, v(m, "v2-"))
    }
 
@@ -213,16 +214,16 @@
    }
 
    def testReplaceIfUnmodifiedWithPreviousValue(m: Method) {
-      var resp = client.replaceIfUnmodified(k(m) , 0, 0, v(m), 999, 1).asInstanceOf[ResponseWithPrevious]
+      var resp = client.replaceIfUnmodified(k(m) , 0, 0, v(m), 999, 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, KeyDoesNotExist)
       assertEquals(resp.previous, None)
       client.assertPut(m)
       val getResp = client.getWithVersion(k(m), 0)
       assertSuccess(getResp, v(m), 0)
-      resp  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1).asInstanceOf[ResponseWithPrevious]
+      resp  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, OperationNotExecuted)
       assertTrue(Arrays.equals(v(m), resp.previous.get))
-      resp  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v3-"), getResp.version, 1).asInstanceOf[ResponseWithPrevious]
+      resp  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v3-"), getResp.version, 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, Success)
       assertTrue(Arrays.equals(v(m), resp.previous.get))
    }
@@ -240,11 +241,11 @@
    }
 
    def testRemoveWithPreviousValue(m: Method) {
-      var resp = client.remove(k(m), 1).asInstanceOf[ResponseWithPrevious]
+      var resp = client.remove(k(m), 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, KeyDoesNotExist)
       assertEquals(resp.previous, None)
       client.assertPut(m)
-      resp = client.remove(k(m), 1).asInstanceOf[ResponseWithPrevious]
+      resp = client.remove(k(m), 1).asInstanceOf[TestResponseWithPrevious]
       assertSuccess(resp, v(m))
    }
 
@@ -283,16 +284,16 @@
    }
 
    def testRemoveIfUmodifiedWithPreviousValue(m: Method) {
-      var resp = client.removeIfUnmodified(k(m) , 0, 0, v(m), 999, 1).asInstanceOf[ResponseWithPrevious]
+      var resp = client.removeIfUnmodified(k(m) , 0, 0, v(m), 999, 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, KeyDoesNotExist)
       assertEquals(resp.previous, None)
       client.assertPut(m)
       val getResp = client.getWithVersion(k(m), 0)
       assertSuccess(getResp, v(m), 0)
-      resp  = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1).asInstanceOf[ResponseWithPrevious]
+      resp  = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, OperationNotExecuted)
       assertTrue(Arrays.equals(v(m), resp.previous.get))
-      resp = client.removeIfUnmodified(k(m), 0, 0, v(m, "v3-"), getResp.version, 1).asInstanceOf[ResponseWithPrevious]
+      resp = client.removeIfUnmodified(k(m), 0, 0, v(m, "v3-"), getResp.version, 1).asInstanceOf[TestResponseWithPrevious]
       assertStatus(resp.status, Success)
       assertTrue(Arrays.equals(v(m), resp.previous.get))
    }

Modified: branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-06-03 07:59:24 UTC (rev 1889)
+++ branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-06-03 15:02:20 UTC (rev 1890)
@@ -53,10 +53,10 @@
    
    def stop = ch.disconnect
 
-   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): TestResponse =
       execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
 
-   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], clientIntelligence: Byte, topologyId: Int): Response =
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], clientIntelligence: Byte, topologyId: Int): TestResponse =
       execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, clientIntelligence, topologyId)
 
    def assertPut(m: Method) {
@@ -82,64 +82,64 @@
       assertStatus(status, Success)
    }
 
-   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): TestResponse =
       execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
 
-   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): TestResponse =
       execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
 
-   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
+   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): TestResponse =
       execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
    
-   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Response =
+   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): TestResponse =
       execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
 
-   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): Response =
+   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): TestResponse =
       execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)   
 
-   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): Response =
+   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): TestResponse =
       execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, 1 ,0)
 
-   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): Response =
+   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): TestResponse =
       execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
 
-   def remove(k: Array[Byte]): Response =
+   def remove(k: Array[Byte]): TestResponse =
       execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, 1 ,0)
 
-   def remove(k: Array[Byte], flags: Int): Response =
+   def remove(k: Array[Byte], flags: Int): TestResponse =
       execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, flags)
 
-   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): Response =
+   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): TestResponse =
       execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version, 1 ,0)
 
-   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): Response =
+   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): TestResponse =
       execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
 
    def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-               v: Array[Byte], version: Long, clientIntelligence: Byte, topologyId: Int): Response = {
+               v: Array[Byte], version: Long, clientIntelligence: Byte, topologyId: Int): TestResponse = {
       val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version, clientIntelligence, topologyId)
       execute(op, op.id)
    }
 
    def executeExpectBadMagic(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-                           v: Array[Byte], version: Long): ErrorResponse = {
+                           v: Array[Byte], version: Long): TestErrorResponse = {
       val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version, 1, 0)
-      execute(op, 0).asInstanceOf[ErrorResponse]
+      execute(op, 0).asInstanceOf[TestErrorResponse]
    }
 
    def executePartial(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-                      v: Array[Byte], version: Long): ErrorResponse = {
+                      v: Array[Byte], version: Long): TestErrorResponse = {
       val op = new PartialOp(magic, code, name, k, lifespan, maxIdle, v, 0, version, 1, 0)
-      execute(op, op.id).asInstanceOf[ErrorResponse]
+      execute(op, op.id).asInstanceOf[TestErrorResponse]
    }
 
    def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-               v: Array[Byte], version: Long, flags: Int): Response = {
+               v: Array[Byte], version: Long, flags: Int): TestResponse = {
       val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version, 1, 0)
       execute(op, op.id)
    }
 
-   private def execute(op: Op, expectedResponseMessageId: Long): Response = {
+   private def execute(op: Op, expectedResponseMessageId: Long): TestResponse = {
       writeOp(op)
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
       handler.getResponse(expectedResponseMessageId)
@@ -152,22 +152,22 @@
       assertTrue(future.isSuccess)
    }
 
-   def get(k: Array[Byte], flags: Int): GetResponse = {
-      get(0x03, k, 0).asInstanceOf[GetResponse]
+   def get(k: Array[Byte], flags: Int): TestGetResponse = { // sends request code 0x03 for key k, forwarding the caller's flags
+      get(0x03, k, flags).asInstanceOf[TestGetResponse]
    }
 
-   def assertGet(m: Method): GetResponse = assertGet(m, 0)
+   def assertGet(m: Method): TestGetResponse = assertGet(m, 0)
 
-   def assertGet(m: Method, flags: Int): GetResponse = get(k(m), flags)
+   def assertGet(m: Method, flags: Int): TestGetResponse = get(k(m), flags)
 
-   def containsKey(k: Array[Byte], flags: Int): Response = {
+   def containsKey(k: Array[Byte], flags: Int): TestResponse = {
       get(0x0F, k, 0)
    }
 
-   def getWithVersion(k: Array[Byte], flags: Int): GetWithVersionResponse =
-      get(0x11, k, 0).asInstanceOf[GetWithVersionResponse]
+   def getWithVersion(k: Array[Byte], flags: Int): TestGetWithVersionResponse = // sends request code 0x11, forwarding the caller's flags
+      get(0x11, k, flags).asInstanceOf[TestGetWithVersionResponse]
 
-   private def get(code: Byte, k: Array[Byte], flags: Int): Response = {
+   private def get(code: Byte, k: Array[Byte], flags: Int): TestResponse = {
       val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0, 1, 0)
       val writeFuture = writeOp(op)
       // Get the handler instance to retrieve the answer.
@@ -179,20 +179,20 @@
       }
    }
 
-   def clear: Response = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0, 1 ,0)
+   def clear: TestResponse = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0, 1 ,0)
 
    def stats: Map[String, String] = {
       val op = new StatsOp(0xA0, 0x15, defaultCacheName, 1, 0, null)
       val writeFuture = writeOp(op)
       // Get the handler instance to retrieve the answer.
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      val resp = handler.getResponse(op.id).asInstanceOf[StatsResponse]
+      val resp = handler.getResponse(op.id).asInstanceOf[TestStatsResponse]
       resp.stats
    }
 
-   def ping: Response = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, 1 ,0)
+   def ping: TestResponse = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, 1 ,0)
 
-   def ping(clientIntelligence: Byte, topologyId: Int): Response =
+   def ping(clientIntelligence: Byte, topologyId: Int): TestResponse =
       execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, clientIntelligence, topologyId)
 
 }
@@ -307,50 +307,49 @@
             for (i <- 1 to size) {
                stats += (buf.readString -> buf.readString)
             }
-            new StatsResponse(id, op.cacheName, op.clientIntel, immutable.Map[String, String]() ++ stats, 
-               topologyChangeResponse)
+            new TestStatsResponse(id, op.cacheName, op.clientIntel, immutable.Map[String, String]() ++ stats, op.topologyId, topologyChangeResponse)
          }
          case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
               | RemoveResponse | RemoveIfUnmodifiedResponse => {
             if (op.flags == 1) {
                val length = buf.readUnsignedInt
                if (length == 0) {
-                  new ResponseWithPrevious(id, op.cacheName, op.clientIntel, opCode, status,
-                     topologyChangeResponse, None)
+                  new TestResponseWithPrevious(id, op.cacheName, op.clientIntel, opCode, status,
+                     op.topologyId, None, topologyChangeResponse)
                } else {
                   val previous = new Array[Byte](length)
                   buf.readBytes(previous)
-                  new ResponseWithPrevious(id, op.cacheName, op.clientIntel, opCode, status,
-                     topologyChangeResponse, Some(previous))
+                  new TestResponseWithPrevious(id, op.cacheName, op.clientIntel, opCode, status,
+                     op.topologyId, Some(previous), topologyChangeResponse)
                }
-            } else new Response(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse)
+            } else new TestResponse(id, op.cacheName, op.clientIntel, opCode, status, op.topologyId, topologyChangeResponse)
          }
          case ContainsKeyResponse | ClearResponse | PingResponse =>
-            new Response(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse)
+            new TestResponse(id, op.cacheName, op.clientIntel, opCode, status, op.topologyId, topologyChangeResponse)
          case GetWithVersionResponse  => {
             if (status == Success) {
                val version = buf.readLong
                val data = Some(buf.readRangedBytes)
-               new GetWithVersionResponse(id, op.cacheName, op.clientIntel, opCode, status,
-                  topologyChangeResponse, data, version)
+               new TestGetWithVersionResponse(id, op.cacheName, op.clientIntel, opCode, status,
+                  op.topologyId, data, version, topologyChangeResponse)
             } else{
-               new GetWithVersionResponse(id, op.cacheName, op.clientIntel, opCode, status,
-                  topologyChangeResponse, None, 0)
+               new TestGetWithVersionResponse(id, op.cacheName, op.clientIntel, opCode, status,
+                  op.topologyId, None, 0, topologyChangeResponse)
             }
          }
          case GetResponse => {
             if (status == Success) {
                val data = Some(buf.readRangedBytes)
-               new GetResponse(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse, data)
+               new TestGetResponse(id, op.cacheName, op.clientIntel, opCode, status, op.topologyId, data, topologyChangeResponse)
             } else{
-               new GetResponse(id, op.cacheName, op.clientIntel, opCode, status, topologyChangeResponse, None)
+               new TestGetResponse(id, op.cacheName, op.clientIntel, opCode, status, op.topologyId, None, topologyChangeResponse)
             }
          }
          case ErrorResponse => {
             if (op == null)
-               new ErrorResponse(id, "", 0, status, topologyChangeResponse, buf.readString)
+               new TestErrorResponse(id, "", 0, status, 0, buf.readString, topologyChangeResponse)
             else
-               new ErrorResponse(id, op.cacheName, op.clientIntel, status, topologyChangeResponse, buf.readString)
+               new TestErrorResponse(id, op.cacheName, op.clientIntel, status, op.topologyId, buf.readString, topologyChangeResponse)
          }
 
       }
@@ -365,18 +364,18 @@
 
 private class ClientHandler(rspTimeoutSeconds: Int) extends SimpleChannelUpstreamHandler {
 
-   private val responses = new ConcurrentHashMap[Long, Response]
+   private val responses = new ConcurrentHashMap[Long, TestResponse]
 
    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
-      val resp = e.getMessage.asInstanceOf[Response]
+      val resp = e.getMessage.asInstanceOf[TestResponse]
       trace("Put {0} in responses", resp)
       responses.put(resp.messageId, resp)
    }
 
-   def getResponse(messageId: Long): Response = {
+   def getResponse(messageId: Long): TestResponse = {
       // TODO: Very very primitive way of waiting for a response. Convert to a Future
       var i = 0;
-      var v: Response = null;
+      var v: TestResponse = null;
       do {
          v = responses.get(messageId)
          if (v == null) {
@@ -440,4 +439,43 @@
               override val cacheName: String,
               override val clientIntel: Byte,
               override val topologyId: Int,
-              val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntel, topologyId)
\ No newline at end of file
+              val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntel, topologyId)
+
+class TestResponse(override val messageId: Long, override val cacheName: String,
+                   override val clientIntel: Short, override val operation: OperationResponse,
+                   override val status: OperationStatus,
+                   override val topologyId: Int,
+                   val topologyResponse: Option[AbstractTopologyResponse])
+      extends Response(messageId, cacheName, clientIntel, operation, status, topologyId)
+
+class TestResponseWithPrevious(override val messageId: Long, override val cacheName: String,
+                           override val clientIntel: Short, override val operation: OperationResponse,
+                           override val status: OperationStatus,
+                           override val topologyId: Int, val previous: Option[Array[Byte]],
+                           override val topologyResponse: Option[AbstractTopologyResponse])
+      extends TestResponse(messageId, cacheName, clientIntel, operation, status, topologyId, topologyResponse)
+
+class TestGetResponse(override val messageId: Long, override val cacheName: String, override val clientIntel: Short,
+                  override val operation: OperationResponse, override val status: OperationStatus,
+                  override val topologyId: Int, val data: Option[Array[Byte]],
+                  override val topologyResponse: Option[AbstractTopologyResponse])
+      extends TestResponse(messageId, cacheName, clientIntel, operation, status, topologyId, topologyResponse)
+
+class TestGetWithVersionResponse(override val messageId: Long, override val cacheName: String,
+                             override val clientIntel: Short, override val operation: OperationResponse,
+                             override val status: OperationStatus,
+                             override val topologyId: Int,
+                             override val data: Option[Array[Byte]], val version: Long,
+                             override val topologyResponse: Option[AbstractTopologyResponse])
+      extends TestGetResponse(messageId, cacheName, clientIntel, operation, status, topologyId, data, topologyResponse)
+
+class TestErrorResponse(override val messageId: Long, override val cacheName: String,
+                    override val clientIntel: Short, override val status: OperationStatus,
+                    override val topologyId: Int, val msg: String,
+                    override val topologyResponse: Option[AbstractTopologyResponse])
+      extends TestResponse(messageId, cacheName, clientIntel, ErrorResponse, status, topologyId, topologyResponse)
+
+class TestStatsResponse(override val messageId: Long, override val cacheName: String,
+                        override val clientIntel: Short, val stats: Map[String, String],
+                        override val topologyId: Int, override val topologyResponse: Option[AbstractTopologyResponse])
+      extends TestResponse(messageId, cacheName, clientIntel, StatsResponse, Success, topologyId, topologyResponse)
\ No newline at end of file

Modified: branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-06-03 07:59:24 UTC (rev 1889)
+++ branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-06-03 15:02:20 UTC (rev 1890)
@@ -86,7 +86,7 @@
       isSuccess
    }
 
-   def assertSuccess(resp: GetResponse, expected: Array[Byte]): Boolean = {
+   def assertSuccess(resp: TestGetResponse, expected: Array[Byte]): Boolean = {
       assertStatus(resp.status, Success)
       val isArrayEquals = Arrays.equals(expected, resp.data.get)
       assertTrue(isArrayEquals, "Retrieved data should have contained " + Util.printArray(expected, true)
@@ -94,19 +94,19 @@
       isArrayEquals
    }
 
-   def assertSuccess(resp: GetWithVersionResponse, expected: Array[Byte], expectedVersion: Int): Boolean = {
+   def assertSuccess(resp: TestGetWithVersionResponse, expected: Array[Byte], expectedVersion: Int): Boolean = {
       assertTrue(resp.version != expectedVersion)
       assertSuccess(resp, expected)
    }
 
-   def assertSuccess(resp: ResponseWithPrevious, expected: Array[Byte]): Boolean = {
+   def assertSuccess(resp: TestResponseWithPrevious, expected: Array[Byte]): Boolean = {
       assertStatus(resp.status, Success)
       val isSuccess = Arrays.equals(expected, resp.previous.get)
       assertTrue(isSuccess)
       isSuccess
    }
 
-   def assertKeyDoesNotExist(resp: GetResponse): Boolean = {
+   def assertKeyDoesNotExist(resp: TestGetResponse): Boolean = {
       val status = resp.status
       assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
       assertEquals(resp.data, None)



More information about the infinispan-commits mailing list