[infinispan-commits] Infinispan SVN: r1641 - in trunk/server: hotrod/src/main/scala/org/infinispan/server/hotrod and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Mar 30 09:38:38 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-30 09:38:37 -0400 (Tue, 30 Mar 2010)
New Revision: 1641

Modified:
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Implemented force return previous value flag.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -75,8 +75,8 @@
    private def put(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
       val p = params.get
       val v = createValue(header, p, generateVersion(cache))
-      cache.put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
-      createSuccessResponse(header, params)
+      val prev = cache.put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+      createSuccessResponse(header, params, prev)
    }
 
    private def putIfAbsent(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
@@ -87,9 +87,9 @@
          cache.putIfAbsent(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
       }
       if (prev == null)
-         createSuccessResponse(header, params)
+         createSuccessResponse(header, params, prev)
       else
-         createNotExecutedResponse(header, params)
+         createNotExecutedResponse(header, params, prev)
    }
 
    private def replace(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
@@ -100,9 +100,9 @@
          cache.replace(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
       }
       if (prev != null)
-         createSuccessResponse(header, params)
+         createSuccessResponse(header, params, prev)
       else
-         createNotExecutedResponse(header, params)
+         createNotExecutedResponse(header, params, prev)
    }
 
    private def replaceIfUmodified(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
@@ -114,11 +114,11 @@
             val v = createValue(header, p, generateVersion(cache))
             val replaced = cache.replace(k, prev, v);
             if (replaced)
-               createSuccessResponse(header, params)
+               createSuccessResponse(header, params, prev)
             else
-               createNotExecutedResponse(header, params)
+               createNotExecutedResponse(header, params, prev)
          } else {
-            createNotExecutedResponse(header, params)
+            createNotExecutedResponse(header, params, prev)
          }            
       } else createNotExistResponse(header, params)
    }
@@ -126,7 +126,7 @@
    private def remove(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
       val prev = cache.remove(k)
       if (prev != null)
-         createSuccessResponse(header, params)
+         createSuccessResponse(header, params, prev)
       else
          createNotExistResponse(header, params)
    }
@@ -171,9 +171,9 @@
 
    protected def createValue(h: SuitableHeader, p: SuitableParameters, nextVersion: Long): V
 
-   protected def createSuccessResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
+   protected def createSuccessResponse(h: SuitableHeader, params: Option[SuitableParameters], prev: V): AnyRef
 
-   protected def createNotExecutedResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
+   protected def createNotExecutedResponse(h: SuitableHeader, params: Option[SuitableParameters], prev: V): AnyRef
 
    protected def createNotExistResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
 

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -30,6 +30,8 @@
       decoder = getDecoder(cacheManager)
       decoder.start
       encoder = getEncoder
+      // TODO: add an IdleStateHandler so that idle connections are detected, this could help on malformed data
+      // TODO: ... requests such as when the length of data is bigger than the expected data itself.
       val nettyDecoder = if (decoder != null) new DecoderAdapter(decoder) else null
       val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
       val address =  new InetSocketAddress(host, port)

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -23,9 +23,9 @@
 
    def createValue(params: RequestParameters, nextVersion: Long): CacheValue
 
-   def createSuccessResponse(header: HotRodHeader): AnyRef
+   def createSuccessResponse(header: HotRodHeader, prev: CacheValue): AnyRef
 
-   def createNotExecutedResponse(header: HotRodHeader): AnyRef
+   def createNotExecutedResponse(header: HotRodHeader, prev: CacheValue): AnyRef
 
    def createNotExistResponse(header: HotRodHeader): AnyRef
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -21,6 +21,7 @@
    import RequestResolver._
    import ResponseResolver._
    import OperationResponse._
