[infinispan-commits] Infinispan SVN: r1568 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport/netty and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Mar 4 11:44:46 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-03-04 11:44:44 -0500 (Thu, 04 Mar 2010)
New Revision: 1568
Modified:
trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java
trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Implemented a basic get command and added a couple of tests. Also extended the ChannelBuffer interface so that it can read/write ranged byte arrays. Ranged means that the length of the array is read/written immediately before the byte array itself. The length is encoded in variable-length integer format.
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java 2010-03-04 16:44:44 UTC (rev 1568)
@@ -37,9 +37,20 @@
ChannelBuffer readBytes(int length);
int readerIndex();
void readBytes(byte[] dst);
+ byte[] readRangedBytes();
+ /**
+ * Reads length of String and then returns an UTF-8 formatted String of such length.
+ */
+ String readString();
+
void writeByte(byte value);
void writeBytes(byte[] src);
+
+ /**
+ * Writes the length of the byte array and transfers the specified source array's data to this buffer
+ */
+ void writeRangedBytes(byte[] src);
void writeUnsignedInt(int i);
void writeUnsignedLong(long l);
int writerIndex();
Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java 2010-03-04 16:44:44 UTC (rev 1568)
@@ -24,6 +24,8 @@
import org.infinispan.server.core.transport.ChannelBuffer;
+import java.io.UnsupportedEncodingException;
+
/**
* NettyChannelBuffer.
*
@@ -262,8 +264,27 @@
public void readBytes(byte[] dst) {
nettyBuffer.readBytes(dst);
}
-
-//
+
+ @Override
+ public byte[] readRangedBytes() {
+ byte[] array = new byte[readUnsignedInt()];
+ readBytes(array);
+ return array;
+ }
+
+ @Override
+ public String readString() {
+ String ret = null;
+ try {
+ ret = new String(readRangedBytes(), "UTF8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("Encoding not supported", e);
+ }
+ return ret;
+ }
+
+
+ //
// @Override
// public void readBytes(ByteBuffer dst) {
// nettyBuffer.readBytes(dst);
@@ -528,11 +549,19 @@
// nettyBuffer.writeBytes(src);
// }
//
- @Override
- public void writeBytes(byte[] src) {
- nettyBuffer.writeBytes(src);
- }
+ @Override
+ public void writeBytes(byte[] src) {
+ nettyBuffer.writeBytes(src);
+ }
+
+ @Override
+ public void writeRangedBytes(byte[] src) {
+ writeUnsignedInt(src.length);
+ nettyBuffer.writeBytes(src);
+ }
+
+
//
// @Override
// public void writeBytes(ByteBuffer src) {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -12,12 +12,22 @@
class CallerCache(val manager: CacheManager) extends Cache {
override def put(c: StorageCommand): Response = {
- val cache: InfinispanCache[Array[Byte], Array[Byte]] = manager.getCache(c.cacheName)
- cache.put(c.key, c.value)
+ val cache = getCache(c.cacheName)
+ cache.put(new Key(c.key), new Value(c.value))
new Response(OpCodes.PutResponse, c.id, Status.Success)
}
override def get(c: RetrievalCommand): Response = {
- null
+ val cache = getCache(c.cacheName)
+ val value = cache.get(new Key(c.key))
+ if (value != null)
+ new RetrievalResponse(OpCodes.GetResponse, c.id, Status.Success, value.v)
+ else
+ new RetrievalResponse(OpCodes.GetResponse, c.id, Status.KeyDoesNotExist, null)
}
+
+ private def getCache(cacheName: String): InfinispanCache[Key, Value] = {
+ // TODO: Detect __default cache and call simply getCache()
+ manager.getCache(cacheName)
+ }
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -2,11 +2,20 @@
/**
* // TODO: Document this
+ *
* @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
*/
-
-@Deprecated
-trait Command {
-// def perform(op: Unit => Replies)
-}
\ No newline at end of file
+abstract class Command(val cacheName: String,
+ val id: Long)
+// type AnyCommand <: Command
+//
+{
+ def perform(cache: Cache): Response
+}
+//
+//}
+////{
+////// type AnyCommand <: Command
+////// def perform(op: Unit => Replies)
+////}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -2,6 +2,7 @@
import org.infinispan.server.core.transport.{ExceptionEvent, ChannelHandlerContext, ChannelBuffer, Decoder}
import org.infinispan.server.core.UnknownCommandException
+import org.infinispan.server.hotrod.OpCodes._
/**
* // TODO: Document this
@@ -11,24 +12,28 @@
class Decoder410 extends Decoder[NoState] {
import Decoder410._
- val Put = 0x01
-
- override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): StorageCommand = {
- val op = buffer.readUnsignedByte
- val cacheName = readString(buffer)
+ override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): Command = {
+ val op = OpCodes.apply(buffer.readUnsignedByte)
+ val cacheName = buffer.readString
val id = buffer.readUnsignedLong
- val flags = Flags.extractFlags(buffer.readUnsignedInt)
- val command: StorageCommand =
- op match {
- case Put => {
- val key = readByteArray(buffer)
+ val flags = Flags.extract(buffer.readUnsignedInt)
+ val command: Command =
+ op match {
+ case PutRequest => {
+ val key = buffer.readRangedBytes
val lifespan = buffer.readUnsignedInt
val maxIdle = buffer.readUnsignedInt
- val value = readByteArray(buffer)
+ val value = buffer.readRangedBytes
new StorageCommand(cacheName, id, key, lifespan, maxIdle, value, flags)({
(cache: Cache, command: StorageCommand) => cache.put(command)
})
}
+ case GetRequest => {
+ val key = buffer.readRangedBytes
+ new RetrievalCommand(cacheName, id, key)({
+ (cache: Cache, command: RetrievalCommand) => cache.get(command)
+ })
+ }
case _ => throw new UnknownCommandException("Command " + op + " not known")
}
command
@@ -38,18 +43,6 @@
error("Error", e.getCause)
}
- private def readString(buffer: ChannelBuffer): String = {
- val array = new Array[Byte](buffer.readUnsignedInt)
- buffer.readBytes(array)
- new String(array, "UTF8")
- }
-
- private def readByteArray(buffer: ChannelBuffer): Array[Byte] = {
- val array = new Array[Byte](buffer.readUnsignedInt)
- buffer.readBytes(array)
- array
- }
-
}
object Decoder410 extends Logging
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -14,6 +14,7 @@
private val Magic = 0xA1
override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Object) = {
+ trace("Encode msg {0}", msg)
val buffer: ChannelBuffer =
msg match {
case r: Response => {
@@ -25,6 +26,13 @@
buffer
}
}
+ msg match {
+ case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
+ case _ => {
+ if (buffer == null)
+ throw new IllegalArgumentException("Response received is unknown: " + msg);
+ }
+ }
buffer
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -25,7 +25,7 @@
private val SkipRemoteLookup = Value(1 << 10, Flag.SKIP_REMOTE_LOOKUP.toString)
private val PutForExternalRead = Value(1 << 11, Flag.PUT_FOR_EXTERNAL_READ.toString)
- def extractFlags(bitFlags: Int): Set[Flag] = {
+ def extract(bitFlags: Int): Set[Flag] = {
val s = new HashSet[Flag]
Flags.values.filter(f => (bitFlags & f.id) > 0).foreach(f => s += Flag.valueOf(f.toString))
new immutable.HashSet ++ s
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -26,7 +26,9 @@
case Version410 => new Decoder410
case _ => throw new StreamCorruptedException("Unknown version:" + version)
}
- decoder.decode(ctx, buffer, state)
+ val command = decoder.decode(ctx, buffer, state)
+ trace("Decoded msg {0}", command)
+ command
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -13,7 +13,8 @@
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
e.getMessage match {
- case c: StorageCommand => e.getChannel.write(c.op(hotCache, c))
+// case c: StorageCommand => e.getChannel.write(c.op(hotCache, c))
+ case c: Command => e.getChannel.write(c.perform(hotCache))
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -11,5 +11,6 @@
val PutRequest = Value(0x01)
val PutResponse = Value(0x02)
-
+ val GetRequest = Value(0x03)
+ val GetResponse = Value(0x04)
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -8,4 +8,14 @@
class Response(val opCode: OpCodes.OpCode,
val id: Long,
- val status: Status.Status)
\ No newline at end of file
+ val status: Status.Status) {
+
+ override def toString = {
+ new StringBuilder().append("Response").append("{")
+ .append("opCode=").append(opCode)
+ .append(", id=").append(id)
+ .append(", status=").append(status)
+ .append("}").toString
+ }
+
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -6,4 +6,13 @@
* @since 4.1
*/
-class RetrievalCommand
\ No newline at end of file
+class RetrievalCommand(override val cacheName: String,
+ override val id: Long,
+ val key: Array[Byte])
+ (val op: (Cache, RetrievalCommand) => Response) extends Command(cacheName, id) {
+
+ override def perform(cache: Cache): Response = {
+ op(cache, this)
+ }
+
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -10,4 +10,5 @@
type Status = Value
val Success = Value(0x00)
+ val KeyDoesNotExist = Value(0x02)
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -5,22 +5,28 @@
/**
* // TODO: Document this
* @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
*/
+//class StorageCommand(val cacheName: String,
+// val id: Long,
+// val key: Array[Byte],
+// val lifespan: Int,
+// val maxIdle: Int,
+// val value: Array[Byte],
+// val flags: Set[Flag])
+// (val op: (Cache, StorageCommand) => Response)
-// val cache: Cache[Array[Byte], Array[Byte]]
-class StorageCommand(val cacheName: String,
- val id: Long,
+class StorageCommand(override val cacheName: String,
+ override val id: Long,
val key: Array[Byte],
val lifespan: Int,
val maxIdle: Int,
val value: Array[Byte],
val flags: Set[Flag])
- (val op: (Cache, StorageCommand) => Response)
-//{
-//
-//// def perform(op: StorageCommand => Replies.Value) {
-//// op(this)
-//// }
-////
-//}
\ No newline at end of file
+ (val op: (Cache, StorageCommand) => Response) extends Command(cacheName, id) {
+
+ override def perform(cache: Cache): Response = {
+ op(cache, this)
+ }
+
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -57,13 +57,13 @@
}
// private def flag(bitFlags: Int)(size: Int)(p: Set[Flags.Value] => Boolean) {
-// var flags = Flags.extractFlags(bitFlags)
+// var flags = Flags.extract(bitFlags)
// assert { flags.size == size }
// assert { true == p(flags) }
// }
private def flag(bitFlags: Int)(size: Int)(p: Set[Flag] => Boolean) {
- val flags = Flags.extractFlags(bitFlags)
+ val flags = Flags.extract(bitFlags)
assertEquals(flags.size, size)
assertTrue(p(flags))
}
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -7,6 +7,9 @@
import java.lang.reflect.Method
import test.{Client, Utils}
import org.testng.Assert._
+import org.infinispan.server.hotrod.Status._
+import java.util.Arrays
+import org.jboss.netty.channel.Channel
/**
* TODO: Document
@@ -24,22 +27,47 @@
@Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
class FunctionalTest extends SingleCacheManagerTest with Utils with Client {
private var server: HotRodServer = _
+ private var ch: Channel = _
override def createCacheManager: CacheManager = {
val cacheManager = TestCacheManagerFactory.createLocalCacheManager
server = createHotRodServer(cacheManager)
server.start
+ ch = connect("127.0.0.1", server.port)
cacheManager
}
def testPutBasic(m: Method) {
- val result = connect("127.0.0.1", server.port)
- assertTrue(result._1)
- val ch = result._2
val status = put(ch, "__default", k(m) , 0, 0, v(m))
- assertTrue(status == 0, "Status should have been 0 but instead was: " + status)
+ assertSuccess(status)
}
+ def testGetBasic(m: Method) {
+ val putSt = put(ch, "__default", k(m) , 0, 0, v(m))
+ assertSuccess(putSt)
+ val (getSt, actual) = get(ch, "__default", k(m))
+ assertSuccess(getSt, v(m), actual)
+ }
+
+ def testGetDoesNotExist(m: Method) {
+ val (getSt, actual) = get(ch, "__default", k(m))
+ assertKeyDoesNotExist(getSt, actual)
+ }
+
+ private def assertSuccess(status: Status.Status) {
+ assertTrue(status == Success, "Status should have been 'Success' but instead was: " + status)
+ }
+
+ private def assertSuccess(status: Status.Status, expected: Array[Byte], actual: Array[Byte]) {
+ assertSuccess(status)
+ assertTrue(Arrays.equals(expected, actual))
+ }
+
+ private def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]) {
+ assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
+ assertNull(actual)
+ }
+
@AfterClass(alwaysRun = true)
override def destroyAfterClass {
super.destroyAfterClass
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-04 16:44:44 UTC (rev 1568)
@@ -11,6 +11,8 @@
import org.testng.Assert._
import java.util.concurrent.{LinkedBlockingQueue, Executors}
import org.infinispan.server.hotrod._
+import org.infinispan.server.hotrod.OpCodes._
+import org.infinispan.server.hotrod.Status._
/**
* // TODO: Document this
@@ -22,7 +24,7 @@
*/
trait Client {
- def connect(host: String, port: Int) = {
+ def connect(host: String, port: Int): Channel = {
// Set up.
val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
val bootstrap: ClientBootstrap = new ClientBootstrap(factory)
@@ -35,17 +37,29 @@
val ch = connectFuture.awaitUninterruptibly.getChannel
// Ideally, I'd store channel as a var in this trait. However, this causes issues with TestNG, see:
// http://thread.gmane.org/gmane.comp.lang.scala.user/24317
- (connectFuture.isSuccess, ch)
+ assertTrue(connectFuture.isSuccess)
+ ch
}
- def put(ch: Channel, cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Byte = {
- val writeFuture = ch.write(new Store(cacheName, key, lifespan, maxIdle, value))
+ def put(ch: Channel, cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Status = {
+ val writeFuture = ch.write(new Op(0x01, cacheName, key, lifespan, maxIdle, value))
writeFuture.awaitUninterruptibly
assertTrue(writeFuture.isSuccess)
// Get the handler instance to retrieve the answer.
var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
- handler.getResponse.status.id.byteValue
+ handler.getResponse.status
}
+
+ def get(ch: Channel, cacheName: String, key: Array[Byte]) = {
+ val writeFuture = ch.write(new Op(0x03, cacheName, key, 0, 0, null))
+ writeFuture.awaitUninterruptibly
+ assertTrue(writeFuture.isSuccess)
+ // Get the handler instance to retrieve the answer.
+ var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+ val resp = handler.getResponse.asInstanceOf[RetrievalResponse]
+ (resp.status, resp.value)
+ }
+
}
@ChannelPipelineCoverage("all")
@@ -67,21 +81,20 @@
override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
val ret =
msg match {
- case s: Store => {
+ case op: Op => {
val buffer = new NettyChannelBuffer(ChannelBuffers.dynamicBuffer)
buffer.writeByte(0xA0.asInstanceOf[Byte]) // magic
buffer.writeByte(41) // version
- buffer.writeByte(0x01) // opcode - put
- buffer.writeUnsignedInt(s.cacheName.length) // cache name length
- buffer.writeBytes(s.cacheName.getBytes()) // cache name
+ buffer.writeByte(op.code) // opcode
+ buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
buffer.writeUnsignedLong(1) // message id
buffer.writeUnsignedInt(0) // flags
- buffer.writeUnsignedInt(s.key.length) // key length
- buffer.writeBytes(s.key) // key
- buffer.writeUnsignedInt(s.lifespan) // lifespan
- buffer.writeUnsignedInt(s.maxIdle) // maxIdle
- buffer.writeUnsignedInt(s.value.length) // value length
- buffer.writeBytes(s.value) // value
+ buffer.writeRangedBytes(op.key) // key length + key
+ if (op.value != null) {
+ buffer.writeUnsignedInt(op.lifespan) // lifespan
+ buffer.writeUnsignedInt(op.maxIdle) // maxIdle
+ buffer.writeRangedBytes(op.value) // value length + value
+ }
buffer.getUnderlyingChannelBuffer
}
}
@@ -95,10 +108,23 @@
override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
val buf = new NettyChannelBuffer(buffer)
val magic = buf.readUnsignedByte
- val opCode = buf.readUnsignedByte
+ val opCode = OpCodes.apply(buf.readUnsignedByte)
val id = buf.readUnsignedLong
- val status = buf.readUnsignedByte
- new Response(OpCodes.apply(opCode), id, Status.apply(status))
+ val status = Status.apply(buf.readUnsignedByte)
+ val resp: Response =
+ opCode match {
+ case PutResponse => new Response(opCode, id, status)
+ case GetResponse => {
+ val value = {
+ status match {
+ case Success => buf.readRangedBytes
+ case _ => null
+ }
+ }
+ new RetrievalResponse(opCode, id, status, value)
+ }
+ }
+ resp
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
@@ -122,6 +148,9 @@
}
-private class Store(val cacheName: String, val key: Array[Byte],
- val lifespan: Int, val maxIdle: Int,
- val value: Array[Byte])
\ No newline at end of file
+private class Op(val code: Byte,
+ val cacheName: String,
+ val key: Array[Byte],
+ val lifespan: Int,
+ val maxIdle: Int,
+ val value: Array[Byte])
\ No newline at end of file
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java 2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java 2010-03-04 16:44:44 UTC (rev 1568)
@@ -34,6 +34,7 @@
* @author Galder Zamarreño
* @since 4.1
*/
+// TODO: Make it an Externalizer once submodules can extend the marshalling framework
public class Value implements Externalizable {
private int flags;
private byte[] data;
More information about the infinispan-commits
mailing list