[infinispan-commits] Infinispan SVN: r1595 - in trunk/server/hotrod/src: test/scala/org/infinispan/server/hotrod and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Mar 11 03:55:30 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-03-11 03:55:29 -0500 (Thu, 11 Mar 2010)
New Revision: 1595
Modified:
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ErrorResponse.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Made Key and Value Externalizable and added the first cluster test. Enhanced the code to deal with errors better. Tests are still failing, but they will be sorted out ASAP.
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -24,16 +24,16 @@
case (x, 0) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
case (x, y) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
}
- new Response(OpCodes.PutResponse, c.id, Status.Success)
+ new Response(c.id, OpCodes.PutResponse, Status.Success)
}
override def get(c: RetrievalCommand): Response = {
val cache = getCache(c.cacheName, c.flags)
val value = cache.get(new Key(c.key))
if (value != null)
- new RetrievalResponse(OpCodes.GetResponse, c.id, Status.Success, value.v)
+ new RetrievalResponse(c.id, OpCodes.GetResponse, Status.Success, value.v)
else
- new RetrievalResponse(OpCodes.GetResponse, c.id, Status.KeyDoesNotExist, null)
+ new RetrievalResponse(c.id, OpCodes.GetResponse, Status.KeyDoesNotExist, null)
}
private def getCache(cacheName: String, flags: Set[Flag]): InfinispanCache[Key, Value] = {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -9,13 +9,12 @@
* @author Galder Zamarreño
* @since 4.1
*/
-class Decoder410 extends NoStateDecoder {
+class Decoder410 {
import Decoder410._
- override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): Command = {
- val op = OpCodes.apply(buffer.readUnsignedByte)
+ def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, id: Long): Command = {
+ val op = getOpCode(buffer)
val cacheName = buffer.readString
- val id = buffer.readUnsignedLong
val flags = Flags.toContextFlags(buffer.readUnsignedInt)
val command: Command =
op match {
@@ -39,10 +38,30 @@
command
}
- override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
- error("Error", e.getCause)
+ private def getOpCode(buffer: ChannelBuffer): OpCodes.OpCode = {
+ val op: Int = buffer.readUnsignedByte
+ try {
+ OpCodes.apply(op)
+ } catch {
+ case n: NoSuchElementException =>
+ throw new UnknownCommandException("Operation code not valid: 0x" + op.toHexString + " (" + op + ")")
+ }
}
+// override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+// // no-op, handled by parent decoder
+//// val t = e.getCause
+//// error("Error", t)
+//// ctx.sendDownstream(e)
+//// val ch = ctx.getChannel
+//// val buffers = ctx.getChannelBuffers
+//// t match {
+//// case u: UnknownCommandException =>
+//// }
+//
+//
+// }
+
}
object Decoder410 extends Logging
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -15,25 +15,33 @@
override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Object) = {
trace("Encode msg {0}", msg)
- val buffer: ChannelBuffer =
+// try {
+ val buffer: ChannelBuffer =
+ msg match {
+ case r: Response => {
+ val buffer = ctx.getChannelBuffers.dynamicBuffer
+ buffer.writeByte(Magic.byteValue)
+ buffer.writeUnsignedLong(r.id)
+ buffer.writeByte(r.opCode.id.byteValue)
+ buffer.writeByte(r.status.id.byteValue)
+ buffer
+ }
+ }
msg match {
- case r: Response => {
- val buffer = ctx.getChannelBuffers.dynamicBuffer
- buffer.writeByte(Magic.byteValue)
- buffer.writeByte(r.opCode.id.byteValue)
- buffer.writeUnsignedLong(r.id)
- buffer.writeByte(r.status.id.byteValue)
- buffer
+ case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
+ case er: ErrorResponse => buffer.writeString(er.msg)
+ case _ => {
+ if (buffer == null)
+ throw new IllegalArgumentException("Response received is unknown: " + msg);
}
- }
- msg match {
- case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
- case _ => {
- if (buffer == null)
- throw new IllegalArgumentException("Response received is unknown: " + msg);
}
- }
- buffer
+ buffer
+// } catch {
+// case t: Throwable => {
+// val buffer = ctx.getChannelBuffers.dynamicBuffer
+// buffer.writeByte(Magic.byteValue)
+// }
+// }
}
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ErrorResponse.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ErrorResponse.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ErrorResponse.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -6,7 +6,18 @@
* @since 4.1
*/
-class ErrorResponse(override val opCode: OpCodes.OpCode,
- override val id: Long,
+class ErrorResponse(override val id: Long,
+ override val opCode: OpCodes.OpCode,
override val status: Status.Status,
- val msg: String) extends Response(opCode, id, status)
\ No newline at end of file
+ val msg: String) extends Response(id, opCode, status) {
+
+ override def toString = {
+ new StringBuilder().append("ErrorResponse").append("{")
+ .append("id=").append(id)
+ .append(", opCode=").append(opCode)
+ .append(", status=").append(status)
+ .append(", msg=").append(msg)
+ .append("}").toString
+ }
+
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -2,6 +2,8 @@
import java.io.StreamCorruptedException
import org.infinispan.server.core.transport._
+import org.infinispan.server.core.UnknownCommandException
+import org.infinispan.util.concurrent.TimeoutException
/**
* // TODO: Document this
@@ -9,30 +11,62 @@
* @since 4.1
*/
class GlobalDecoder extends NoStateDecoder {
+
import GlobalDecoder._
private val Magic = 0xA0
private val Version410 = 41
+ @volatile private var isError = false
override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): Object = {
val magic = buffer.readUnsignedByte()
if (magic != Magic) {
- throw new StreamCorruptedException("Magic byte incorrect: " + magic)
+ if (!isError) {
+ // throw new StreamCorruptedException("Magic byte incorrect: " + magic)
+ val m = "Error reading magic byte or message id: " + magic
+ return new ErrorResponse(0, OpCodes.ErrorResponse, Status.InvalidMagicOrMsgId, m)
+ } else {
+ trace("Error happened previously, ignoring {0} byte until we find the magic number again", magic)
+ return null // Keep trying to read until we find magic
+ }
}
- val version = buffer.readUnsignedByte()
- val decoder =
- version match {
- case Version410 => new Decoder410
- case _ => throw new StreamCorruptedException("Unknown version:" + version)
- }
- val command = decoder.decode(ctx, buffer)
- trace("Decoded msg {0}", command)
- command
+ val id = buffer.readUnsignedLong
+
+ try {
+ val version = buffer.readUnsignedByte()
+ val decoder =
+ version match {
+ case Version410 => new Decoder410
+ case _ => {
+ isError = true
+ return new ErrorResponse(id, OpCodes.ErrorResponse, Status.UnknownVersion, "Unknown version:" + version)
+ }
+ }
+ val command = decoder.decode(ctx, buffer, id)
+ trace("Decoded msg {0}", command)
+ isError = false
+ command
+ } catch {
+ case u: UnknownCommandException => createErrorResponse(id, Status.UnknownCommand, u)
+ case s: StreamCorruptedException => createErrorResponse(id, Status.ParseError, s)
+ case o: TimeoutException => createErrorResponse(id, Status.CommandTimedOut, o)
+ case t: Throwable => createErrorResponse(id, Status.ServerError, t)
+ }
}
+ private def createErrorResponse(id: Long, status: Status.Status, t: Throwable): ErrorResponse = {
+ isError = true
+ error("Error processing command", t)
+ val m = if (t.getMessage != null) t.getMessage else ""
+ new ErrorResponse(id, OpCodes.ErrorResponse, status, m)
+ }
+
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
- error("Error", e.getCause)
+// val t = e.getCause
+// val m = if (t.getMessage != null) t.getMessage else ""
+ error("Unexpected error", e.getCause)
+// e.getChannel.write(new ErrorResponse(0, OpCodes.ErrorResponse, Status.InvalidMagicOrMsgId, m))
}
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -1,7 +1,6 @@
package org.infinispan.server.hotrod
-import org.infinispan.server.core.transport.ChannelHandlerContext
-import org.infinispan.server.core.{MessageEvent, CommandHandler}
+import org.infinispan.server.core.transport.{CommandHandler, MessageEvent, ChannelHandlerContext}
/**
* // TODO: Document this
@@ -15,6 +14,7 @@
e.getMessage match {
// case c: StorageCommand => e.getChannel.write(c.op(hotCache, c))
case c: Command => e.getChannel.write(c.perform(hotCache))
+ case er: ErrorResponse => e.getChannel.write(er)
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Key.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -1,15 +1,16 @@
package org.infinispan.server.hotrod
import java.util.Arrays
+import java.io.{ObjectOutput, ObjectInput, Externalizable}
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since
*/
+// TODO: Make it an Externalizer once submodules can extend the marshalling framework
+final class Key(var k: Array[Byte]) extends Externalizable {
-final class Key(val k: Array[Byte]) {
-
override def equals(obj: Any) = {
obj match {
case k: Key => Arrays.equals(k.k, this.k)
@@ -25,4 +26,13 @@
.append("}").toString
}
+ override def readExternal(in: ObjectInput) {
+ k = new Array[Byte](in.read())
+ in.read(k)
+ }
+
+ override def writeExternal(out: ObjectOutput) {
+ out.write(k.length)
+ out.write(k)
+ }
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -13,4 +13,6 @@
val PutResponse = Value(0x02)
val GetRequest = Value(0x03)
val GetResponse = Value(0x04)
+
+ val ErrorResponse = Value(0x50)
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -6,14 +6,14 @@
* @since 4.1
*/
-class Response(val opCode: OpCodes.OpCode,
- val id: Long,
+class Response(val id: Long,
+ val opCode: OpCodes.OpCode,
val status: Status.Status) {
override def toString = {
new StringBuilder().append("Response").append("{")
- .append("opCode=").append(opCode)
- .append(", id=").append(id)
+ .append("id=").append(id)
+ .append(", opCode=").append(opCode)
.append(", status=").append(status)
.append("}").toString
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalResponse.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -6,10 +6,10 @@
* @since
*/
-class RetrievalResponse(override val opCode: OpCodes.OpCode,
- override val id: Long,
- override val status: Status.Status,
- val value: Array[Byte]) extends Response(opCode, id, status) {
+class RetrievalResponse(override val id: Long,
+ override val opCode: OpCodes.OpCode,
+ override val status: Status.Status,
+ val value: Array[Byte]) extends Response(id, opCode, status) {
override def toString = {
new StringBuilder().append("RetrievalResponse").append("{")
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -11,4 +11,12 @@
val Success = Value(0x00)
val KeyDoesNotExist = Value(0x02)
+
+ val InvalidMagicOrMsgId = Value(0x81)
+ val UnknownCommand = Value(0x82)
+ val UnknownVersion = Value(0x83) // todo: test
+ val ParseError = Value(0x84) // todo: test
+ val ServerError = Value(0x85) // todo: test
+ val CommandTimedOut = Value(0x86) // todo: test
+
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Value.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -1,12 +1,15 @@
package org.infinispan.server.hotrod
+import java.io.{ObjectOutput, ObjectInput, Externalizable}
+
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since
*/
-final class Value(val v: Array[Byte]) {
+// TODO: Make it an Externalizer once submodules can extend the marshalling framework
+final class Value(var v: Array[Byte]) extends Externalizable {
override def toString = {
new StringBuilder().append("Value").append("{")
@@ -14,4 +17,13 @@
.append("}").toString
}
+ override def readExternal(in: ObjectInput) {
+ v = new Array[Byte](in.read())
+ in.read(v)
+ }
+
+ override def writeExternal(out: ObjectOutput) {
+ out.write(v.length)
+ out.write(v)
+ }
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -11,9 +11,7 @@
import org.infinispan.manager.{DefaultCacheManager, CacheManager}
import org.infinispan.context.Flag
import org.infinispan.{AdvancedCache, Cache => InfinispanCache}
-import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
-import javax.transaction.TransactionManager
-import javax.transaction.{Status => TransactionStatus}
+import org.infinispan.test.{SingleCacheManagerTest}
/**
* TODO: Document
@@ -53,6 +51,17 @@
server.stop
}
+ def testUnknownCommand(m: Method) {
+ val status = put(ch, 0xA0, 0x77, cacheName, k(m) , 0, 0, v(m))
+ assertSuccess(status)
+ }
+
+ def testUnknownMagic(m: Method) {
+ doPut(m) // Do a put to make sure decoder gets back to reading properly
+ val status = put(ch, 0x66, 0x01, cacheName, k(m) , 0, 0, v(m))
+ assertSuccess(status)
+ }
+
def testPutBasic(m: Method) {
doPut(m)
}
@@ -89,6 +98,8 @@
assertKeyDoesNotExist(getSt, actual)
}
+
+
// Invalid test since starting transactions does not make sense
// TODO: discuss flags with list
// def testGetWithWriteLock(m: Method) {
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -9,7 +9,6 @@
import org.infinispan.server.core.transport.netty.NettyChannelBuffer
import org.jboss.netty.handler.codec.replay.ReplayingDecoder
import org.testng.Assert._
-import java.util.concurrent.{LinkedBlockingQueue, Executors}
import org.infinispan.server.hotrod._
import org.infinispan.server.hotrod.OpCodes._
import org.infinispan.server.hotrod.Status._
@@ -18,6 +17,7 @@
import org.jboss.netty.channel.ChannelHandler.Sharable
import org.infinispan.context.Flag
import java.util.Arrays
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
/**
* // TODO: Document this
@@ -47,7 +47,11 @@
}
def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
- val writeFuture = ch.write(new Op(0x01, name, k, lifespan, maxIdle, v, null))
+ put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v)
+ }
+
+ def put(ch: Channel, magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
+ val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, null))
writeFuture.awaitUninterruptibly
assertTrue(writeFuture.isSuccess)
// Get the handler instance to retrieve the answer.
@@ -55,12 +59,8 @@
handler.getResponse.status
}
-// def get(ch: Channel, name: String, key: Array[Byte]): (Status.Status, Array[Byte]) = {
-// get(ch, name, key, null)
-// }
-
def get(ch: Channel, name: String, k: Array[Byte], flags: Set[Flag]): (Status.Status, Array[Byte]) = {
- val writeFuture = ch.write(new Op(0x03, name, k, 0, 0, null, flags))
+ val writeFuture = ch.write(new Op(0xA0, 0x03, name, k, 0, 0, null, flags))
writeFuture.awaitUninterruptibly
assertTrue(writeFuture.isSuccess)
// Get the handler instance to retrieve the answer.
@@ -107,11 +107,11 @@
msg match {
case op: Op => {
val buffer = new NettyChannelBuffer(ChannelBuffers.dynamicBuffer)
- buffer.writeByte(0xA0.asInstanceOf[Byte]) // magic
+ buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
+ buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
buffer.writeByte(41) // version
buffer.writeByte(op.code) // opcode
buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
- buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
if (op.flags != null)
buffer.writeUnsignedInt(Flags.fromContextFlags(op.flags)) // flags
else
@@ -136,12 +136,12 @@
override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
val buf = new NettyChannelBuffer(buffer)
val magic = buf.readUnsignedByte
+ val id = buf.readUnsignedLong
val opCode = OpCodes.apply(buf.readUnsignedByte)
- val id = buf.readUnsignedLong
val status = Status.apply(buf.readUnsignedByte)
val resp: Response =
opCode match {
- case PutResponse => new Response(opCode, id, status)
+ case PutResponse => new Response(id, opCode, status)
case GetResponse => {
val value = {
status match {
@@ -149,8 +149,9 @@
case _ => null
}
}
- new RetrievalResponse(opCode, id, status, value)
+ new RetrievalResponse(id, opCode, status, value)
}
+ case ErrorResponse => new ErrorResponse(id, opCode, status, buf.readString)
}
resp
}
@@ -170,12 +171,13 @@
}
def getResponse: Response = {
- answer.take
+ answer.poll(60, TimeUnit.SECONDS)
}
}
-private class Op(val code: Byte,
+private class Op(val magic: Int,
+ val code: Byte,
val cacheName: String,
val key: Array[Byte],
val lifespan: Int,
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala 2010-03-11 07:52:09 UTC (rev 1594)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala 2010-03-11 08:55:29 UTC (rev 1595)
@@ -42,6 +42,6 @@
object Utils extends Logging
object UniquePortThreadLocal extends ThreadLocal[Int] {
- private val uniqueAddr = new AtomicInteger(21212)
+ private val uniqueAddr = new AtomicInteger(11311)
override def initialValue: Int = uniqueAddr.getAndAdd(100)
}
\ No newline at end of file
More information about the infinispan-commits
mailing list