[infinispan-commits] Infinispan SVN: r1568 - in trunk/server: core/src/main/java/org/infinispan/server/core/transport/netty and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Mar 4 11:44:46 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-04 11:44:44 -0500 (Thu, 04 Mar 2010)
New Revision: 1568

Modified:
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Implemented basic get command and added a couple of tests. Also extended the ChannelBuffer interface so that it can read/write ranged byte arrays. Ranged means that the length of the array is read/written before reading/writing the actual byte array. The length is encoded in variable-length integer format.

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/ChannelBuffer.java	2010-03-04 16:44:44 UTC (rev 1568)
@@ -37,9 +37,20 @@
    ChannelBuffer readBytes(int length);
    int readerIndex();
    void readBytes(byte[] dst);
+   byte[] readRangedBytes();
 
+   /**
+    * Reads length of String and then returns an UTF-8 formatted String of such length.
+    */
+   String readString();
+
    void writeByte(byte value);
    void writeBytes(byte[] src);
+
+   /**
+    * Writes the length of the byte array and transfers the specified source array's data to this buffer
+    */
+   void writeRangedBytes(byte[] src);
    void writeUnsignedInt(int i);
    void writeUnsignedLong(long l);
    int writerIndex();

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelBuffer.java	2010-03-04 16:44:44 UTC (rev 1568)
@@ -24,6 +24,8 @@
 
 import org.infinispan.server.core.transport.ChannelBuffer;
 
+import java.io.UnsupportedEncodingException;
+
 /**
  * NettyChannelBuffer.
  * 
@@ -262,8 +264,27 @@
    public void readBytes(byte[] dst) {
       nettyBuffer.readBytes(dst);
    }
-   
-//
+
+   @Override
+   public byte[] readRangedBytes() {
+      byte[] array = new byte[readUnsignedInt()];
+      readBytes(array);
+      return array;
+   }
+
+   @Override
+   public String readString() {
+      String ret = null;
+      try {
+         ret = new String(readRangedBytes(), "UTF8");
+      } catch (UnsupportedEncodingException e) {
+         throw new RuntimeException("Encoding not supported", e);
+      }
+      return ret;
+   }
+
+
+   //
 //   @Override
 //   public void readBytes(ByteBuffer dst) {
 //      nettyBuffer.readBytes(dst);
@@ -528,11 +549,19 @@
 //      nettyBuffer.writeBytes(src);
 //   }
 //
-     @Override
-     public void writeBytes(byte[] src) {
-        nettyBuffer.writeBytes(src);
-     }
 
+   @Override
+   public void writeBytes(byte[] src) {
+      nettyBuffer.writeBytes(src);
+   }
+
+   @Override
+   public void writeRangedBytes(byte[] src) {
+      writeUnsignedInt(src.length);
+      nettyBuffer.writeBytes(src);
+   }
+
+
 //
 //   @Override
 //   public void writeBytes(ByteBuffer src) {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -12,12 +12,22 @@
 class CallerCache(val manager: CacheManager) extends Cache {
 
    override def put(c: StorageCommand): Response = {
-      val cache: InfinispanCache[Array[Byte], Array[Byte]] = manager.getCache(c.cacheName)
-      cache.put(c.key, c.value)
+      val cache = getCache(c.cacheName)
+      cache.put(new Key(c.key), new Value(c.value))
       new Response(OpCodes.PutResponse, c.id, Status.Success)
    }
 
    override def get(c: RetrievalCommand): Response = {
-      null
+      val cache = getCache(c.cacheName)
+      val value = cache.get(new Key(c.key))
+      if (value != null)
+         new RetrievalResponse(OpCodes.GetResponse, c.id, Status.Success, value.v)
+      else
+         new RetrievalResponse(OpCodes.GetResponse, c.id, Status.KeyDoesNotExist, null)
    }
+
+   private def getCache(cacheName: String): InfinispanCache[Key, Value] = {
+      // TODO: Detect __default cache and call simply getCache()
+      manager.getCache(cacheName)
+   }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -2,11 +2,20 @@
 
 /**
  * // TODO: Document this
+ * 
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
  */
-
-@Deprecated
-trait Command {
-//   def perform(op: Unit => Replies)
-}
\ No newline at end of file
+abstract class Command(val cacheName: String,
+                       val id: Long)
+//   type AnyCommand <: Command
+//
+{
+  def perform(cache: Cache): Response
+}
+//
+//}
+////{
+//////   type AnyCommand <: Command
+//////   def perform(op: Unit => Replies)
+////}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -2,6 +2,7 @@
 
 import org.infinispan.server.core.transport.{ExceptionEvent, ChannelHandlerContext, ChannelBuffer, Decoder}
 import org.infinispan.server.core.UnknownCommandException
