[infinispan-commits] Infinispan SVN: r2063 - in trunk/server/hotrod/src: test/scala/org/infinispan/server/hotrod and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Jul 20 02:35:24 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-07-20 02:35:24 -0400 (Tue, 20 Jul 2010)
New Revision: 2063

Modified:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
Log:
[ISPN-516] (HotRod: improve protocol to be able to transfer the entire state of the cache) Added unit test and took advantage of Scala iterator take() method.

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-07-19 19:53:02 UTC (rev 2062)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-07-20 06:35:24 UTC (rev 2063)
@@ -149,7 +149,7 @@
          case BulkGetRequest => {
             val count = buffer.readUnsignedInt
             if (isTraceEnabled) trace("About to create bulk response, count = " + count)
-            new BulkGetResponse(h.messageId, h.cacheName, h.clientIntel, BulkGetResponse, Success, h.topologyId, cache, count)
+            new BulkGetResponse(h.messageId, h.cacheName, h.clientIntel, BulkGetResponse, Success, h.topologyId, count)
          }
       }
    }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-07-19 19:53:02 UTC (rev 2062)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-07-20 06:35:24 UTC (rev 2063)
@@ -8,8 +8,7 @@
 import org.infinispan.Cache
 import org.infinispan.server.core.{CacheValue, Logging}
 import org.infinispan.util.ByteArrayKey
