[infinispan-commits] Infinispan SVN: r1598 - in trunk/server/hotrod/src: test/scala/org/infinispan/server/hotrod and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Mar 11 06:56:07 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-03-11 06:56:06 -0500 (Thu, 11 Mar 2010)
New Revision: 1598
Modified:
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protoco) putIfAbsent implemented.
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -9,4 +9,5 @@
abstract class Cache {
def put(c: StorageCommand): Response
def get(c: RetrievalCommand): Response
+ def putIfAbsent(c: StorageCommand): Response
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -17,25 +17,37 @@
override def put(c: StorageCommand): Response = {
val cache = getCache(c.cacheName, c.flags)
- val k = new Key(c.key)
- val v = new Value(c.value)
(c.lifespan, c.maxIdle) match {
- case (0, 0) => cache.put(k, v)
- case (x, 0) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
- case (x, y) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
+ case (0, 0) => cache.put(c.k, c.v)
+ case (x, 0) => cache.put(c.k, c.v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
+ case (x, y) => cache.put(c.k, c.v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
}
new Response(c.id, OpCodes.PutResponse, Status.Success)
}
override def get(c: RetrievalCommand): Response = {
val cache = getCache(c.cacheName, c.flags)
- val value = cache.get(new Key(c.key))
+ val value = cache.get(c.k)
if (value != null)
new RetrievalResponse(c.id, OpCodes.GetResponse, Status.Success, value.v)
else
new RetrievalResponse(c.id, OpCodes.GetResponse, Status.KeyDoesNotExist, null)
}
+ override def putIfAbsent(c: StorageCommand): Response = {
+ val cache = getCache(c.cacheName, c.flags)
+ val prev =
+ (c.lifespan, c.maxIdle) match {
+ case (0, 0) => cache.putIfAbsent(c.k, c.v)
+ case (x, 0) => cache.putIfAbsent(c.k, c.v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
+ case (x, y) => cache.putIfAbsent(c.k, c.v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
+ }
+ if (prev == null)
+ new Response(c.id, OpCodes.PutIfAbsentResponse, Status.Success)
+ else
+ new Response(c.id, OpCodes.PutIfAbsentResponse, Status.OperationNotExecuted)
+ }
+
private def getCache(cacheName: String, flags: Set[Flag]): InfinispanCache[Key, Value] = {
val isDefaultCache = cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME
val isWithFlags = ! flags.isEmpty
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -18,18 +18,20 @@
val flags = Flags.toContextFlags(buffer.readUnsignedInt)
val command: Command =
op match {
- case PutRequest => {
- val key = buffer.readRangedBytes
+ case PutRequest | PutIfAbsentRequest => {
+ val k = new Key(buffer.readRangedBytes)
val lifespan = buffer.readUnsignedInt
val maxIdle = buffer.readUnsignedInt
- val value = buffer.readRangedBytes
- new StorageCommand(cacheName, id, key, lifespan, maxIdle, value, flags)({
- (cache: Cache, command: StorageCommand) => cache.put(command)
- })
+ val v = new Value(buffer.readRangedBytes)
+ val f = op match {
+ case PutRequest => (cache: Cache, command: StorageCommand) => cache.put(command)
+ case PutIfAbsentRequest => (cache: Cache, command: StorageCommand) => cache.putIfAbsent(command)
+ }
+ new StorageCommand(cacheName, id, k, lifespan, maxIdle, v, flags)(f)
}
case GetRequest => {
- val key = buffer.readRangedBytes
- new RetrievalCommand(cacheName, id, key, flags)({
+ val k = new Key(buffer.readRangedBytes)
+ new RetrievalCommand(cacheName, id, k, flags)({
(cache: Cache, command: RetrievalCommand) => cache.get(command)
})
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -15,33 +15,26 @@
override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Object) = {
trace("Encode msg {0}", msg)
-// try {
- val buffer: ChannelBuffer =
- msg match {
- case r: Response => {
- val buffer = ctx.getChannelBuffers.dynamicBuffer
- buffer.writeByte(Magic.byteValue)
- buffer.writeUnsignedLong(r.id)
- buffer.writeByte(r.opCode.id.byteValue)
- buffer.writeByte(r.status.id.byteValue)
- buffer
- }
- }
+ val buffer: ChannelBuffer =
msg match {
- case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
- case er: ErrorResponse => buffer.writeString(er.msg)
- case _ => {
- if (buffer == null)
- throw new IllegalArgumentException("Response received is unknown: " + msg);
+ case r: Response => {
+ val buffer = ctx.getChannelBuffers.dynamicBuffer
+ buffer.writeByte(Magic.byteValue)
+ buffer.writeUnsignedLong(r.id)
+ buffer.writeByte(r.opCode.id.byteValue)
+ buffer.writeByte(r.status.id.byteValue)
+ buffer
}
+ }
+ msg match {
+ case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
+ case er: ErrorResponse => buffer.writeString(er.msg)
+ case _ => {
+ if (buffer == null)
+ throw new IllegalArgumentException("Response received is unknown: " + msg);
}
- buffer
-// } catch {
-// case t: Throwable => {
-// val buffer = ctx.getChannelBuffers.dynamicBuffer
-// buffer.writeByte(Magic.byteValue)
-// }
-// }
+ }
+ buffer
}
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -22,9 +22,8 @@
val magic = buffer.readUnsignedByte()
if (magic != Magic) {
if (!isError) {
- // throw new StreamCorruptedException("Magic byte incorrect: " + magic)
- val m = "Error reading magic byte or message id: " + magic
- return new ErrorResponse(0, OpCodes.ErrorResponse, Status.InvalidMagicOrMsgId, m)
+ val t = new StreamCorruptedException("Error reading magic byte or message id: " + magic)
+ return createErrorResponse(0, Status.InvalidMagicOrMsgId, t)
} else {
trace("Error happened previously, ignoring {0} byte until we find the magic number again", magic)
return null // Keep trying to read until we find magic
@@ -63,10 +62,7 @@
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
-// val t = e.getCause
-// val m = if (t.getMessage != null) t.getMessage else ""
error("Unexpected error", e.getCause)
-// e.getChannel.write(new ErrorResponse(0, OpCodes.ErrorResponse, Status.InvalidMagicOrMsgId, m))
}
}
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -13,6 +13,8 @@
val PutResponse = Value(0x02)
val GetRequest = Value(0x03)
val GetResponse = Value(0x04)
+ val PutIfAbsentRequest = Value(0x05)
+ val PutIfAbsentResponse = Value(0x06)
val ErrorResponse = Value(0x50)
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -10,7 +10,7 @@
class RetrievalCommand(override val cacheName: String,
override val id: Long,
- val key: Array[Byte],
+ val k: Key,
override val flags: Set[Flag])
(val op: (Cache, RetrievalCommand) => Response) extends Command(cacheName, id, flags) {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -27,5 +27,10 @@
// TODO: calculate stats if necessary
super.get(c)
}
-
+
+ abstract override def putIfAbsent(c: StorageCommand): Response = {
+ // TODO: calculate stats if necessary
+ super.putIfAbsent(c)
+ }
+
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -10,6 +10,7 @@
type Status = Value
val Success = Value(0x00)
+ val OperationNotExecuted = Value(0x01)
val KeyDoesNotExist = Value(0x02)
val InvalidMagicOrMsgId = Value(0x81)
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -9,10 +9,10 @@
*/
class StorageCommand(override val cacheName: String,
override val id: Long,
- val key: Array[Byte],
+ val k: Key,
val lifespan: Int,
val maxIdle: Int,
- val value: Array[Byte],
+ val v: Value,
override val flags: Set[Flag])
(val op: (Cache, StorageCommand) => Response) extends Command(cacheName, id, flags) {
@@ -24,10 +24,10 @@
new StringBuilder().append("StorageCommand").append("{")
.append("cacheName=").append(cacheName)
.append(", id=").append(id)
- .append(", key=").append(key)
+ .append(", k=").append(k)
.append(", lifespan=").append(lifespan)
.append(", maxIdle=").append(maxIdle)
- .append(", value=").append(value)
+ .append(", v=").append(v)
.append(", flags=").append(flags)
.append("}").toString
}
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -8,6 +8,8 @@
import org.infinispan.manager.CacheManager
import org.testng.annotations.{BeforeClass, AfterClass, Test}
import org.infinispan.config.Configuration.CacheMode
+import org.infinispan.context.Flag
+import org.infinispan.server.hotrod.Status._
/**
* // TODO: Document this
@@ -40,12 +42,19 @@
servers.foreach(_.stop)
}
- @Test
def tesReplicatedPut(m: Method) {
val putSt = put(channels.head, cacheName, k(m) , 0, 0, v(m))
- assertSuccess(putSt)
+ assertStatus(putSt, Success)
val (getSt, actual) = get(channels.tail.head, cacheName, k(m), null)
assertSuccess(getSt, v(m), actual)
}
+ def tesLocalOnlyPut(m: Method) {
+ val putSt = put(channels.head, cacheName, k(m) , 0, 0, v(m), Set(Flag.CACHE_MODE_LOCAL))
+ assertStatus(putSt, Success)
+ val (getSt, actual) = get(channels.tail.head, cacheName, k(m), null)
+ assertKeyDoesNotExist(getSt, actual)
+ }
+
+ // todo: test multiple flags
}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -52,23 +52,27 @@
}
def testUnknownCommand(m: Method) {
- val status = put(ch, 0xA0, 0x77, cacheName, k(m) , 0, 0, v(m))
- assertSuccess(status)
+ val status = put(ch, 0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), null)
+ assertTrue(status == UnknownCommand,
+ "Status should have been 'UnknownCommand' but instead was: " + status)
}
def testUnknownMagic(m: Method) {
doPut(m) // Do a put to make sure decoder gets back to reading properly
- val status = put(ch, 0x66, 0x01, cacheName, k(m) , 0, 0, v(m))
- assertSuccess(status)
+ val status = put(ch, 0x66, 0x01, cacheName, k(m) , 0, 0, v(m), null)
+ assertTrue(status == InvalidMagicOrMsgId,
+ "Status should have been 'InvalidMagicOrMsgId' but instead was: " + status)
}
+ // todo: test other error conditions such as invalid version...etc
+
def testPutBasic(m: Method) {
doPut(m)
}
def testPutOnDefaultCache(m: Method) {
val status = put(ch, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m) , 0, 0, v(m))
- assertSuccess(status)
+ assertStatus(status, Success)
val cache: InfinispanCache[Key, Value] = cacheManager.getCache[Key, Value]
assertTrue(Arrays.equals(cache.get(new Key(k(m))).v, v(m)));
}
@@ -98,6 +102,17 @@
assertKeyDoesNotExist(getSt, actual)
}
+ def testPutIfAbsentNotExist(m: Method) {
+ val status = putIfAbsent(ch, cacheName, k(m) , 0, 0, v(m))
+ assertStatus(status, Success)
+ }
+
+ def testPutIfAbsentExist(m: Method) {
+ doPut(m)
+ val status = putIfAbsent(ch, cacheName, k(m) , 0, 0, v(m, "v2-"))
+ assertStatus(status, OperationNotExecuted)
+ }
+
// Invalid test since starting transactions does not make sense
@@ -132,19 +147,19 @@
// assertSuccess(status)
// assertTrue(Arrays.equals(expected, actual))
// }
+//
+// private def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]) {
+// assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
+// assertNull(actual)
+// }
- private def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]) {
- assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
- assertNull(actual)
- }
-
private def doPut(m: Method) {
doPutWithLifespanMaxIdle(m, 0, 0)
}
private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int) {
val status = put(ch, cacheName, k(m) , lifespan, maxIdle, v(m))
- assertSuccess(status)
+ assertStatus(status, Success)
}
private def doGet(m: Method): (Status.Status, Array[Byte]) = {
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala 2010-03-11 11:56:06 UTC (rev 1598)
@@ -47,11 +47,21 @@
}
def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
- put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v)
+ put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v, null)
}
- def put(ch: Channel, magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
- val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, null))
+ def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte],
+ flags: Set[Flag]): Status = {
+ put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v, flags)
+ }
+
+ def putIfAbsent(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
+ put(ch, 0xA0, 0x05, name, k, lifespan, maxIdle, v, null)
+ }
+
+ def put(ch: Channel, magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
+ v: Array[Byte], flags: Set[Flag]): Status = {
+ val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, flags))
writeFuture.awaitUninterruptibly
assertTrue(writeFuture.isSuccess)
// Get the handler instance to retrieve the answer.
@@ -69,19 +79,25 @@
(resp.status, resp.value)
}
- def assertSuccess(status: Status.Status): Boolean = {
- val isSuccess = status == Success
- assertTrue(isSuccess, "Status should have been 'Success' but instead was: " + status)
+ def assertStatus(status: Status.Status, expected: Status.Status): Boolean = {
+ val isSuccess = status == expected
+ assertTrue(isSuccess, "Status should have been '" + expected + "' but instead was: " + status)
isSuccess
}
def assertSuccess(status: Status.Status, expected: Array[Byte], actual: Array[Byte]): Boolean = {
- assertSuccess(status)
+ assertStatus(status, Success)
val isSuccess = Arrays.equals(expected, actual)
assertTrue(isSuccess)
isSuccess
}
+ def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]): Boolean = {
+ assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
+ assertNull(actual)
+ status == KeyDoesNotExist
+ }
+
}
@Sharable
@@ -141,7 +157,7 @@
val status = Status.apply(buf.readUnsignedByte)
val resp: Response =
opCode match {
- case PutResponse => new Response(id, opCode, status)
+ case PutResponse | PutIfAbsentResponse => new Response(id, opCode, status)
case GetResponse => {
val value = {
status match {
More information about the infinispan-commits
mailing list