+import org.infinispan.server.hotrod.OpCodes._
 
 /**
  * // TODO: Document this
@@ -11,24 +12,28 @@
 class Decoder410 extends Decoder[NoState] {
    import Decoder410._
 
-   val Put = 0x01
-
-   override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): StorageCommand = {
-      val op = buffer.readUnsignedByte
-      val cacheName = readString(buffer)
+   override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer, state: NoState): Command = {
+      val op = OpCodes.apply(buffer.readUnsignedByte)
+      val cacheName = buffer.readString
       val id = buffer.readUnsignedLong
-      val flags = Flags.extractFlags(buffer.readUnsignedInt)
-      val command: StorageCommand =
-         op match {
-            case Put => {
-               val key = readByteArray(buffer)
+      val flags = Flags.extract(buffer.readUnsignedInt)
+      val command: Command =
+         op match {                                   
+            case PutRequest => {
+               val key = buffer.readRangedBytes
                val lifespan = buffer.readUnsignedInt
                val maxIdle = buffer.readUnsignedInt
-               val value = readByteArray(buffer)
+               val value = buffer.readRangedBytes
                new StorageCommand(cacheName, id, key, lifespan, maxIdle, value, flags)({
                   (cache: Cache, command: StorageCommand) => cache.put(command)
                })
             }
+            case GetRequest => {
+               val key = buffer.readRangedBytes
+               new RetrievalCommand(cacheName, id, key)({
+                  (cache: Cache, command: RetrievalCommand) => cache.get(command)
+               })
+            }
             case _ => throw new UnknownCommandException("Command " + op + " not known")
          }
       command
@@ -38,18 +43,6 @@
       error("Error", e.getCause)
    }
 
-   private def readString(buffer: ChannelBuffer): String = {
-      val array = new Array[Byte](buffer.readUnsignedInt)
-      buffer.readBytes(array)
-      new String(array, "UTF8")
-   }
-
-   private def readByteArray(buffer: ChannelBuffer): Array[Byte] = {
-      val array = new Array[Byte](buffer.readUnsignedInt)
-      buffer.readBytes(array)
-      array
-   }
-
 }
 
 object Decoder410 extends Logging
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -14,6 +14,7 @@
    private val Magic = 0xA1
 
    override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Object) = {
+      trace("Encode msg {0}", msg)
       val buffer: ChannelBuffer =
          msg match {
             case r: Response => {
@@ -25,6 +26,13 @@
                buffer
             }
       }
+      msg match {
+         case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
+         case _ => {
+            if (buffer == null)
+               throw new IllegalArgumentException("Response received is unknown: " + msg);
+         }
+      }
       buffer
    }
    

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -25,7 +25,7 @@
    private val SkipRemoteLookup = Value(1 << 10, Flag.SKIP_REMOTE_LOOKUP.toString)
    private val PutForExternalRead = Value(1 << 11, Flag.PUT_FOR_EXTERNAL_READ.toString)
 
-   def extractFlags(bitFlags: Int): Set[Flag] = {
+   def extract(bitFlags: Int): Set[Flag] = {
       val s = new HashSet[Flag]
       Flags.values.filter(f => (bitFlags & f.id) > 0).foreach(f => s += Flag.valueOf(f.toString))
       new immutable.HashSet ++ s

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -26,7 +26,9 @@
             case Version410 => new Decoder410
             case _ => throw new StreamCorruptedException("Unknown version:" + version)
          }
-      decoder.decode(ctx, buffer, state)
+      val command = decoder.decode(ctx, buffer, state)
+      trace("Decoded msg {0}", command)
+      command
    }
 
    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Handler.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -13,7 +13,8 @@
 
    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
       e.getMessage match {
-         case c: StorageCommand => e.getChannel.write(c.op(hotCache, c))
+//         case c: StorageCommand => e.getChannel.write(c.op(hotCache, c))
+         case c: Command => e.getChannel.write(c.perform(hotCache))
       }
 
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -11,5 +11,6 @@
 
    val PutRequest = Value(0x01)
    val PutResponse = Value(0x02)
-
+   val GetRequest = Value(0x03)
+   val GetResponse = Value(0x04)
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -8,4 +8,14 @@
 
 class Response(val opCode: OpCodes.OpCode,
                val id: Long,
-               val status: Status.Status)
\ No newline at end of file
+               val status: Status.Status) {
+
+   override def toString = {
+      new StringBuilder().append("Response").append("{")
+         .append("opCode=").append(opCode)
+         .append(", id=").append(id)
+         .append(", status=").append(status)
+         .append("}").toString
+   }
+
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -6,4 +6,13 @@
  * @since 4.1
  */
 
