[infinispan-commits] Infinispan SVN: r1641 - in trunk/server: hotrod/src/main/scala/org/infinispan/server/hotrod and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Mar 30 09:38:38 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-03-30 09:38:37 -0400 (Tue, 30 Mar 2010)
New Revision: 1641
Modified:
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Implemented force return previous value flag.
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -75,8 +75,8 @@
private def put(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
val p = params.get
val v = createValue(header, p, generateVersion(cache))
- cache.put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
- createSuccessResponse(header, params)
+ val prev = cache.put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+ createSuccessResponse(header, params, prev)
}
private def putIfAbsent(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
@@ -87,9 +87,9 @@
cache.putIfAbsent(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
}
if (prev == null)
- createSuccessResponse(header, params)
+ createSuccessResponse(header, params, prev)
else
- createNotExecutedResponse(header, params)
+ createNotExecutedResponse(header, params, prev)
}
private def replace(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
@@ -100,9 +100,9 @@
cache.replace(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
}
if (prev != null)
- createSuccessResponse(header, params)
+ createSuccessResponse(header, params, prev)
else
- createNotExecutedResponse(header, params)
+ createNotExecutedResponse(header, params, prev)
}
private def replaceIfUmodified(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
@@ -114,11 +114,11 @@
val v = createValue(header, p, generateVersion(cache))
val replaced = cache.replace(k, prev, v);
if (replaced)
- createSuccessResponse(header, params)
+ createSuccessResponse(header, params, prev)
else
- createNotExecutedResponse(header, params)
+ createNotExecutedResponse(header, params, prev)
} else {
- createNotExecutedResponse(header, params)
+ createNotExecutedResponse(header, params, prev)
}
} else createNotExistResponse(header, params)
}
@@ -126,7 +126,7 @@
private def remove(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
val prev = cache.remove(k)
if (prev != null)
- createSuccessResponse(header, params)
+ createSuccessResponse(header, params, prev)
else
createNotExistResponse(header, params)
}
@@ -171,9 +171,9 @@
protected def createValue(h: SuitableHeader, p: SuitableParameters, nextVersion: Long): V
- protected def createSuccessResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
+ protected def createSuccessResponse(h: SuitableHeader, params: Option[SuitableParameters], prev: V): AnyRef
- protected def createNotExecutedResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
+ protected def createNotExecutedResponse(h: SuitableHeader, params: Option[SuitableParameters], prev: V): AnyRef
protected def createNotExistResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -30,6 +30,8 @@
decoder = getDecoder(cacheManager)
decoder.start
encoder = getEncoder
+ // TODO: add an IdleStateHandler so that idle connections are detected, this could help on malformed data
+ // TODO: ... requests such as when the length of data is bigger than the expected data itself.
val nettyDecoder = if (decoder != null) new DecoderAdapter(decoder) else null
val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
val address = new InetSocketAddress(host, port)
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -23,9 +23,9 @@
def createValue(params: RequestParameters, nextVersion: Long): CacheValue
- def createSuccessResponse(header: HotRodHeader): AnyRef
+ def createSuccessResponse(header: HotRodHeader, prev: CacheValue): AnyRef
- def createNotExecutedResponse(header: HotRodHeader): AnyRef
+ def createNotExecutedResponse(header: HotRodHeader, prev: CacheValue): AnyRef
def createNotExistResponse(header: HotRodHeader): AnyRef
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -21,6 +21,7 @@
import RequestResolver._
import ResponseResolver._
import OperationResponse._
+ import ProtocolFlag._
type SuitableHeader = HotRodHeader
override def readHeader(buffer: ChannelBuffer, messageId: Long): HotRodHeader = {
@@ -66,15 +67,22 @@
override def createValue(params: RequestParameters, nextVersion: Long): CacheValue =
new CacheValue(params.data, nextVersion)
- override def createSuccessResponse(header: HotRodHeader): AnyRef =
- new Response(header.messageId, toResponse(header.op), Success)
+ override def createSuccessResponse(header: HotRodHeader, prev: CacheValue): AnyRef =
+ createResponse(header, toResponse(header.op), Success, prev)
- override def createNotExecutedResponse(header: HotRodHeader): AnyRef =
- new Response(header.messageId, toResponse(header.op), OperationNotExecuted)
+ override def createNotExecutedResponse(header: HotRodHeader, prev: CacheValue): AnyRef =
+ createResponse(header, toResponse(header.op), OperationNotExecuted, prev)
override def createNotExistResponse(header: HotRodHeader): AnyRef =
- new Response(header.messageId, toResponse(header.op), KeyDoesNotExist)
+ createResponse(header, toResponse(header.op), KeyDoesNotExist, null)
+ private def createResponse(h: HotRodHeader, op: OperationResponse, st: OperationStatus, prev: CacheValue): AnyRef = {
+ if (h.flag == ForceReturnPreviousValue)
+ new ResponseWithPrevious(h.messageId, op, st, if (prev == null) None else Some(prev.data))
+ else
+ new Response(h.messageId, op, st)
+ }
+
override def createGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef = {
if (v != null && op == GetRequest)
new GetResponse(messageId, GetResponse, Success, Some(v.data))
@@ -97,14 +105,18 @@
if (prev.version == params.get.streamVersion) {
val removed = cache.remove(k, prev);
if (removed)
- new Response(messageId, RemoveIfUnmodifiedResponse, Success)
+ // new Response(messageId, RemoveIfUnmodifiedResponse, Success)
+ createResponse(header, RemoveIfUnmodifiedResponse, Success, prev)
else
- new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
+ // new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
+ createResponse(header, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
} else {
- new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
+ // new Response(messageId, RemoveIfUnmodifiedResponse, OperationNotExecuted)
+ createResponse(header, RemoveIfUnmodifiedResponse, OperationNotExecuted, prev)
}
} else {
- new Response(messageId, RemoveIfUnmodifiedResponse, KeyDoesNotExist)
+ // new Response(messageId, RemoveIfUnmodifiedResponse, KeyDoesNotExist)
+ createResponse(header, RemoveIfUnmodifiedResponse, KeyDoesNotExist, prev)
}
}
case ContainsKeyRequest => {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -81,11 +81,11 @@
override def createValue(h: HotRodHeader, p: RequestParameters, nextVersion: Long): CacheValue =
h.decoder.createValue(p, nextVersion)
- override def createSuccessResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
- h.decoder.createSuccessResponse(h)
+ override def createSuccessResponse(h: HotRodHeader, p: Option[RequestParameters], prev: CacheValue): AnyRef =
+ h.decoder.createSuccessResponse(h, prev)
- override def createNotExecutedResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
- h.decoder.createNotExecutedResponse(h)
+ override def createNotExecutedResponse(h: HotRodHeader, p: Option[RequestParameters], prev: CacheValue): AnyRef =
+ h.decoder.createNotExecutedResponse(h, prev)
override def createNotExistResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
h.decoder.createNotExistResponse(h)
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -20,6 +20,12 @@
case r: Response => writeHeader(r)
}
msg match {
+ case r: ResponseWithPrevious => {
+ if (r.previous == None)
+ buffer.writeUnsignedInt(0)
+ else
+ buffer.writeRangedBytes(r.previous.get)
+ }
case s: StatsResponse => {
buffer.writeUnsignedInt(s.stats.size)
for ((key, value) <- s.stats) {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -19,6 +19,19 @@
}
}
+class ResponseWithPrevious(override val messageId: Long, override val operation: OperationResponse,
+ override val status: OperationStatus, val previous: Option[Array[Byte]])
+ extends Response(messageId, operation, status) {
+ override def toString = {
+ new StringBuilder().append("ResponseWithPrevious").append("{")
+ .append("messageId=").append(messageId)
+ .append(", operation=").append(operation)
+ .append(", status=").append(status)
+ .append(", previous=").append(if (previous == None) "null" else Util.printArray(previous.get, true))
+ .append("}").toString
+ }
+}
+
class GetResponse(override val messageId: Long, override val operation: OperationResponse,
override val status: OperationStatus, val data: Option[Array[Byte]])
extends Response(messageId, operation, status) {
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -50,27 +50,26 @@
}
def testUnknownCommand(m: Method) {
- val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0, 0)
+ val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0)
assertTrue(status == UnknownOperation,
"Status should have been 'UnknownOperation' but instead was: " + status)
}
def testUnknownMagic(m: Method) {
client.assertPut(m) // Do a put to make sure decoder gets back to reading properly
- val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0, 0)
+ val status = client.executeWithBadMagic(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0)
assertTrue(status == InvalidMagicOrMsgId,
"Status should have been 'InvalidMagicOrMsgId' but instead was: " + status)
}
// todo: test other error conditions such as invalid version...etc
- // todo: add test for force return value operation
def testPutBasic(m: Method) {
client.assertPut(m)
}
def testPutOnDefaultCache(m: Method) {
- val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0, 0)
+ val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0)
assertStatus(status, Success)
val cache = cacheManager.getCache[CacheKey, CacheValue]
val value = cache.get(new CacheKey(k(m)))
@@ -91,6 +90,13 @@
assertKeyDoesNotExist(getSt, actual)
}
+ def testPutWithPreviousValue(m: Method) {
+ val (status, previous) = client.put(k(m) , 0, 0, v(m), 1)
+ assertSuccess(status, Array(), previous)
+ val (status2, previous2) = client.put(k(m) , 0, 0, v(m, "v2-"), 1)
+ assertSuccess(status2, v(m), previous2)
+ }
+
def testGetBasic(m: Method) {
client.assertPut(m)
val (getSt, actual) = client.assertGet(m)
@@ -129,6 +135,14 @@
assertKeyDoesNotExist(getSt, actual)
}
+ def testPutIfAbsentWithPreviousValue(m: Method) {
+ val (status, previous) = client.putIfAbsent(k(m) , 0, 0, v(m), 1)
+ assertSuccess(status, Array(), previous)
+ val (status2, previous2) = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"), 1)
+ assertStatus(status2, OperationNotExecuted)
+ assertTrue(Arrays.equals(v(m), previous2))
+ }
+
def testReplaceBasic(m: Method) {
client.assertPut(m)
val status = client.replace(k(m), 0, 0, v(m, "v1-"))
@@ -160,6 +174,16 @@
assertKeyDoesNotExist(getSt, actual)
}
+ def testReplaceWithPreviousValue(m: Method) {
+ val (status, previous) = client.replace(k(m) , 0, 0, v(m), 1)
+ assertStatus(status, OperationNotExecuted)
+ assertEquals(previous.length, 0)
+ val (status2, previous2) = client.put(k(m) , 0, 0, v(m, "v2-"), 1)
+ assertSuccess(status2, Array(), previous2)
+ val (status3, previous3) = client.replace(k(m) , 0, 0, v(m, "v3-"), 1)
+ assertSuccess(status3, v(m, "v2-"), previous3)
+ }
+
def testGetWithVersionBasic(m: Method) {
client.assertPut(m)
val (getSt, actual, version) = client.getWithVersion(k(m), 0)
@@ -208,19 +232,44 @@
assertStatus(status, Success)
}
+ def testReplaceIfUnmodifiedWithPreviousValue(m: Method) {
+ val (status, previous) = client.replaceIfUnmodified(k(m) , 0, 0, v(m), 999, 1)
+ assertStatus(status, KeyDoesNotExist)
+ assertEquals(previous.length, 0)
+ client.assertPut(m)
+ val (getSt, actual, version) = client.getWithVersion(k(m), 0)
+ assertSuccess(getSt, v(m), actual)
+ assertTrue(version != 0)
+ val (status2, previous2) = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1)
+ assertStatus(status2, OperationNotExecuted)
+ assertTrue(Arrays.equals(v(m), previous2))
+ val (status3, previous3) = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v3-"), version, 1)
+ assertStatus(status3, Success)
+ assertTrue(Arrays.equals(v(m), previous3))
+ }
+
def testRemoveBasic(m: Method) {
client.assertPut(m)
- val status = client.remove(k(m), 0)
+ val status = client.remove(k(m))
assertStatus(status, Success)
val (getSt, actual) = client.assertGet(m)
assertKeyDoesNotExist(getSt, actual)
}
def testRemoveDoesNotExist(m: Method) {
- val status = client.remove(k(m), 0)
+ val status = client.remove(k(m))
assertStatus(status, KeyDoesNotExist)
}
+ def testRemoveWithPreviousValue(m: Method) {
+ val (status, previous) = client.remove(k(m), 1)
+ assertStatus(status, KeyDoesNotExist)
+ assertEquals(previous.length, 0)
+ client.assertPut(m)
+ val (status2, previous2) = client.remove(k(m), 1)
+ assertSuccess(status2, v(m), previous2)
+ }
+
def testRemoveIfUnmodifiedBasic(m: Method) {
client.assertPut(m)
val (getSt, actual, version) = client.getWithVersion(k(m), 0)
@@ -260,6 +309,22 @@
assertStatus(status, Success)
}
+ def testRemoveIfUmodifiedWithPreviousValue(m: Method) {
+ val (status, previous) = client.removeIfUnmodified(k(m) , 0, 0, v(m), 999, 1)
+ assertStatus(status, KeyDoesNotExist)
+ assertEquals(previous.length, 0)
+ client.assertPut(m)
+ val (getSt, actual, version) = client.getWithVersion(k(m), 0)
+ assertSuccess(getSt, v(m), actual)
+ assertTrue(version != 0)
+ val (status2, previous2) = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), 888, 1)
+ assertStatus(status2, OperationNotExecuted)
+ assertTrue(Arrays.equals(v(m), previous2))
+ val (status3, previous3) = client.removeIfUnmodified(k(m), 0, 0, v(m, "v3-"), version, 1)
+ assertStatus(status3, Success)
+ assertTrue(Arrays.equals(v(m), previous3))
+ }
+
def testContainsKeyBasic(m: Method) {
client.assertPut(m)
val status = client.containsKey(k(m), 0)
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -53,7 +53,7 @@
def stop = ch.disconnect
def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
- execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+ execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0)
def assertPut(m: Method) {
val status = put(k(m) , 0, 0, v(m))
@@ -65,39 +65,71 @@
assertStatus(status, Success)
}
- def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): OperationStatus =
- execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, flags, 0)
+ def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+ execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
- execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+ execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0)
+ def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+ execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
+
def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
- execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+ execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0)
+ def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+ execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
+
def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
- execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, 0, version)
+ execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version)
+ def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) =
+ execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
+
+ def remove(k: Array[Byte]): OperationStatus =
+ execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0)
+
+ def remove(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) =
+ execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, flags)
+
def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
- execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, 0, version)
+ execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version)
+ def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) =
+ execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, version, flags)
+
def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
- v: Array[Byte], flags: Int, version: Long): OperationStatus = {
- val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version)
- val writeFuture = ch.write(op)
- writeFuture.awaitUninterruptibly
- assertTrue(writeFuture.isSuccess)
- var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
- handler.getResponse(op.id).status
+ v: Array[Byte], version: Long): OperationStatus = {
+ val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version)
+ execute(op, op.id)._1
}
def executeWithBadMagic(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
- v: Array[Byte], flags: Int, version: Long): OperationStatus = {
+ v: Array[Byte], version: Long): OperationStatus = {
+ val op = new Op(magic, code, name, k, lifespan, maxIdle, v, 0, version)
+ execute(op, 0)._1
+ }
+
+ def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
+ v: Array[Byte], version: Long, flags: Int): (OperationStatus, Array[Byte]) = {
val op = new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version)
+ execute(op, op.id)
+ }
+
+ private def execute(op: Op, expectedResponseMessageId: Long): (OperationStatus, Array[Byte]) = {
val writeFuture = ch.write(op)
writeFuture.awaitUninterruptibly
assertTrue(writeFuture.isSuccess)
var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
- handler.getResponse(0).status
+ if (op.flags == 1) {
+ val respWithPrevious = handler.getResponse(expectedResponseMessageId).asInstanceOf[ResponseWithPrevious]
+ if (respWithPrevious.previous == None)
+ (respWithPrevious.status, Array())
+ else
+ (respWithPrevious.status, respWithPrevious.previous.get)
+ } else {
+ (handler.getResponse(expectedResponseMessageId).status, null)
+ }
}
def get(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
@@ -117,10 +149,7 @@
def getWithVersion(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) =
get(0x11, k, 0)
- def remove(k: Array[Byte], flags: Int): OperationStatus =
- execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, 0)
-
- def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
+ private def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0)
val writeFuture = ch.write(op)
writeFuture.awaitUninterruptibly
@@ -140,7 +169,7 @@
}
}
- def clear: OperationStatus = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0, 0)
+ def clear: OperationStatus = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0)
def stats: Map[String, String] = {
val op = new StatsOp(0xA0, 0x15, defaultCacheName, null)
@@ -153,7 +182,7 @@
resp.stats
}
- def ping: OperationStatus = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, 0)
+ def ping: OperationStatus = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0)
}
@@ -172,11 +201,13 @@
@Sharable
private object Encoder extends OneToOneEncoder {
+ val idToOp = new ConcurrentHashMap[Long, Op]
override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
val ret =
msg match {
case op: Op => {
+ idToOp.put(op.id, op)
val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
buffer.writeUnsignedLong(op.id) // message id
@@ -214,6 +245,7 @@
}
private object Decoder extends ReplayingDecoder[NoState] with Logging {
+ import Encoder._
override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
val buf = new ChannelBufferAdapter(buffer)
@@ -233,8 +265,20 @@
new StatsResponse(id, immutable.Map[String, String]() ++ stats)
}
case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
- | RemoveResponse | RemoveIfUnmodifiedResponse | ContainsKeyResponse | ClearResponse | PingResponse =>
- new Response(id, opCode, status)
+ | RemoveResponse | RemoveIfUnmodifiedResponse => {
+ val op = idToOp.get(id)
+ if (op.flags == 1) {
+ val length = buf.readUnsignedInt
+ if (length == 0) {
+ new ResponseWithPrevious(id, opCode, status, None)
+ } else {
+ val previous = new Array[Byte](length)
+ buf.readBytes(previous)
+ new ResponseWithPrevious(id, opCode, status, Some(previous))
+ }
+ } else new Response(id, opCode, status)
+ }
+ case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status)
case GetWithVersionResponse => {
if (status == Success) {
val version = buf.readLong
Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-03-30 10:12:04 UTC (rev 1640)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-03-30 13:38:37 UTC (rev 1641)
@@ -220,7 +220,7 @@
}
}
- override def createSuccessResponse(h: RequestHeader, params: Option[MemcachedParameters]): AnyRef = {
+ override def createSuccessResponse(h: RequestHeader, params: Option[MemcachedParameters], prev: MemcachedValue): AnyRef = {
if (isStatsEnabled) {
h.op match {
case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedHits.incrementAndGet
@@ -235,7 +235,7 @@
} else null
}
- override def createNotExecutedResponse(h: RequestHeader, params: Option[MemcachedParameters]): AnyRef = {
+ override def createNotExecutedResponse(h: RequestHeader, params: Option[MemcachedParameters], prev: MemcachedValue): AnyRef = {
if (isStatsEnabled) {
h.op match {
case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedBadval.incrementAndGet
More information about the infinispan-commits
mailing list