+   import ProtocolFlag._
    type SuitableHeader = HotRodHeader
 
    override def readHeader(buffer: ChannelBuffer, messageId: Long): HotRodHeader = {
@@ -66,15 +67,22 @@
    override def createValue(params: RequestParameters, nextVersion: Long): CacheValue =
       new CacheValue(params.data, nextVersion)
 
-   override def createSuccessResponse(header: HotRodHeader): AnyRef =
-      new Response(header.messageId, toResponse(header.op), Success)
+   override def createSuccessResponse(header: HotRodHeader, prev: CacheValue): AnyRef =
+      createResponse(header, toResponse(header.op), Success, prev)
 
-   override def createNotExecutedResponse(header: HotRodHeader): AnyRef =
-      new Response(header.messageId, toResponse(header.op), OperationNotExecuted)
+   override def createNotExecutedResponse(header: HotRodHeader, prev: CacheValue): AnyRef =
+      createResponse(header, toResponse(header.op), OperationNotExecuted, prev)
 
    override def createNotExistResponse(header: HotRodHeader): AnyRef =
-      new Response(header.messageId, toResponse(header.op), KeyDoesNotExist)   
+      createResponse(header, toResponse(header.op), KeyDoesNotExist, null)
 
+   private def createResponse(h: HotRodHeader, op: OperationResponse, st: OperationStatus, prev: CacheValue): AnyRef = {
+      if (h.flag == ForceReturnPreviousValue)
+         new ResponseWithPrevious(h.messageId, op, st, if (prev == null) None else Some(prev.data))
+      else
+         new Response(h.messageId, op, st)
+   }
+
    override def createGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef = {
       if (v != null && op == GetRequest)
          new GetResponse(messageId, GetResponse, Success, Some(v.data))
@@ -97,14 +105,18 @@
                if (prev.version == params.get.streamVersion) {
                   val removed = cache.remove(k, prev);
                   if (removed)
-                     new Response(messageId, RemoveIfUnmodifiedResponse, Success)
+                     // new Response(messageId, RemoveIfUnmodifiedResponse, Success)
+                     createResponse(header, RemoveIfUnmodifiedResponse, Success, prev)
                   else
-                     new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
+                     // new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
+                     createResponse(header, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
                } else {
-                  new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
+                  // new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
+                  createResponse(header, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
                }
             } else {
-               new Response(messageId, RemoveIfUnmodifiedResponse, KeyDoesNotExist)
+               // new Response(messageId, RemoveIfUnmodifiedResponse, KeyDoesNotExist)
+               createResponse(header, RemoveIfUnmodifiedResponse, KeyDoesNotExist, prev)
             }
          }
          case ContainsKeyRequest => {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -81,11 +81,11 @@
    override def createValue(h: HotRodHeader, p: RequestParameters, nextVersion: Long): CacheValue =
       h.decoder.createValue(p, nextVersion)
 
-   override def createSuccessResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
-      h.decoder.createSuccessResponse(h)
+   override def createSuccessResponse(h: HotRodHeader, p: Option[RequestParameters], prev: CacheValue): AnyRef =
+      h.decoder.createSuccessResponse(h, prev)
 
-   override def createNotExecutedResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
-      h.decoder.createNotExecutedResponse(h)
+   override def createNotExecutedResponse(h: HotRodHeader, p: Option[RequestParameters], prev: CacheValue): AnyRef =
+      h.decoder.createNotExecutedResponse(h, prev)
 
    override def createNotExistResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
       h.decoder.createNotExistResponse(h)

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -20,6 +20,12 @@
          case r: Response => writeHeader(r)
       }
       msg match {
+         case r: ResponseWithPrevious => {
+            if (r.previous == None)
+               buffer.writeUnsignedInt(0)
+            else
+               buffer.writeRangedBytes(r.previous.get)
+         }
          case s: StatsResponse => {
             buffer.writeUnsignedInt(s.stats.size)
             for ((key, value) <- s.stats) {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -19,6 +19,19 @@
    }
 }
 
+class ResponseWithPrevious(override val messageId: Long, override val operation: OperationResponse,
+                                override val status: OperationStatus, val previous: Option[Array[Byte]])
+      extends Response(messageId, operation, status) {
+   override def toString = {
+      new StringBuilder().append("ResponseWithPrevious").append("{")
+         .append("messageId=").append(messageId)
+         .append(", operation=").append(operation)
+         .append(", status=").append(status)
+         .append(", previous=").append(if (previous == None) "null" else Util.printArray(previous.get, true))
+         .append("}").toString
+   }
+}
+
 class GetResponse(override val messageId: Long, override val operation: OperationResponse,
                   override val status: OperationStatus, val data: Option[Array[Byte]])
       extends Response(messageId, operation, status) {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -50,27 +50,26 @@
    }
 
    def testUnknownCommand(m: Method) {
-      val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0, 0)
+      val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0)
       assertTrue(status == UnknownOperation,
          "Status should have been 'UnknownOperation' but instead was: " + status)
    }
 
    def testUnknownMagic(m: Method) {
       client.assertPut(m) // Do a put to make sure decoder gets back to reading properly
-      val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0, 0)
+      val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0)
       assertTrue(status == InvalidMagicOrMsgId,
          "Status should have been 'InvalidMagicOrMsgId' but instead was: " + status)
    }
 
    // todo: test other error conditions such as invalid version...etc