-class RetrievalCommand
\ No newline at end of file
+class RetrievalCommand(override val cacheName: String,
+                       override val id: Long,
+                       val key: Array[Byte])
+                      (val op: (Cache, RetrievalCommand) => Response) extends Command(cacheName, id) {
+
+   override def perform(cache: Cache): Response = {
+      op(cache, this)
+   }
+
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -10,4 +10,5 @@
    type Status = Value
 
    val Success = Value(0x00)
+   val KeyDoesNotExist = Value(0x02)
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -5,22 +5,28 @@
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since 4.0
+ * @since 4.1
  */
+//class StorageCommand(val cacheName: String,
+//                     val id: Long,
+//                     val key: Array[Byte],
+//                     val lifespan: Int,
+//                     val maxIdle: Int,
+//                     val value: Array[Byte],
+//                     val flags: Set[Flag])
+//                    (val op: (Cache, StorageCommand) => Response)
 
-// val cache: Cache[Array[Byte], Array[Byte]]
-class StorageCommand(val cacheName: String,
-                     val id: Long,
+class StorageCommand(override val cacheName: String,
+                     override val id: Long,
                      val key: Array[Byte],
                      val lifespan: Int,
                      val maxIdle: Int,
                      val value: Array[Byte],
                      val flags: Set[Flag])
-                    (val op: (Cache, StorageCommand) => Response)
-//{
-//
-////   def perform(op: StorageCommand => Replies.Value) {
-////      op(this)
-////   }
-////
-//}
\ No newline at end of file
+                    (val op: (Cache, StorageCommand) => Response) extends Command(cacheName, id) {
+
+   override def perform(cache: Cache): Response = {
+      op(cache, this)
+   }
+
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -57,13 +57,13 @@
    }
 
 //   private def flag(bitFlags: Int)(size: Int)(p: Set[Flags.Value] => Boolean) {
-//      var flags = Flags.extractFlags(bitFlags)
+//      var flags = Flags.extract(bitFlags)
 //      assert { flags.size == size }
 //      assert { true == p(flags) }
 //   }
 
    private def flag(bitFlags: Int)(size: Int)(p: Set[Flag] => Boolean) {
-      val flags = Flags.extractFlags(bitFlags)
+      val flags = Flags.extract(bitFlags)
       assertEquals(flags.size, size)
       assertTrue(p(flags))
    }

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -7,6 +7,9 @@
 import java.lang.reflect.Method
 import test.{Client, Utils}
 import org.testng.Assert._
+import org.infinispan.server.hotrod.Status._
+import java.util.Arrays
+import org.jboss.netty.channel.Channel
 
 /**
  * TODO: Document
@@ -24,22 +27,47 @@
 @Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
 class FunctionalTest extends SingleCacheManagerTest with Utils with Client {
    private var server: HotRodServer = _
+   private var ch: Channel = _
 
    override def createCacheManager: CacheManager = {
       val cacheManager = TestCacheManagerFactory.createLocalCacheManager
       server = createHotRodServer(cacheManager)
       server.start
+      ch = connect("127.0.0.1", server.port)
       cacheManager
    }
 
    def testPutBasic(m: Method) {
-      val result = connect("127.0.0.1", server.port)
-      assertTrue(result._1)
-      val ch = result._2
       val status = put(ch, "__default", k(m) , 0, 0, v(m))
-      assertTrue(status == 0, "Status should have been 0 but instead was: " + status)
+      assertSuccess(status)
    }
 
+   def testGetBasic(m: Method) {
+      val putSt = put(ch, "__default", k(m) , 0, 0, v(m))
+      assertSuccess(putSt)
+      val (getSt, actual) = get(ch, "__default", k(m))
+      assertSuccess(getSt, v(m), actual)
+   }
+
+   def testGetDoesNotExist(m: Method) {
+      val (getSt, actual) = get(ch, "__default", k(m))
+      assertKeyDoesNotExist(getSt, actual)
+   }
+
+   private def assertSuccess(status: Status.Status) {
+      assertTrue(status == Success, "Status should have been 'Success' but instead was: " + status)
+   }
+
+   private def assertSuccess(status: Status.Status, expected: Array[Byte], actual: Array[Byte]) {
+      assertSuccess(status)
+      assertTrue(Arrays.equals(expected, actual))
+   }
+
+   private def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]) {
+      assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
+      assertNull(actual)
+   }
+
    @AfterClass(alwaysRun = true)
    override def destroyAfterClass {
       super.destroyAfterClass

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-04 16:44:44 UTC (rev 1568)
@@ -11,6 +11,8 @@
 import org.testng.Assert._
 import java.util.concurrent.{LinkedBlockingQueue, Executors}
 import org.infinispan.server.hotrod._
+import org.infinispan.server.hotrod.OpCodes._
+import org.infinispan.server.hotrod.Status._
 
 /**
  * // TODO: Document this
@@ -22,7 +24,7 @@
  */
 trait Client {
 
-   def connect(host: String, port: Int) = {
+   def connect(host: String, port: Int): Channel = {
       // Set up.
       val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
       val bootstrap: ClientBootstrap = new ClientBootstrap(factory)
@@ -35,17 +37,29 @@
       val ch = connectFuture.awaitUninterruptibly.getChannel
       // Ideally, I'd store channel as a var in this trait. However, this causes issues with TestNG, see:
       // http://thread.gmane.org/gmane.comp.lang.scala.user/24317
-      (connectFuture.isSuccess, ch)
+      assertTrue(connectFuture.isSuccess)
+      ch
    }
 
-   def put(ch: Channel, cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Byte = {
-      val writeFuture = ch.write(new Store(cacheName, key, lifespan, maxIdle, value))
+   def put(ch: Channel, cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Status = {
+      val writeFuture = ch.write(new Op(0x01, cacheName, key, lifespan, maxIdle, value))
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      handler.getResponse.status.id.byteValue
+      handler.getResponse.status
    }
+
+   def get(ch: Channel, cacheName: String, key: Array[Byte]) = {
+      val writeFuture = ch.write(new Op(0x03, cacheName, key, 0, 0, null))
+      writeFuture.awaitUninterruptibly
+      assertTrue(writeFuture.isSuccess)
+      // Get the handler instance to retrieve the answer.
+      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+      val resp = handler.getResponse.asInstanceOf[RetrievalResponse]
+      (resp.status, resp.value)
+   }
+
 }
 
 @ChannelPipelineCoverage("all")
@@ -67,21 +81,20 @@
    override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
       val ret =
          msg match {
-            case s: Store => {
+            case op: Op => {
                val buffer = new NettyChannelBuffer(ChannelBuffers.dynamicBuffer)
                buffer.writeByte(0xA0.asInstanceOf[Byte]) // magic
                buffer.writeByte(41) // version
-               buffer.writeByte(0x01) // opcode - put
-               buffer.writeUnsignedInt(s.cacheName.length) // cache name length
-               buffer.writeBytes(s.cacheName.getBytes()) // cache name
+               buffer.writeByte(op.code) // opcode
+               buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
                buffer.writeUnsignedLong(1) // message id
                buffer.writeUnsignedInt(0) // flags
-               buffer.writeUnsignedInt(s.key.length) // key length
-               buffer.writeBytes(s.key) // key
-               buffer.writeUnsignedInt(s.lifespan) // lifespan
-               buffer.writeUnsignedInt(s.maxIdle) // maxIdle
-               buffer.writeUnsignedInt(s.value.length) // value length
-               buffer.writeBytes(s.value) // value
+               buffer.writeRangedBytes(op.key) // key length + key
+               if (op.value != null) {
+                  buffer.writeUnsignedInt(op.lifespan) // lifespan
+                  buffer.writeUnsignedInt(op.maxIdle) // maxIdle
+                  buffer.writeRangedBytes(op.value) // value length + value
+               }
                buffer.getUnderlyingChannelBuffer
             }
       }
@@ -95,10 +108,23 @@
    override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
       val buf = new NettyChannelBuffer(buffer)
       val magic = buf.readUnsignedByte
-      val opCode = buf.readUnsignedByte
+      val opCode = OpCodes.apply(buf.readUnsignedByte)
       val id = buf.readUnsignedLong
-      val status = buf.readUnsignedByte
-      new Response(OpCodes.apply(opCode), id, Status.apply(status))
+      val status = Status.apply(buf.readUnsignedByte)
+      val resp: Response =
+         opCode match {
+            case PutResponse => new Response(opCode, id, status)
+            case GetResponse => {
+               val value = {
+                  status match {
+                     case Success => buf.readRangedBytes
+                     case _ => null
+                  }
+               }
+               new RetrievalResponse(opCode, id, status, value)
+            }
+         }
+      resp
    }
 
    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
@@ -122,6 +148,9 @@
 
 }
 
-private class Store(val cacheName: String, val key: Array[Byte],
-                    val lifespan: Int, val maxIdle: Int,
-                    val value: Array[Byte])
\ No newline at end of file
+private class Op(val code: Byte,
+                 val cacheName: String,
+                 val key: Array[Byte],
+                 val lifespan: Int,
+                 val maxIdle: Int,
+                 val value: Array[Byte])
\ No newline at end of file

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java	2010-03-04 11:49:13 UTC (rev 1567)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/commands/Value.java	2010-03-04 16:44:44 UTC (rev 1568)
@@ -34,6 +34,7 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
+// TODO: Make it an Externalizer once submodules can extend the marshalling framework
 public class Value implements Externalizable {
    private int flags;
    private byte[] data;



More information about the infinispan-commits mailing list