[infinispan-commits] Infinispan SVN: r1595 - in trunk/server/hotrod/src: test/scala/org/infinispan/server/hotrod and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Mar 11 03:55:30 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-11 03:55:29 -0500 (Thu, 11 Mar 2010)
New Revision: 1595

Modified:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ErrorResponse.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Made Key/Value Externalizable and added the first cluster test. Enhanced the code to deal with errors better. Tests are still failing, but they will be sorted out ASAP.


Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -24,16 +24,16 @@
          case (x, 0) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
          case (x, y) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
       }
-      new Response(OpCodes.PutResponse, c.id, Status.Success)
+      new Response(c.id, OpCodes.PutResponse, Status.Success)
    }
 
    override def get(c: RetrievalCommand): Response = {
       val cache = getCache(c.cacheName, c.flags)
       val value = cache.get(new Key(c.key))
       if (value != null)
-         new RetrievalResponse(OpCodes.GetResponse, c.id, Status.Success, value.v)
+         new RetrievalResponse(c.id, OpCodes.GetResponse, Status.Success, value.v)
       else
-         new RetrievalResponse(OpCodes.GetResponse, c.id, Status.KeyDoesNotExist, null)
+         new RetrievalResponse(c.id, OpCodes.GetResponse, Status.KeyDoesNotExist, null)
    }
 
    private def getCache(cacheName: String, flags: Set[Flag]): InfinispanCache[Key, Value] = {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -9,13 +9,12 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-class Decoder410 extends NoStateDecoder {
+class Decoder410 {
    import Decoder410._
 
-   override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): Command = {
-      val op = OpCodes.apply(buffer.readUnsignedByte)
+   def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, id: Long): Command = {
+      val op = getOpCode(buffer)
       val cacheName = buffer.readString
-      val id = buffer.readUnsignedLong
       val flags = Flags.toContextFlags(buffer.readUnsignedInt)
       val command: Command =
          op match {                                   
@@ -39,10 +38,30 @@
       command
    }
 
-   override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
-      error("Error", e.getCause)
+   private def getOpCode(buffer: ChannelBuffer): OpCodes.OpCode = {
+      val op: Int = buffer.readUnsignedByte
+      try {
+         OpCodes.apply(op)
+      } catch {
+         case n: NoSuchElementException =>
+            throw new UnknownCommandException("Operation code not valid: 0x" + op.toHexString + " (" + op + ")")
+      }
    }
 
+//   override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+//      // no-op, handled by parent decoder
+////      val t = e.getCause
+////      error("Error", t)
+////      ctx.sendDownstream(e)
+////      val ch = ctx.getChannel
+////      val buffers = ctx.getChannelBuffers
+////      t match {
+////         case u: UnknownCommandException =>
+////      }
+//
+//
+//   }
+
 }
 
 object Decoder410 extends Logging
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -15,25 +15,33 @@
 
    override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Object) = {
       trace("Encode msg {0}", msg)
-      val buffer: ChannelBuffer =
+//      try {
+         val buffer: ChannelBuffer =
+            msg match {
+               case r: Response => {
+                  val buffer = ctx.getChannelBuffers.dynamicBuffer
+                  buffer.writeByte(Magic.byteValue)
+                  buffer.writeUnsignedLong(r.id)
+                  buffer.writeByte(r.opCode.id.byteValue)
+                  buffer.writeByte(r.status.id.byteValue)
+                  buffer
+               }
+         }
          msg match {
-            case r: Response => {
-               val buffer = ctx.getChannelBuffers.dynamicBuffer
-               buffer.writeByte(Magic.byteValue)
-               buffer.writeByte(r.opCode.id.byteValue)
-               buffer.writeUnsignedLong(r.id)
-               buffer.writeByte(r.status.id.byteValue)
-               buffer
+            case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
+            case er: ErrorResponse => buffer.writeString(er.msg)
+            case _ => {
+               if (buffer == null)
+                  throw new IllegalArgumentException("Response received is unknown: " + msg);
             }
-      }
-      msg match {
-         case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
-         case _ => {
-            if (buffer == null)
-               throw new IllegalArgumentException("Response received is unknown: " + msg);
          }
-      }
-      buffer
+         buffer
+//      } catch {
+//         case t: Throwable => {
+//            val buffer = ctx.getChannelBuffers.dynamicBuffer
+//            buffer.writeByte(Magic.byteValue)
+//         }
+//      }
    }
    
 }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ErrorResponse.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ErrorResponse.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ErrorResponse.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -6,7 +6,18 @@
  * @since 4.1
  */
 
-class ErrorResponse(override val opCode: OpCodes.OpCode,
-                    override val id: Long,
+class ErrorResponse(override val id: Long,
+                    override val opCode: OpCodes.OpCode,
                     override val status: Status.Status,
-                    val msg: String) extends Response(opCode, id, status)
\ No newline at end of file
+                    val msg: String) extends Response(id, opCode, status) {
+
+   override def toString = {
+      new StringBuilder().append("ErrorResponse").append("{")
+         .append("id=").append(id)
+         .append(", opCode=").append(opCode)
+         .append(", status=").append(status)
+         .append(", msg=").append(msg)
+         .append("}").toString
+   }
+   
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -2,6 +2,8 @@
 
 import java.io.StreamCorruptedException
 import org.infinispan.server.core.transport._
+import org.infinispan.server.core.UnknownCommandException
+import org.infinispan.util.concurrent.TimeoutException
 
 /**
  * // TODO: Document this
@@ -9,30 +11,62 @@
  * @since 4.1
  */
 class GlobalDecoder extends NoStateDecoder {
+
    import GlobalDecoder._
 
    private val Magic = 0xA0
    private val Version410 = 41
+   @volatile private var isError = false
 
    override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): Object = {
       val magic = buffer.readUnsignedByte()
       if (magic != Magic) {
-         throw new StreamCorruptedException("Magic byte incorrect: " + magic)
+         if (!isError) {
+            // throw new StreamCorruptedException("Magic byte incorrect: " + magic)
+            val m = "Error reading magic byte or message id: " + magic
+            return new ErrorResponse(0, OpCodes.ErrorResponse, Status.InvalidMagicOrMsgId, m)
+         } else {
+            trace("Error happened previously, ignoring {0} byte until we find the magic number again", magic)
+            return null // Keep trying to read until we find magic
+         }
       }
 
-      val version = buffer.readUnsignedByte()
-      val decoder =
-         version match {
-            case Version410 => new Decoder410
-            case _ => throw new StreamCorruptedException("Unknown version:" + version)
-         }
-      val command = decoder.decode(ctx, buffer)
-      trace("Decoded msg {0}", command)
-      command
+      val id = buffer.readUnsignedLong
+
+      try {
+         val version = buffer.readUnsignedByte()
+         val decoder =
+            version match {
+               case Version410 => new Decoder410
+               case _ => {
+                  isError = true
+                  return new ErrorResponse(id, OpCodes.ErrorResponse, Status.UnknownVersion, "Unknown version:" + version)
+               }
+            }
+         val command = decoder.decode(ctx, buffer, id)
+         trace("Decoded msg {0}", command)
+         isError = false
+         command
+      } catch {
+         case u: UnknownCommandException => createErrorResponse(id, Status.UnknownCommand, u)
+         case s: StreamCorruptedException => createErrorResponse(id, Status.ParseError, s)
+         case o: TimeoutException => createErrorResponse(id, Status.CommandTimedOut, o)
+         case t: Throwable => createErrorResponse(id, Status.ServerError, t)
+      }
    }
 
+   private def createErrorResponse(id: Long, status: Status.Status, t: Throwable): ErrorResponse = {
+      isError = true
+      error("Error processing command", t)
+      val m = if (t.getMessage != null) t.getMessage else ""
+      new ErrorResponse(id, OpCodes.ErrorResponse, status, m)
+   }
+
    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
-      error("Error", e.getCause)
+//      val t = e.getCause
+//      val m = if (t.getMessage != null) t.getMessage else "" 
+      error("Unexpected error", e.getCause)
+//      e.getChannel.write(new ErrorResponse(0, OpCodes.ErrorResponse, Status.InvalidMagicOrMsgId, m))
    }
 
 }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -1,7 +1,6 @@
 package org.infinispan.server.hotrod
 
