[infinispan-commits] Infinispan SVN: r1598 - in trunk/server/hotrod/src: test/scala/org/infinispan/server/hotrod and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Mar 11 06:56:07 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-11 06:56:06 -0500 (Thu, 11 Mar 2010)
New Revision: 1598

Modified:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protoco) putIfAbsent implemented.


Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Cache.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -9,4 +9,5 @@
 abstract class Cache {
    def put(c: StorageCommand): Response
    def get(c: RetrievalCommand): Response
+   def putIfAbsent(c: StorageCommand): Response
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -17,25 +17,37 @@
 
    override def put(c: StorageCommand): Response = {
       val cache = getCache(c.cacheName, c.flags)
-      val k = new Key(c.key)
-      val v = new Value(c.value)
       (c.lifespan, c.maxIdle) match {
-         case (0, 0) => cache.put(k, v)
-         case (x, 0) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
-         case (x, y) => cache.put(k, v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
+         case (0, 0) => cache.put(c.k, c.v)
+         case (x, 0) => cache.put(c.k, c.v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
+         case (x, y) => cache.put(c.k, c.v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
       }
       new Response(c.id, OpCodes.PutResponse, Status.Success)
    }
 
    override def get(c: RetrievalCommand): Response = {
       val cache = getCache(c.cacheName, c.flags)
-      val value = cache.get(new Key(c.key))
+      val value = cache.get(c.k)
       if (value != null)
          new RetrievalResponse(c.id, OpCodes.GetResponse, Status.Success, value.v)
       else
          new RetrievalResponse(c.id, OpCodes.GetResponse, Status.KeyDoesNotExist, null)
    }
 
+   override def putIfAbsent(c: StorageCommand): Response = {
+      val cache = getCache(c.cacheName, c.flags)
+      val prev =
+         (c.lifespan, c.maxIdle) match {
+            case (0, 0) => cache.putIfAbsent(c.k, c.v)
+            case (x, 0) => cache.putIfAbsent(c.k, c.v, toMillis(c.lifespan), TimeUnit.MILLISECONDS)
+            case (x, y) => cache.putIfAbsent(c.k, c.v, toMillis(c.lifespan), TimeUnit.MILLISECONDS, c.maxIdle, TimeUnit.SECONDS)
+         }
+      if (prev == null)
+         new Response(c.id, OpCodes.PutIfAbsentResponse, Status.Success)
+      else
+         new Response(c.id, OpCodes.PutIfAbsentResponse, Status.OperationNotExecuted)
+   }
+
    private def getCache(cacheName: String, flags: Set[Flag]): InfinispanCache[Key, Value] = {
       val isDefaultCache = cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME
       val isWithFlags = ! flags.isEmpty

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -18,18 +18,20 @@
       val flags = Flags.toContextFlags(buffer.readUnsignedInt)
       val command: Command =
          op match {                                   
-            case PutRequest => {
-               val key = buffer.readRangedBytes
+            case PutRequest | PutIfAbsentRequest => {
+               val k = new Key(buffer.readRangedBytes)
                val lifespan = buffer.readUnsignedInt
                val maxIdle = buffer.readUnsignedInt
-               val value = buffer.readRangedBytes
-               new StorageCommand(cacheName, id, key, lifespan, maxIdle, value, flags)({
-                  (cache: Cache, command: StorageCommand) => cache.put(command)
-               })
+               val v = new Value(buffer.readRangedBytes)
+               val f = op match {
+                  case PutRequest => (cache: Cache, command: StorageCommand) => cache.put(command)
+                  case PutIfAbsentRequest => (cache: Cache, command: StorageCommand) => cache.putIfAbsent(command)
+               }
+               new StorageCommand(cacheName, id, k, lifespan, maxIdle, v, flags)(f)
             }
             case GetRequest => {
-               val key = buffer.readRangedBytes
-               new RetrievalCommand(cacheName, id, key, flags)({
+               val k =  new Key(buffer.readRangedBytes)
+               new RetrievalCommand(cacheName, id, k, flags)({
                   (cache: Cache, command: RetrievalCommand) => cache.get(command)
                })
             }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Encoder410.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -15,33 +15,26 @@
 
    override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Object) = {
       trace("Encode msg {0}", msg)
-//      try {
-         val buffer: ChannelBuffer =
-            msg match {
-               case r: Response => {
-                  val buffer = ctx.getChannelBuffers.dynamicBuffer
-                  buffer.writeByte(Magic.byteValue)
-                  buffer.writeUnsignedLong(r.id)
-                  buffer.writeByte(r.opCode.id.byteValue)
-                  buffer.writeByte(r.status.id.byteValue)
-                  buffer
-               }
-         }
+      val buffer: ChannelBuffer =
          msg match {
-            case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
-            case er: ErrorResponse => buffer.writeString(er.msg)
-            case _ => {
-               if (buffer == null)
-                  throw new IllegalArgumentException("Response received is unknown: " + msg);
+            case r: Response => {
+               val buffer = ctx.getChannelBuffers.dynamicBuffer
+               buffer.writeByte(Magic.byteValue)
+               buffer.writeUnsignedLong(r.id)
+               buffer.writeByte(r.opCode.id.byteValue)
+               buffer.writeByte(r.status.id.byteValue)
+               buffer
             }
+      }
+      msg match {
+         case rr: RetrievalResponse => if (rr.status == Status.Success) buffer.writeRangedBytes(rr.value)
+         case er: ErrorResponse => buffer.writeString(er.msg)
+         case _ => {
+            if (buffer == null)
+               throw new IllegalArgumentException("Response received is unknown: " + msg);
          }
-         buffer
-//      } catch {
-//         case t: Throwable => {
-//            val buffer = ctx.getChannelBuffers.dynamicBuffer
-//            buffer.writeByte(Magic.byteValue)
-//         }
-//      }
+      }
+      buffer
    }
    
 }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/GlobalDecoder.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -22,9 +22,8 @@
       val magic = buffer.readUnsignedByte()
       if (magic != Magic) {
          if (!isError) {
-            // throw new StreamCorruptedException("Magic byte incorrect: " + magic)
-            val m = "Error reading magic byte or message id: " + magic
-            return new ErrorResponse(0, OpCodes.ErrorResponse, Status.InvalidMagicOrMsgId, m)
+            val t = new StreamCorruptedException("Error reading magic byte or message id: " + magic)
+            return createErrorResponse(0, Status.InvalidMagicOrMsgId, t) 
          } else {
             trace("Error happened previously, ignoring {0} byte until we find the magic number again", magic)
             return null // Keep trying to read until we find magic
@@ -63,10 +62,7 @@
    }
 
    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
-//      val t = e.getCause
-//      val m = if (t.getMessage != null) t.getMessage else "" 
       error("Unexpected error", e.getCause)
-//      e.getChannel.write(new ErrorResponse(0, OpCodes.ErrorResponse, Status.InvalidMagicOrMsgId, m))
    }
 
 }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OpCodes.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -13,6 +13,8 @@
    val PutResponse = Value(0x02)
    val GetRequest = Value(0x03)
    val GetResponse = Value(0x04)
+   val PutIfAbsentRequest = Value(0x05)
+   val PutIfAbsentResponse = Value(0x06)
 
    val ErrorResponse = Value(0x50)
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -10,7 +10,7 @@
 
 class RetrievalCommand(override val cacheName: String,
                        override val id: Long,
-                       val key: Array[Byte],
+                       val k: Key,
                        override val flags: Set[Flag])
                       (val op: (Cache, RetrievalCommand) => Response) extends Command(cacheName, id, flags) {
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StatsCache.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -27,5 +27,10 @@
       // TODO: calculate stats if necessary
       super.get(c)
    }
-   
+
+   abstract override def putIfAbsent(c: StorageCommand): Response = {
+      // TODO: calculate stats if necessary
+      super.putIfAbsent(c)
+   }
+
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Status.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -10,6 +10,7 @@
    type Status = Value
 
    val Success = Value(0x00)
+   val OperationNotExecuted = Value(0x01)
    val KeyDoesNotExist = Value(0x02)
 
    val InvalidMagicOrMsgId = Value(0x81)

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -9,10 +9,10 @@
  */
 class StorageCommand(override val cacheName: String,
                      override val id: Long,
-                     val key: Array[Byte],
+                     val k: Key,
                      val lifespan: Int,
                      val maxIdle: Int,
-                     val value: Array[Byte],
+                     val v: Value,
                      override val flags: Set[Flag])
                     (val op: (Cache, StorageCommand) => Response) extends Command(cacheName, id, flags) {
 
@@ -24,10 +24,10 @@
       new StringBuilder().append("StorageCommand").append("{")
          .append("cacheName=").append(cacheName)
          .append(", id=").append(id)
-         .append(", key=").append(key)
+         .append(", k=").append(k)
          .append(", lifespan=").append(lifespan)
          .append(", maxIdle=").append(maxIdle)
-         .append(", value=").append(value)
+         .append(", v=").append(v)
          .append(", flags=").append(flags)
          .append("}").toString
    }

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/ClusterTest.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -8,6 +8,8 @@
 import org.infinispan.manager.CacheManager
 import org.testng.annotations.{BeforeClass, AfterClass, Test}
 import org.infinispan.config.Configuration.CacheMode
+import org.infinispan.context.Flag
+import org.infinispan.server.hotrod.Status._
 
 /**
  * // TODO: Document this
@@ -40,12 +42,19 @@
       servers.foreach(_.stop)
    }
 
-   @Test
    def tesReplicatedPut(m: Method) {
       val putSt = put(channels.head, cacheName, k(m) , 0, 0, v(m))
-      assertSuccess(putSt)
+      assertStatus(putSt, Success)
       val (getSt, actual) = get(channels.tail.head, cacheName, k(m), null)
       assertSuccess(getSt, v(m), actual)
    }
 
+   def tesLocalOnlyPut(m: Method) {
+      val putSt = put(channels.head, cacheName, k(m) , 0, 0, v(m), Set(Flag.CACHE_MODE_LOCAL))
+       assertStatus(putSt, Success)
+      val (getSt, actual) = get(channels.tail.head, cacheName, k(m), null)
+      assertKeyDoesNotExist(getSt, actual)
+   }
+
+   // todo: test multiple flags
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -52,23 +52,27 @@
    }
 
    def testUnknownCommand(m: Method) {
-      val status = put(ch, 0xA0, 0x77, cacheName, k(m) , 0, 0, v(m))
-      assertSuccess(status)
+      val status = put(ch, 0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), null)
+      assertTrue(status == UnknownCommand,
+         "Status should have been 'UnknownCommand' but instead was: " + status)
    }
 
    def testUnknownMagic(m: Method) {
       doPut(m) // Do a put to make sure decoder gets back to reading properly
-      val status = put(ch, 0x66, 0x01, cacheName, k(m) , 0, 0, v(m))
-      assertSuccess(status)
+      val status = put(ch, 0x66, 0x01, cacheName, k(m) , 0, 0, v(m), null)
+      assertTrue(status == InvalidMagicOrMsgId,
+         "Status should have been 'InvalidMagicOrMsgId' but instead was: " + status)
    }
 
+   // todo: test other error conditions such as invalid version...etc
+
    def testPutBasic(m: Method) {
       doPut(m)
    }
 
    def testPutOnDefaultCache(m: Method) {
       val status = put(ch, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m) , 0, 0, v(m))
-      assertSuccess(status)
+      assertStatus(status, Success)
       val cache: InfinispanCache[Key, Value] = cacheManager.getCache[Key, Value]
       assertTrue(Arrays.equals(cache.get(new Key(k(m))).v, v(m)));
    }
@@ -98,6 +102,17 @@
       assertKeyDoesNotExist(getSt, actual)
    }
 
+   def testPutIfAbsentNotExist(m: Method) {
+      val status = putIfAbsent(ch, cacheName, k(m) , 0, 0, v(m))
+      assertStatus(status, Success)
+   }
+
+   def testPutIfAbsentExist(m: Method) {
+      doPut(m)
+      val status = putIfAbsent(ch, cacheName, k(m) , 0, 0, v(m, "v2-"))
+      assertStatus(status, OperationNotExecuted)
+   }
+
    
 
 // Invalid test since starting transactions does not make sense
@@ -132,19 +147,19 @@
 //      assertSuccess(status)
 //      assertTrue(Arrays.equals(expected, actual))
 //   }
+//
+//   private def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]) {
+//      assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
+//      assertNull(actual)
+//   }
 
-   private def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]) {
-      assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
-      assertNull(actual)
-   }
-
    private def doPut(m: Method) {
       doPutWithLifespanMaxIdle(m, 0, 0)
    }
 
    private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int) {
       val status = put(ch, cacheName, k(m) , lifespan, maxIdle, v(m))
-      assertSuccess(status)
+      assertStatus(status, Success)
    }
 
    private def doGet(m: Method): (Status.Status, Array[Byte]) = {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-11 09:20:13 UTC (rev 1597)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-11 11:56:06 UTC (rev 1598)
@@ -47,11 +47,21 @@
    }
 
    def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
-      put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v)
+      put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v, null)
    }
 
-   def put(ch: Channel, magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
-      val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, null))
+   def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte],
+           flags: Set[Flag]): Status = {
+      put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v, flags)
+   }
+
+   def putIfAbsent(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
+      put(ch, 0xA0, 0x05, name, k, lifespan, maxIdle, v, null)
+   }
+
+   def put(ch: Channel, magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
+           v: Array[Byte], flags: Set[Flag]): Status = {
+      val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, flags))
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
@@ -69,19 +79,25 @@
       (resp.status, resp.value)
    }
 
-   def assertSuccess(status: Status.Status): Boolean = {
-      val isSuccess = status == Success
-      assertTrue(isSuccess, "Status should have been 'Success' but instead was: " + status)
+   def assertStatus(status: Status.Status, expected: Status.Status): Boolean = {
+      val isSuccess = status == expected
+      assertTrue(isSuccess, "Status should have been '" + expected + "' but instead was: " + status)
       isSuccess
    }
 
    def assertSuccess(status: Status.Status, expected: Array[Byte], actual: Array[Byte]): Boolean = {
-      assertSuccess(status)
+      assertStatus(status, Success)
       val isSuccess = Arrays.equals(expected, actual)
       assertTrue(isSuccess)
       isSuccess
    }
 
+   def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]): Boolean = {
+      assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
+      assertNull(actual)
+      status == KeyDoesNotExist
+   }
+
 }
 
 @Sharable
@@ -141,7 +157,7 @@
       val status = Status.apply(buf.readUnsignedByte)
       val resp: Response =
          opCode match {
-            case PutResponse => new Response(id, opCode, status)
+            case PutResponse | PutIfAbsentResponse => new Response(id, opCode, status)
             case GetResponse => {
                val value = {
                   status match {



More information about the infinispan-commits mailing list