[infinispan-commits] Infinispan SVN: r2062 - in branches/4.1.x/server/hotrod/src: test/scala/org/infinispan/server/hotrod and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Jul 19 15:53:03 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-07-19 15:53:02 -0400 (Mon, 19 Jul 2010)
New Revision: 2062
Modified:
branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
Log:
[ISPN-516] (HotRod: improve protocol to be able to transfer the entire state of the cache) Added unit test and took advantage of Scala iterator take() method.
Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-07-19 13:44:38 UTC (rev 2061)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala 2010-07-19 19:53:02 UTC (rev 2062)
@@ -149,7 +149,7 @@
case BulkGetRequest => {
val count = buffer.readUnsignedInt
if (isTraceEnabled) trace("About to create bulk response, count = " + count)
- new BulkGetResponse(h.messageId, h.cacheName, h.clientIntel, BulkGetResponse, Success, h.topologyId, cache, count)
+ new BulkGetResponse(h.messageId, h.cacheName, h.clientIntel, BulkGetResponse, Success, h.topologyId, count)
}
}
}
Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-07-19 13:44:38 UTC (rev 2061)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala 2010-07-19 19:53:02 UTC (rev 2062)
@@ -8,8 +8,7 @@
import org.infinispan.Cache
import org.infinispan.server.core.{CacheValue, Logging}
import org.infinispan.util.ByteArrayKey
-import java.util.Iterator
-import org.infinispan.container.entries.{InternalCacheValue, InternalCacheEntry}
+import scala.collection.JavaConversions._
/**
* // TODO: Document this
@@ -53,22 +52,18 @@
}
}
case g: BulkGetResponse => {
- if (isTrace) trace("About to repond to bulk get request: ")
+ if (isTrace) trace("About to respond to bulk get request")
if (g.status == Success) {
- val dataContainer = g.cache.getAdvancedCache().getDataContainer()
- var iterator: Iterator[InternalCacheEntry] = dataContainer.iterator()
- val count = g.count
- var written:Int = 0;
- if (isTrace) trace("About to write (max) " + count + " messages to the client. Is written <= count ?" + (written <= count))
- while (iterator.hasNext() && ((written < count) || (count == 0)) ) {
- if (isTrace) trace("About to write message number " + written)
+ val cache: Cache[ByteArrayKey, CacheValue] = getCacheInstance(g.cacheName, cacheManager)
+ var iterator = asIterator(cache.entrySet.iterator)
+ if (g.count != 0) {
+ if (isTrace) trace("About to write (max) {0} messages to the client", g.count)
+ iterator = iterator.take(g.count)
+ }
+ for (entry <- iterator) {
buffer.writeByte(1) //not done
- written = written + 1
- var ice: InternalCacheEntry = iterator.next()
- val key:ByteArrayKey = ice.getKey().asInstanceOf[ByteArrayKey]
- buffer.writeRangedBytes(key.getData)
- val cacheValue : CacheValue = ice.getValue().asInstanceOf[CacheValue]
- buffer.writeRangedBytes(cacheValue.data)
+ buffer.writeRangedBytes(entry.getKey.getData)
+ buffer.writeRangedBytes(entry.getValue.data)
}
buffer.writeByte(0)
}
Modified: branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-07-19 13:44:38 UTC (rev 2061)
+++ branches/4.1.x/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Response.scala 2010-07-19 19:53:02 UTC (rev 2062)
@@ -55,8 +55,7 @@
}
class BulkGetResponse(override val messageId: Long, override val cacheName: String, override val clientIntel: Short,
override val operation: OperationResponse, override val status: OperationStatus,
- override val topologyId: Int,
- val cache: Cache[ByteArrayKey, CacheValue], val count: Int)
+ override val topologyId: Int, val count: Int)
extends Response(messageId, cacheName, clientIntel, operation, status, topologyId) {
override def toString = {
new StringBuilder().append("BulkGetResponse").append("{")
Modified: branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-07-19 13:44:38 UTC (rev 2061)
+++ branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-07-19 19:53:02 UTC (rev 2062)
@@ -9,7 +9,7 @@
import org.infinispan.server.core.CacheValue
import org.infinispan.server.hotrod.OperationStatus._
import org.infinispan.server.hotrod.test._
-import org.infinispan.util.ByteArrayKey
+import org.infinispan.util.{Util, ByteArrayKey}
/**
* Hot Rod server functional test.
@@ -356,5 +356,30 @@
assertStatus(resp.status, Success)
assertEquals(resp.topologyResponse, None)
}
-
+
+ def testBulkGet(m: Method) {
+ var size = 100
+ for (i <- 0 until size) {
+ val status = client.put(k(m, i + "k-") , 0, 0, v(m, i + "v-")).status
+ assertStatus(status, Success)
+ }
+ var resp = client.bulkGet
+ assertStatus(resp.status, Success)
+ var bulkData = resp.bulkData
+ assertEquals(size, bulkData.size)
+ for (i <- 0 until size)
+ assertTrue(Arrays.equals(bulkData.get(new ByteArrayKey(k(m, i + "k-"))).get, v(m, i + "v-")))
+
+ size = 50
+ resp = client.bulkGet(size)
+ assertStatus(resp.status, Success)
+ bulkData = resp.bulkData
+ assertEquals(size, bulkData.size)
+ for (i <- 0 until size) {
+ val key = new ByteArrayKey(k(m, i + "k-"))
+ if (bulkData.contains(key)) {
+ assertTrue(Arrays.equals(bulkData.get(key).get, v(m, i + "v-")))
+ }
+ }
+ }
}
\ No newline at end of file
Modified: branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-07-19 13:44:38 UTC (rev 2061)
+++ branches/4.1.x/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-07-19 19:53:02 UTC (rev 2062)
@@ -22,7 +22,7 @@
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.concurrent.atomic.{AtomicLong}
import org.infinispan.test.TestingUtil
-import org.infinispan.util.Util
+import org.infinispan.util.{ByteArrayKey, Util}
/**
* A very simply Hot Rod client for testing purpouses
@@ -34,7 +34,7 @@
* @author Galder Zamarreño
* @since 4.1
*/
-class HotRodClient(host: String, port: Int, defaultCacheName: String, rspTimeoutSeconds: Int) {
+class HotRodClient(host: String, port: Int, defaultCacheName: String, rspTimeoutSeconds: Int) extends Logging {
val idToOp = new ConcurrentHashMap[Long, Op]
private lazy val ch: Channel = {
@@ -195,6 +195,15 @@
def ping(clientIntelligence: Byte, topologyId: Int): TestResponse =
execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, clientIntelligence, topologyId)
+ def bulkGet: TestBulkGetResponse = bulkGet(0)
+
+ def bulkGet(count: Int): TestBulkGetResponse = {
+ val op = new BulkGetOp(0xA0, 0x19, defaultCacheName, 1, 0, count)
+ val writeFuture = writeOp(op)
+ // Get the handler instance to retrieve the answer.
+ var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+ handler.getResponse(op.id).asInstanceOf[TestBulkGetResponse]
+ }
}
private class ClientPipelineFactory(client: HotRodClient, rspTimeoutSeconds: Int) extends ChannelPipelineFactory {
@@ -236,7 +245,7 @@
buffer.writeUnsignedInt(op.flags) // flags
buffer.writeByte(op.clientIntel) // client intelligence
buffer.writeUnsignedInt(op.topologyId) // topology id
- if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
+ if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17 && op.code != 0x19) { // if it's a key based op...
buffer.writeRangedBytes(op.key) // key length + key
if (op.value != null) {
if (op.code != 0x0D) { // If it's not removeIfUnmodified...
@@ -250,6 +259,8 @@
buffer.writeRangedBytes(op.value) // value length + value
}
}
+ } else if (op.code == 0x19) {
+ buffer.writeUnsignedInt(op.asInstanceOf[BulkGetOp].count) // Entry count
}
buffer.getUnderlyingChannelBuffer
}
@@ -349,6 +360,16 @@
new TestGetResponse(id, op.cacheName, op.clientIntel, opCode, status, op.topologyId, None, topologyChangeResponse)
}
}
+ case BulkGetResponse => {
+ var done = buf.readByte
+ val bulkBuffer = mutable.Map.empty[ByteArrayKey, Array[Byte]]
+ while (done == 1) {
+ bulkBuffer += (new ByteArrayKey(buf.readRangedBytes) -> buf.readRangedBytes)
+ done = buf.readByte
+ }
+ val bulk = immutable.Map[ByteArrayKey, Array[Byte]]() ++ bulkBuffer
+ new TestBulkGetResponse(id, op.cacheName, op.clientIntel, bulk, op.topologyId, topologyChangeResponse)
+ }
case ErrorResponse => {
if (op == null)
new TestErrorResponse(id, "", 0, status, 0, buf.readString, topologyChangeResponse)
@@ -445,6 +466,13 @@
override val topologyId: Int,
val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntel, topologyId)
+class BulkGetOp(override val magic: Int,
+ override val code: Byte,
+ override val cacheName: String,
+ override val clientIntel: Byte,
+ override val topologyId: Int,
+ val count: Int) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0, clientIntel, topologyId)
+
class TestResponse(override val messageId: Long, override val cacheName: String,
override val clientIntel: Short, override val operation: OperationResponse,
override val status: OperationStatus,
@@ -482,4 +510,9 @@
class TestStatsResponse(override val messageId: Long, override val cacheName: String,
override val clientIntel: Short, val stats: Map[String, String],
override val topologyId: Int, override val topologyResponse: Option[AbstractTopologyResponse])
- extends TestResponse(messageId, cacheName, clientIntel, StatsResponse, Success, topologyId, topologyResponse)
\ No newline at end of file
+ extends TestResponse(messageId, cacheName, clientIntel, StatsResponse, Success, topologyId, topologyResponse)
+
+class TestBulkGetResponse(override val messageId: Long, override val cacheName: String,
+ override val clientIntel: Short, val bulkData: Map[ByteArrayKey, Array[Byte]],
+ override val topologyId: Int, override val topologyResponse: Option[AbstractTopologyResponse])
+ extends TestResponse(messageId, cacheName, clientIntel, BulkGetResponse, Success, topologyId, topologyResponse)
\ No newline at end of file
More information about the infinispan-commits mailing list