-   // todo: add test for force return value operation
 
    def testPutBasic(m: Method) {
       client.assertPut(m)
    }
 
    def testPutOnDefaultCache(m: Method) {
-      val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0, 0)
+      val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0)
       assertStatus(status, Success)
       val cache = cacheManager.getCache[CacheKey, CacheValue]
       val value = cache.get(new CacheKey(k(m)))
@@ -91,6 +90,13 @@
       assertKeyDoesNotExist(getSt, actual)
    }
 
+   def testPutWithPreviousValue(m: Method) {
+      val (status, previous) = client.put(k(m) , 0, 0, v(m), 1)
+      assertSuccess(status, Array(), previous)
+      val (status2, previous2) = client.put(k(m) , 0, 0, v(m, "v2-"), 1)
+      assertSuccess(status2, v(m), previous2)
+   }
+
    def testGetBasic(m: Method) {
       client.assertPut(m)
       val (getSt, actual) = client.assertGet(m)
@@ -129,6 +135,14 @@
       assertKeyDoesNotExist(getSt, actual)
    }
 
+   def testPutIfAbsentWithPreviousValue(m: Method) {
+      val (status, previous) = client.putIfAbsent(k(m) , 0, 0, v(m), 1)
+      assertSuccess(status, Array(), previous)
+      val (status2, previous2) = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"), 1)
+      assertStatus(status2, OperationNotExecuted)
+      assertTrue(Arrays.equals(v(m), previous2))
+   }
+
    def testReplaceBasic(m: Method) {
       client.assertPut(m)
       val status = client.replace(k(m), 0, 0, v(m, "v1-"))
@@ -160,6 +174,16 @@
       assertKeyDoesNotExist(getSt, actual)
    }
 
+   def testReplaceWithPreviousValue(m: Method) {
+      val (status, previous) = client.replace(k(m) , 0, 0, v(m), 1)
+      assertStatus(status, OperationNotExecuted)
+      assertEquals(previous.length, 0)
+      val (status2, previous2) = client.put(k(m) , 0, 0, v(m, "v2-"), 1)
+      assertSuccess(status2, Array(), previous2)
+      val (status3, previous3) = client.replace(k(m) , 0, 0, v(m, "v3-"), 1)
+      assertSuccess(status3, v(m, "v2-"), previous3)
+   }
+
    def testGetWithVersionBasic(m: Method) {
       client.assertPut(m)
       val (getSt, actual, version) = client.getWithVersion(k(m), 0)
@@ -208,19 +232,44 @@
       assertStatus(status, Success)
    }
 