-import java.util.Iterator
-import org.infinispan.container.entries.{InternalCacheValue, InternalCacheEntry}
+import scala.collection.JavaConversions._
 
 /**
  * // TODO: Document this
@@ -53,22 +52,18 @@
             }
          }
          case g: BulkGetResponse => {
-            if (isTrace) trace("About to repond to bulk get request: ")
+            if (isTrace) trace("About to respond to bulk get request")
             if (g.status == Success) {
-               val dataContainer = g.cache.getAdvancedCache().getDataContainer()
-               var iterator: Iterator[InternalCacheEntry] = dataContainer.iterator()
-               val count = g.count
-               var written:Int = 0;
-               if (isTrace) trace("About to write (max) " + count + " messages to the client. Is written <= count ?" + (written <= count))
-               while (iterator.hasNext() && ((written < count) || (count == 0)) ) {
-                  if (isTrace) trace("About to write message number " + written)
+               val cache: Cache[ByteArrayKey, CacheValue] = getCacheInstance(g.cacheName, cacheManager)
+               var iterator = asIterator(cache.entrySet.iterator)
+               if (g.count != 0) {
+                  if (isTrace) trace("About to write (max) {0} messages to the client", g.count)
+                  iterator = iterator.take(g.count)
+               }
+               for (entry <- iterator) {
                   buffer.writeByte(1) //not done
-                  written = written + 1
-                  var ice: InternalCacheEntry = iterator.next()
-                  val key:ByteArrayKey = ice.getKey().asInstanceOf[ByteArrayKey]
-                  buffer.writeRangedBytes(key.getData)
-                  val cacheValue : CacheValue = ice.getValue().asInstanceOf[CacheValue]
-                  buffer.writeRangedBytes(cacheValue.data)
+                  buffer.writeRangedBytes(entry.getKey.getData)
+                  buffer.writeRangedBytes(entry.getValue.data)
                }
                buffer.writeByte(0)
             }
@@ -190,4 +185,4 @@
 
 object HotRodEncoder extends Logging {
    private val Magic = 0xA1
-}
\ No newline at end of file
+}

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-07-19 19:53:02 UTC (rev 2062)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala	2010-07-20 06:35:24 UTC (rev 2063)
@@ -55,8 +55,7 @@
 }
 class BulkGetResponse(override val messageId: Long, override val cacheName: String, override val clientIntel: Short,
                   override val operation: OperationResponse, override val status: OperationStatus,
-                  override val topologyId: Int,
-                  val cache: Cache[ByteArrayKey, CacheValue], val count: Int)
+                  override val topologyId: Int, val count: Int)
       extends Response(messageId, cacheName, clientIntel, operation, status, topologyId) {
    override def toString = {
       new StringBuilder().append("BulkGetResponse").append("{")

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-07-19 19:53:02 UTC (rev 2062)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-07-20 06:35:24 UTC (rev 2063)
@@ -9,7 +9,7 @@
 import org.infinispan.server.core.CacheValue
 import org.infinispan.server.hotrod.OperationStatus._
 import org.infinispan.server.hotrod.test._
-import org.infinispan.util.ByteArrayKey
+import org.infinispan.util.{Util, ByteArrayKey}
 
 /**
  * Hot Rod server functional test.
@@ -356,5 +356,30 @@
       assertStatus(resp.status, Success)
       assertEquals(resp.topologyResponse, None)
    }
-   
-}
+
+   def testBulkGet(m: Method) {
+      var size = 100
+      for (i <- 0 until size) {
+         val status = client.put(k(m, i + "k-") , 0, 0, v(m, i + "v-")).status
+         assertStatus(status, Success)
+      }
+      var resp = client.bulkGet
+      assertStatus(resp.status, Success)
+      var bulkData = resp.bulkData
+      assertEquals(size, bulkData.size)
+      for (i <- 0 until size)
+         assertTrue(Arrays.equals(bulkData.get(new ByteArrayKey(k(m, i + "k-"))).get, v(m, i + "v-")))
+
+      size = 50
+      resp = client.bulkGet(size)
+      assertStatus(resp.status, Success)
+      bulkData = resp.bulkData
+      assertEquals(size, bulkData.size)
+      for (i <- 0 until size) {
+         val key = new ByteArrayKey(k(m, i + "k-"))
+         if (bulkData.contains(key)) {
+            assertTrue(Arrays.equals(bulkData.get(key).get, v(m, i + "v-")))            
+         }
+      }
+   }
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-07-19 19:53:02 UTC (rev 2062)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-07-20 06:35:24 UTC (rev 2063)
@@ -22,7 +22,7 @@
 import java.util.concurrent.{ConcurrentHashMap, Executors}
 import java.util.concurrent.atomic.{AtomicLong}
 import org.infinispan.test.TestingUtil
-import org.infinispan.util.Util
+import org.infinispan.util.{ByteArrayKey, Util}
 
 /**
  * A very simply Hot Rod client for testing purpouses
@@ -34,8 +34,8 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-class HotRodClient(host: String, port: Int, defaultCacheName: String, rspTimeoutSeconds: Int) {
-   val idToOp = new ConcurrentHashMap[Long, Op]    
+class HotRodClient(host: String, port: Int, defaultCacheName: String, rspTimeoutSeconds: Int) extends Logging {
+   val idToOp = new ConcurrentHashMap[Long, Op]
 
    private lazy val ch: Channel = {
       val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
@@ -50,7 +50,7 @@
       assertTrue(connectFuture.isSuccess)
       ch
    }
-   
+
    def stop = ch.disconnect
 
    def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): TestResponse =
@@ -90,12 +90,12 @@
 
    def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): TestResponse =
       execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
-   
+
    def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): TestResponse =
       execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, 1 ,0)
 
    def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): TestResponse =
-      execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)   
+      execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, flags)
 
    def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): TestResponse =
       execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, version, 1 ,0)
@@ -195,6 +195,15 @@
    def ping(clientIntelligence: Byte, topologyId: Int): TestResponse =
       execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, clientIntelligence, topologyId)
 
+   def bulkGet: TestBulkGetResponse = bulkGet(0)
+
+   def bulkGet(count: Int): TestBulkGetResponse = {
+      val op = new BulkGetOp(0xA0, 0x19, defaultCacheName, 1, 0, count)
+      val writeFuture = writeOp(op)
+      // Get the handler instance to retrieve the answer.
+      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+      handler.getResponse(op.id).asInstanceOf[TestBulkGetResponse]
+   }
 }
 
 private class ClientPipelineFactory(client: HotRodClient, rspTimeoutSeconds: Int) extends ChannelPipelineFactory {
@@ -236,7 +245,7 @@
             buffer.writeUnsignedInt(op.flags) // flags
             buffer.writeByte(op.clientIntel) // client intelligence
             buffer.writeUnsignedInt(op.topologyId) // topology id
-            if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
+            if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17 && op.code != 0x19) { // if it's a key based op...
                buffer.writeRangedBytes(op.key) // key length + key
                if (op.value != null) {
                   if (op.code != 0x0D) { // If it's not removeIfUnmodified...
@@ -250,6 +259,8 @@
                      buffer.writeRangedBytes(op.value) // value length + value
                   }
                }
+            } else if (op.code == 0x19) {
+               buffer.writeUnsignedInt(op.asInstanceOf[BulkGetOp].count) // Entry count
             }
             buffer.getUnderlyingChannelBuffer
          }
@@ -349,6 +360,16 @@
                new TestGetResponse(id, op.cacheName, op.clientIntel, opCode, status, op.topologyId, None, topologyChangeResponse)
             }
          }
+         case BulkGetResponse => {
+            var done = buf.readByte
+            val bulkBuffer = mutable.Map.empty[ByteArrayKey, Array[Byte]]
+            while (done == 1) {
+               bulkBuffer += (new ByteArrayKey(buf.readRangedBytes) -> buf.readRangedBytes)
+               done = buf.readByte
+            }
+            val bulk = immutable.Map[ByteArrayKey, Array[Byte]]() ++ bulkBuffer
+            new TestBulkGetResponse(id, op.cacheName, op.clientIntel, bulk, op.topologyId, topologyChangeResponse)
+         }
          case ErrorResponse => {
             if (op == null)
                new TestErrorResponse(id, "", 0, status, 0, buf.readString, topologyChangeResponse)
@@ -445,6 +466,13 @@
               override val topologyId: Int,
               val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntel, topologyId)
 
+class BulkGetOp(override val magic: Int,
+              override val code: Byte,
+              override val cacheName: String,
+              override val clientIntel: Byte,
+              override val topologyId: Int,
+              val count: Int) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntel, topologyId)
+
 class TestResponse(override val messageId: Long, override val cacheName: String,
                    override val clientIntel: Short, override val operation: OperationResponse,
                    override val status: OperationStatus,
@@ -483,3 +511,8 @@
                         override val clientIntel: Short, val stats: Map[String, String],
                         override val topologyId: Int, override val topologyResponse: Option[AbstractTopologyResponse])
       extends TestResponse(messageId, cacheName, clientIntel, StatsResponse, Success, topologyId, topologyResponse)
+
+class TestBulkGetResponse(override val messageId: Long, override val cacheName: String,
+                          override val clientIntel: Short, val bulkData: Map[ByteArrayKey, Array[Byte]],
+                          override val topologyId: Int, override val topologyResponse: Option[AbstractTopologyResponse])
+      extends TestResponse(messageId, cacheName, clientIntel, BulkGetResponse, Success, topologyId, topologyResponse)
\ No newline at end of file



More information about the infinispan-commits mailing list