-import org.infinispan.server.core.transport.ChannelHandlerContext
-import org.infinispan.server.core.{MessageEvent, CommandHandler}
+import org.infinispan.server.core.transport.{CommandHandler, MessageEvent, ChannelHandlerContext}
 
 /**
  * // TODO: Document this
@@ -15,6 +14,7 @@
       e.getMessage match {
 //         case c: StorageCommand => e.getChannel.write(c.op(hotCache, c))
          case c: Command => e.getChannel.write(c.perform(hotCache))
+         case er: ErrorResponse => e.getChannel.write(er) 
       }
 
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -1,15 +1,16 @@
 package org.infinispan.server.hotrod
 
 import java.util.Arrays
+import java.io.{ObjectOutput, ObjectInput, Externalizable}
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since
  */
+// TODO: Make it an Externalizer once submodules can extend the marshalling framework
+final class Key(var k: Array[Byte]) extends Externalizable {
 
-final class Key(val k: Array[Byte]) {
-
    override def equals(obj: Any) = {
       obj match {
          case k: Key => Arrays.equals(k.k, this.k)
@@ -25,4 +26,13 @@
          .append("}").toString
    }
 
+   override def readExternal(in: ObjectInput) {
+      k = new Array[Byte](in.read())
+      in.read(k)
+   }
+
+   override def writeExternal(out: ObjectOutput) {
+      out.write(k.length)
+      out.write(k)
+   }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -13,4 +13,6 @@
    val PutResponse = Value(0x02)
    val GetRequest = Value(0x03)
    val GetResponse = Value(0x04)
+
+   val ErrorResponse = Value(0x50)
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -6,14 +6,14 @@
  * @since 4.1
  */
 
-class Response(val opCode: OpCodes.OpCode,
-               val id: Long,
+class Response(val id: Long,
+               val opCode: OpCodes.OpCode,
                val status: Status.Status) {
 
    override def toString = {
       new StringBuilder().append("Response").append("{")
-         .append("opCode=").append(opCode)
-         .append(", id=").append(id)
+         .append("id=").append(id)
+         .append(", opCode=").append(opCode)
          .append(", status=").append(status)
          .append("}").toString
    }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -6,10 +6,10 @@
  * @since
  */
 
-class RetrievalResponse(override val opCode: OpCodes.OpCode,
-                             override val id: Long,
-                             override val status: Status.Status,
-                             val value: Array[Byte]) extends Response(opCode, id, status) {
+class RetrievalResponse(override val id: Long,
+                        override val opCode: OpCodes.OpCode,
+                        override val status: Status.Status,
+                        val value: Array[Byte]) extends Response(id, opCode, status) {
 
    override def toString = {
       new StringBuilder().append("RetrievalResponse").append("{")

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -11,4 +11,12 @@
 
    val Success = Value(0x00)
    val KeyDoesNotExist = Value(0x02)
+
+   val InvalidMagicOrMsgId = Value(0x81)
+   val UnknownCommand = Value(0x82)
+   val UnknownVersion = Value(0x83) // todo: test
+   val ParseError = Value(0x84) // todo: test
+   val ServerError = Value(0x85) // todo: test
+   val CommandTimedOut = Value(0x86) // todo: test
+   
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -1,12 +1,15 @@
 package org.infinispan.server.hotrod
 
+import java.io.{ObjectOutput, ObjectInput, Externalizable}
+
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since
  */
 
-final class Value(val v: Array[Byte]) {
+// TODO: Make it an Externalizer once submodules can extend the marshalling framework
+final class Value(var v: Array[Byte]) extends Externalizable {
 
    override def toString = {
       new StringBuilder().append("Value").append("{")
@@ -14,4 +17,13 @@
          .append("}").toString
    }
 
+   override def readExternal(in: ObjectInput) {
+      v = new Array[Byte](in.read())
+      in.read(v)
+   }
+
+   override def writeExternal(out: ObjectOutput) {
+      out.write(v.length)
+      out.write(v)
+   }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -11,9 +11,7 @@
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
 import org.infinispan.context.Flag
 import org.infinispan.{AdvancedCache, Cache => InfinispanCache}
-import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
-import javax.transaction.TransactionManager
-import javax.transaction.{Status => TransactionStatus}
+import org.infinispan.test.{SingleCacheManagerTest}
 
 /**
  * TODO: Document
@@ -53,6 +51,17 @@
       server.stop
    }
 
+   def testUnknownCommand(m: Method) {
+      val status = put(ch, 0xA0, 0x77, cacheName, k(m) , 0, 0, v(m))
+      assertSuccess(status)
+   }
+
+   def testUnknownMagic(m: Method) {
+      doPut(m) // Do a put to make sure decoder gets back to reading properly
+      val status = put(ch, 0x66, 0x01, cacheName, k(m) , 0, 0, v(m))
+      assertSuccess(status)
+   }
+
    def testPutBasic(m: Method) {
       doPut(m)
    }
@@ -89,6 +98,8 @@
       assertKeyDoesNotExist(getSt, actual)
    }
 
+   
+
 // Invalid test since starting transactions does not make sense
 // TODO: discuss flags with list
 // def testGetWithWriteLock(m: Method) {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -9,7 +9,6 @@
 import org.infinispan.server.core.transport.netty.NettyChannelBuffer
 import org.jboss.netty.handler.codec.replay.ReplayingDecoder
 import org.testng.Assert._
-import java.util.concurrent.{LinkedBlockingQueue, Executors}
 import org.infinispan.server.hotrod._
 import org.infinispan.server.hotrod.OpCodes._
 import org.infinispan.server.hotrod.Status._
@@ -18,6 +17,7 @@
 import org.jboss.netty.channel.ChannelHandler.Sharable
 import org.infinispan.context.Flag
 import java.util.Arrays
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
 
 /**
  * // TODO: Document this
@@ -47,7 +47,11 @@
    }
 
    def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
-      val writeFuture = ch.write(new Op(0x01, name, k, lifespan, maxIdle, v, null))
+      put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v)
+   }
+
+   def put(ch: Channel, magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
+      val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, null))
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
@@ -55,12 +59,8 @@
       handler.getResponse.status
    }
 
-//   def get(ch: Channel, name: String, key: Array[Byte]): (Status.Status, Array[Byte]) = {
-//      get(ch, name, key, null)
-//   }
-
    def get(ch: Channel, name: String, k: Array[Byte], flags: Set[Flag]): (Status.Status, Array[Byte]) = {
-      val writeFuture = ch.write(new Op(0x03, name, k, 0, 0, null, flags))
+      val writeFuture = ch.write(new Op(0xA0, 0x03, name, k, 0, 0, null, flags))
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
@@ -107,11 +107,11 @@
          msg match {
             case op: Op => {
                val buffer = new NettyChannelBuffer(ChannelBuffers.dynamicBuffer)
-               buffer.writeByte(0xA0.asInstanceOf[Byte]) // magic
+               buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
+               buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
                buffer.writeByte(41) // version
                buffer.writeByte(op.code) // opcode
                buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
-               buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
                if (op.flags != null)
                   buffer.writeUnsignedInt(Flags.fromContextFlags(op.flags)) // flags
                else
@@ -136,12 +136,12 @@
    override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
       val buf = new NettyChannelBuffer(buffer)
       val magic = buf.readUnsignedByte
+      val id = buf.readUnsignedLong
       val opCode = OpCodes.apply(buf.readUnsignedByte)
-      val id = buf.readUnsignedLong
       val status = Status.apply(buf.readUnsignedByte)
       val resp: Response =
          opCode match {
-            case PutResponse => new Response(opCode, id, status)
+            case PutResponse => new Response(id, opCode, status)
             case GetResponse => {
                val value = {
                   status match {
@@ -149,8 +149,9 @@
                      case _ => null
                   }
                }
-               new RetrievalResponse(opCode, id, status, value)
+               new RetrievalResponse(id, opCode, status, value)
             }
+            case ErrorResponse => new ErrorResponse(id, opCode, status, buf.readString)
          }
       resp
    }
@@ -170,12 +171,13 @@
    }
 
    def getResponse: Response = {
-      answer.take
+      answer.poll(60, TimeUnit.SECONDS)
    }
 
 }
 
-private class Op(val code: Byte,
+private class Op(val magic: Int,
+                 val code: Byte,
                  val cacheName: String,
                  val key: Array[Byte],
                  val lifespan: Int,

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala	2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala	2010-03-11 08:55:29 UTC (rev 1595)
@@ -42,6 +42,6 @@
 object Utils extends Logging
 
 object UniquePortThreadLocal extends ThreadLocal[Int] {
-   private val uniqueAddr = new AtomicInteger(21212)   
+   private val uniqueAddr = new AtomicInteger(11311)   
    override def initialValue: Int = uniqueAddr.getAndAdd(100)
 }
\ No newline at end of file



More information about the infinispan-commits mailing list