+   def testReplaceIfUnmodifiedWithPreviousValue(m: Method) {
+      val (status, previous) = client.replaceIfUnmodified(k(m) , 0, 0, v(m), 999, 1)
+      assertStatus(status, KeyDoesNotExist)
+      assertEquals(previous.length, 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
+      assertSuccess(getSt, v(m), actual)
+      assertTrue(version != 0)
+      val (status2, previous2)  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1)
+      assertStatus(status2, OperationNotExecuted)
+      assertTrue(Arrays.equals(v(m), previous2))
+      val (status3, previous3)  = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v3-"), version, 1)
+      assertStatus(status3, Success)
+      assertTrue(Arrays.equals(v(m), previous3))
+   }
+
    def testRemoveBasic(m: Method) {
       client.assertPut(m)
-      val status = client.remove(k(m), 0)
+      val status = client.remove(k(m))
       assertStatus(status, Success)
       val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testRemoveDoesNotExist(m: Method) {
-      val status = client.remove(k(m), 0)
+      val status = client.remove(k(m))
       assertStatus(status, KeyDoesNotExist)
    }
 
+   def testRemoveWithPreviousValue(m: Method) {
+      val (status, previous) = client.remove(k(m), 1)
+      assertStatus(status, KeyDoesNotExist)
+      assertEquals(previous.length, 0)
+      client.assertPut(m)
+      val (status2, previous2) = client.remove(k(m), 1)
+      assertSuccess(status2, v(m), previous2)
+   }
+
    def testRemoveIfUnmodifiedBasic(m: Method) {
       client.assertPut(m)
       val (getSt, actual, version) = client.getWithVersion(k(m), 0)
@@ -260,6 +309,22 @@
       assertStatus(status, Success)
    }
 
+   def testRemoveIfUmodifiedWithPreviousValue(m: Method) {
+      val (status, previous) = client.removeIfUnmodified(k(m) , 0, 0, v(m), 999, 1)
+      assertStatus(status, KeyDoesNotExist)
+      assertEquals(previous.length, 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
+      assertSuccess(getSt, v(m), actual)
+      assertTrue(version != 0)
+      val (status2, previous2)  = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1)
+      assertStatus(status2, OperationNotExecuted)
+      assertTrue(Arrays.equals(v(m), previous2))
+      val (status3, previous3)  = client.removeIfUnmodified(k(m), 0, 0, v(m, "v3-"), version, 1)
+      assertStatus(status3, Success)
+      assertTrue(Arrays.equals(v(m), previous3))
+   }
+
    def testContainsKeyBasic(m: Method) {
       client.assertPut(m)
       val status = client.containsKey(k(m), 0)

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -53,7 +53,7 @@
    def stop = ch.disconnect
 
    def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
-      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0)
 
    def assertPut(m: Method) {
       val status = put(k(m) , 0, 0, v(m))
@@ -65,39 +65,71 @@
       assertStatus(status, Success)
    }
 
-   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): OperationStatus =
-      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, flags, 0)
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
 
    def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
-      execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+      execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0)
 
+   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+      execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
+   
    def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
-      execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+      execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0)
 
+   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+      execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)   
+
    def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
-      execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, 0, version)
+      execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version)
 
+   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) =
+      execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
+
+   def remove(k: Array[Byte]): OperationStatus =
+      execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0)
+
+   def remove(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+      execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, flags)
+
    def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
-      execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, 0, version)
+      execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version)
 
+   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) =
+      execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
+
    def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-               v: Array[Byte], flags: Int, version: Long): OperationStatus = {
-      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version)
-      val writeFuture = ch.write(op)
-      writeFuture.awaitUninterruptibly
-      assertTrue(writeFuture.isSuccess)
-      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      handler.getResponse(op.id).status
+               v: Array[Byte], version: Long): OperationStatus = {
+      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version)
+      execute(op, op.id)._1
    }
 
    def executeWithBadMagic(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-                           v: Array[Byte], flags: Int, version: Long): OperationStatus = {
+                           v: Array[Byte], version: Long): OperationStatus = {
+      val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version)
+      execute(op, 0)._1
+   }
+
+   def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
+               v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) = {
       val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version)
+      execute(op, op.id)
+   }
+
+   private def execute(op: Op, expectedResponseMessageId: Long): (OperationStatus, Array[Byte]) = {
       val writeFuture = ch.write(op)
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      handler.getResponse(0).status
+      if (op.flags == 1) {
+         val respWithPrevious = handler.getResponse(expectedResponseMessageId).asInstanceOf[ResponseWithPrevious]
+         if (respWithPrevious.previous == None)
+            (respWithPrevious.status, Array())
+         else
+            (respWithPrevious.status, respWithPrevious.previous.get)
+      } else {
+         (handler.getResponse(expectedResponseMessageId).status, null)
+      }       
    }
 
    def get(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
@@ -117,10 +149,7 @@
    def getWithVersion(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) =
       get(0x11, k, 0)
 
-   def remove(k: Array[Byte], flags: Int): OperationStatus =
-      execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, 0)
-
-   def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
+   private def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
       val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0)
       val writeFuture = ch.write(op)
       writeFuture.awaitUninterruptibly
@@ -140,7 +169,7 @@
       }
    }
 
-   def clear: OperationStatus = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0, 0)
+   def clear: OperationStatus = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0)
 
    def stats: Map[String, String] = {
       val op = new StatsOp(0xA0, 0x15, defaultCacheName, null)
@@ -153,7 +182,7 @@
       resp.stats
    }
 
-   def ping: OperationStatus = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, 0)
+   def ping: OperationStatus = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0)
 
 }
 
@@ -172,11 +201,13 @@
 
 @Sharable
 private object Encoder extends OneToOneEncoder {
+   val idToOp = new ConcurrentHashMap[Long, Op] 
 
    override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
       val ret =
          msg match {
             case op: Op => {
+               idToOp.put(op.id, op)
                val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
                buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
                buffer.writeUnsignedLong(op.id) // message id
@@ -214,6 +245,7 @@
 }
 
 private object Decoder extends ReplayingDecoder[NoState] with Logging {
+   import Encoder._
 
    override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
       val buf = new ChannelBufferAdapter(buffer)
@@ -233,8 +265,20 @@
                new StatsResponse(id, immutable.Map[String, String]() ++ stats)
             }
             case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
-                 | RemoveResponse | RemoveIfUnmodifiedResponse | ContainsKeyResponse | ClearResponse | PingResponse =>
-               new Response(id, opCode, status)
+                 | RemoveResponse | RemoveIfUnmodifiedResponse => {
+               val op = idToOp.get(id)
+               if (op.flags == 1) {
+                  val length = buf.readUnsignedInt
+                  if (length == 0) {
+                     new ResponseWithPrevious(id, opCode, status, None)
+                  } else {
+                     val previous = new Array[Byte](length)
+                     buf.readBytes(previous)
+                     new ResponseWithPrevious(id, opCode, status, Some(previous))
+                  }
+               } else new Response(id, opCode, status)
+            }
+            case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status)
             case GetWithVersionResponse  => {
                if (status == Success) {
                   val version = buf.readLong

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-30 13:38:37 UTC (rev 1641)
@@ -220,7 +220,7 @@
       }
    }
 
-   override def createSuccessResponse(h: RequestHeader, params: Option[MemcachedParameters]): AnyRef = {
+   override def createSuccessResponse(h: RequestHeader, params: Option[MemcachedParameters], prev: MemcachedValue): AnyRef = {
       if (isStatsEnabled) {
          h.op match {
             case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedHits.incrementAndGet
@@ -235,7 +235,7 @@
       } else null
    }
 
-   override def createNotExecutedResponse(h: RequestHeader, params: Option[MemcachedParameters]): AnyRef = {
+   override def createNotExecutedResponse(h: RequestHeader, params: Option[MemcachedParameters], prev: MemcachedValue): AnyRef = {
       if (isStatsEnabled) {
          h.op match {
             case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedBadval.incrementAndGet



More information about the infinispan-commits mailing list