[infinispan-commits] Infinispan SVN: r1628 - in trunk/server/hotrod/src: test/scala/org/infinispan/server/hotrod and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Mar 26 11:41:16 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-26 11:41:13 -0400 (Fri, 26 Mar 2010)
New Revision: 1628

Added:
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala
Removed:
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
Modified:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Added more exception case statements. Fixed removeIfUnmodified request handling. Tidied up unit testing so that it is easier to add more tests.

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-26 15:00:30 UTC (rev 1627)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-26 15:41:13 UTC (rev 1628)
@@ -41,26 +41,28 @@
    override def readKeys(buffer: ChannelBuffer): Array[CacheKey] = Array(new CacheKey(buffer.readRangedBytes))
 
    override def readParameters(header: HotRodHeader, buffer: ChannelBuffer): Option[RequestParameters] = {
-      if (header.op != RemoveRequest) {
-         val lifespan = {
-            val streamLifespan = buffer.readUnsignedInt
-            if (streamLifespan <= 0) -1 else streamLifespan
+      header.op match {
+         case RemoveRequest => None
+         case RemoveIfUnmodifiedRequest => Some(new RequestParameters(null, -1, -1, buffer.readLong))
+         case ReplaceIfUnmodifiedRequest => {
+            val lifespan = readLifespanOrMaxIdle(buffer)
+            val maxIdle = readLifespanOrMaxIdle(buffer)
+            val version = buffer.readLong
+            Some(new RequestParameters(buffer.readRangedBytes, lifespan, maxIdle, version))
          }
-         val maxIdle = {
-            val streamMaxIdle = buffer.readUnsignedInt
-            if (streamMaxIdle <= 0) -1 else streamMaxIdle
+         case _ => {
+            val lifespan = readLifespanOrMaxIdle(buffer)
+            val maxIdle = readLifespanOrMaxIdle(buffer)
+            Some(new RequestParameters(buffer.readRangedBytes, lifespan, maxIdle, -1))
          }
-         val version = header.op match {
-            case ReplaceIfUnmodifiedRequest | RemoveIfUnmodifiedRequest => buffer.readLong
-            case _ => -1
-         }
-         val data = buffer.readRangedBytes
-         Some(new RequestParameters(data, lifespan, maxIdle, version))
-      } else {
-         None
       }
    }
 
+   private def readLifespanOrMaxIdle(buffer: ChannelBuffer): Int = {
+      val stream = buffer.readUnsignedInt
+      if (stream <= 0) -1 else stream
+   }
+
    override def createValue(params: RequestParameters, nextVersion: Long): CacheValue =
       new CacheValue(params.data, nextVersion)
 
@@ -131,7 +133,6 @@
       stats += ("misses" -> cacheStats.getMisses.toString)
       stats += ("removeHits" -> cacheStats.getRemoveHits.toString)
       stats += ("removeMisses" -> cacheStats.getRemoveMisses.toString)
-      stats += ("evictions" -> cacheStats.getEvictions.toString)
       new StatsResponse(header.messageId, immutable.Map[String, String]() ++ stats)
    }
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-26 15:00:30 UTC (rev 1627)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-26 15:41:13 UTC (rev 1628)
@@ -2,11 +2,12 @@
 
 import org.infinispan.Cache
 import org.infinispan.stats.Stats
-import java.io.StreamCorruptedException
 import org.infinispan.server.core._
 import transport._
 import OperationStatus._
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+import java.io.{IOException, StreamCorruptedException}
+import org.infinispan.util.concurrent.TimeoutException
 
 /**
  * // TODO: Document this
@@ -104,10 +105,11 @@
          case se: ServerException => {
             val messageId = se.header.asInstanceOf[HotRodHeader].messageId
             se.getCause match {
-               case imie: InvalidMagicIdException => new ErrorResponse(0, InvalidMagicOrMsgId, imie.toString)
-               case uoe: UnknownOperationException => new ErrorResponse(messageId, UnknownOperation, uoe.toString)
-               case uve: UnknownVersionException => new ErrorResponse(messageId, UnknownVersion, uve.toString)
-               // TODO add more cases
+               case i: InvalidMagicIdException => new ErrorResponse(0, InvalidMagicOrMsgId, i.toString)
+               case u: UnknownOperationException => new ErrorResponse(messageId, UnknownOperation, u.toString)
+               case u: UnknownVersionException => new ErrorResponse(messageId, UnknownVersion, u.toString)
+               case i: IOException => new ErrorResponse(messageId, ParseError, i.toString)
+               case t: TimeoutException => new ErrorResponse(messageId, OperationTimedOut, t.toString)
                case e: Exception => new ErrorResponse(messageId, ServerError, e.toString)
             }
          }

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-03-26 15:00:30 UTC (rev 1627)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-03-26 15:41:13 UTC (rev 1628)
@@ -3,10 +3,10 @@
 import org.infinispan.test.fwk.TestCacheManagerFactory
 import org.testng.annotations.{AfterClass, Test}
 import java.lang.reflect.Method
-import test.{Client, Utils}
+import test.HotRodClient
+import test.HotRodTestingUtil._
 import org.testng.Assert._
 import java.util.Arrays
-import org.jboss.netty.channel.Channel
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
 import org.infinispan.{AdvancedCache}
 import org.infinispan.test.{SingleCacheManagerTest}
@@ -14,7 +14,7 @@
 import org.infinispan.server.hotrod.OperationStatus._
 
 /**
- * TODO: Document
+ * Hot Rod server functional test.
  *
  * Note: It appears that optional parameters in annotations result in compiler errors.
  * This has been solved in Scala 2.8.0.Beta1, so use that compiler,
@@ -27,17 +27,17 @@
  * @since 4.1
  */
 @Test(groups = Array("functional"), testName = "server.hotrod.HotRodFunctionalTest")
-class HotRodFunctionalTest extends SingleCacheManagerTest with Utils with Client {
+class HotRodFunctionalTest extends SingleCacheManagerTest {
    private val cacheName = "hotrod-cache"
    private var server: HotRodServer = _
-   private var ch: Channel = _
+   private var client: HotRodClient = _
    private var advancedCache: AdvancedCache[CacheKey, CacheValue] = _
 
    override def createCacheManager: CacheManager = {
       val cacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
       advancedCache = cacheManager.getCache[CacheKey, CacheValue](cacheName).getAdvancedCache
       server = startHotRodServer(cacheManager)
-      ch = connect("127.0.0.1", server.getPort)
+      client = new HotRodClient("127.0.0.1", server.getPort, cacheName)
       cacheManager
    }
 
@@ -45,19 +45,19 @@
    override def destroyAfterClass {
       super.destroyAfterClass
       log.debug("Test finished, close client and Hot Rod server", null)
-      ch.disconnect
+      client.stop
       server.stop
    }
 
    def testUnknownCommand(m: Method) {
-      val status = put(ch, 0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0, 0)
+      val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0, 0)
       assertTrue(status == UnknownOperation,
          "Status should have been 'UnknownOperation' but instead was: " + status)
    }
 
    def testUnknownMagic(m: Method) {
-      doPut(m) // Do a put to make sure decoder gets back to reading properly
-      val status = put(ch, 0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0, 0)
+      client.assertPut(m) // Do a put to make sure decoder gets back to reading properly
+      val status = client.execute(0x66, 0x01, cacheName, k(m) , 0, 0, v(m), 0, 0)
       assertTrue(status == InvalidMagicOrMsgId,
          "Status should have been 'InvalidMagicOrMsgId' but instead was: " + status)
    }
@@ -66,20 +66,11 @@
    // todo: add test for force return value operation
 
    def testPutBasic(m: Method) {
-      doPut(m)
+      client.assertPut(m)
    }
 
-   private def doPut(m: Method) {
-      doPutWithLifespanMaxIdle(m, 0, 0)
-   }
-
-   private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int) {
-      val status = put(ch, cacheName, k(m) , lifespan, maxIdle, v(m))
-      assertStatus(status, Success)
-   }
-
    def testPutOnDefaultCache(m: Method) {
-      val status = put(ch, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m) , 0, 0, v(m))
+      val status = client.execute(0xA0, 0x01, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m), 0, 0, v(m), 0, 0)
       assertStatus(status, Success)
       val cache = cacheManager.getCache[CacheKey, CacheValue]
       val value = cache.get(new CacheKey(k(m)))
@@ -87,196 +78,196 @@
    }
 
    def testPutWithLifespan(m: Method) {
-      doPutWithLifespanMaxIdle(m, 1, 0)
+      client.assertPut(m, 1, 0)
       Thread.sleep(1100)
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testPutWithMaxIdle(m: Method) {
-      doPutWithLifespanMaxIdle(m, 0, 1)
+      client.assertPut(m, 0, 1)
       Thread.sleep(1100)
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testGetBasic(m: Method) {
-      doPut(m)
-      val (getSt, actual) = doGet(m)
+      client.assertPut(m)
+      val (getSt, actual) = client.assertGet(m)
       assertSuccess(getSt, v(m), actual)
    }
 
    def testGetDoesNotExist(m: Method) {
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testPutIfAbsentNotExist(m: Method) {
-      val status = putIfAbsent(ch, cacheName, k(m) , 0, 0, v(m))
+      val status = client.putIfAbsent(k(m) , 0, 0, v(m))
       assertStatus(status, Success)
    }
 
    def testPutIfAbsentExist(m: Method) {
-      doPut(m)
-      val status = putIfAbsent(ch, cacheName, k(m) , 0, 0, v(m, "v2-"))
+      client.assertPut(m)
+      val status = client.putIfAbsent(k(m) , 0, 0, v(m, "v2-"))
       assertStatus(status, OperationNotExecuted)
    }
 
    def testPutIfAbsentWithLifespan(m: Method) {
-      val status = putIfAbsent(ch, cacheName, k(m) , 1, 0, v(m))
+      val status = client.putIfAbsent(k(m) , 1, 0, v(m))
       assertStatus(status, Success)
       Thread.sleep(1100)
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testPutIfAbsentWithMaxIdle(m: Method) {
-      val status = putIfAbsent(ch, cacheName, k(m) , 0, 1, v(m))
+      val status = client.putIfAbsent(k(m) , 0, 1, v(m))
       assertStatus(status, Success)
       Thread.sleep(1100)
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testReplaceBasic(m: Method) {
-      doPut(m)
-      val status = replace(ch, cacheName, k(m), 0, 0, v(m, "v1-"))
+      client.assertPut(m)
+      val status = client.replace(k(m), 0, 0, v(m, "v1-"))
       assertStatus(status, Success)
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertSuccess(getSt, v(m, "v1-"), actual)
    }
 
    def testNotReplaceIfNotPresent(m: Method) {
-      val status = replace(ch, cacheName, k(m), 0, 0, v(m))
+      val status = client.replace(k(m), 0, 0, v(m))
       assertStatus(status, OperationNotExecuted)
    }
 
    def testReplaceWithLifespan(m: Method) {
-      doPut(m)
-      val status = replace(ch, cacheName, k(m), 1, 0, v(m, "v1-"))
+      client.assertPut(m)
+      val status = client.replace(k(m), 1, 0, v(m, "v1-"))
       assertStatus(status, Success)
       Thread.sleep(1100)
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testReplaceWithMaxIdle(m: Method) {
-      doPut(m)
-      val status = replace(ch, cacheName, k(m), 0, 1, v(m, "v1-"))
+      client.assertPut(m)
+      val status = client.replace(k(m), 0, 1, v(m, "v1-"))
       assertStatus(status, Success)
       Thread.sleep(1100)
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testGetWithVersionBasic(m: Method) {
-      doPut(m)
-      val (getSt, actual, version) = getWithVersion(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt, v(m), actual)
       assertTrue(version != 0)
    }
 
    def testGetWithVersionDoesNotExist(m: Method) {
-      val (getSt, actual, version) = getWithVersion(ch, cacheName, k(m), 0)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
       assertKeyDoesNotExist(getSt, actual)
       assertTrue(version == 0)
    }
 
    def testReplaceIfUnmodifiedBasic(m: Method) {
-      doPut(m)
-      val (getSt, actual, version) = getWithVersion(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt, v(m), actual)
       assertTrue(version != 0)
-      val status = replaceIfUnmodified(ch, cacheName, k(m), 0, 0, v(m, "v1-"), version)
+      val status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
       assertStatus(status, Success)
    }
 
    def testReplaceIfUnmodifiedNotFound(m: Method) {
-      doPut(m)
-      val (getSt, actual, version) = getWithVersion(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt, v(m), actual)
       assertTrue(version != 0)
-      val status = replaceIfUnmodified(ch, cacheName, k(m, "k1-"), 0, 0, v(m, "v1-"), version)
+      val status = client.replaceIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), version)
       assertStatus(status, KeyDoesNotExist)
    }
 
    def testReplaceIfUnmodifiedNotExecuted(m: Method) {
-      doPut(m)
-      val (getSt, actual, version) = getWithVersion(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt, v(m), actual)
       assertTrue(version != 0)
-      var status = replaceIfUnmodified(ch, cacheName, k(m), 0, 0, v(m, "v1-"), version)
+      var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
       assertStatus(status, Success)
-      val (getSt2, actual2, version2) = getWithVersion(ch, cacheName, k(m), 0)
+      val (getSt2, actual2, version2) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt2, v(m, "v1-"), actual2)
       assertTrue(version2 != 0)
       assertTrue(version != version2)
-      status = replaceIfUnmodified(ch, cacheName, k(m), 0, 0, v(m, "v2-"), version)
+      status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), version)
       assertStatus(status, OperationNotExecuted)
-      status = replaceIfUnmodified(ch, cacheName, k(m), 0, 0, v(m, "v2-"), version2)
+      status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v2-"), version2)
       assertStatus(status, Success)
    }
 
    def testRemoveBasic(m: Method) {
-      doPut(m)
-      val status = remove(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val status = client.remove(k(m), 0)
       assertStatus(status, Success)
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testRemoveDoesNotExist(m: Method) {
-      val status = remove(ch, cacheName, k(m), 0)
+      val status = client.remove(k(m), 0)
       assertStatus(status, KeyDoesNotExist)
    }
 
    def testRemoveIfUnmodifiedBasic(m: Method) {
-      doPut(m)
-      val (getSt, actual, version) = getWithVersion(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt, v(m), actual)
       assertTrue(version != 0)
-      val status = removeIfUnmodified(ch, cacheName, k(m), 0, 0, v(m, "v1-"), version)
+      val status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
       assertStatus(status, Success)
-      val (getSt2, actual2) = doGet(m)
+      val (getSt2, actual2) = client.assertGet(m)
       assertKeyDoesNotExist(getSt2, actual2)
    }
 
    def testRemoveIfUnmodifiedNotFound(m: Method) {
-      doPut(m)
-      val (getSt, actual, version) = getWithVersion(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt, v(m), actual)
       assertTrue(version != 0)
-      val status = removeIfUnmodified(ch, cacheName, k(m, "k1-"), 0, 0, v(m, "v1-"), version)
+      val status = client.removeIfUnmodified(k(m, "k1-"), 0, 0, v(m, "v1-"), version)
       assertStatus(status, KeyDoesNotExist)
-      val (getSt2, actual2) = doGet(m)
+      val (getSt2, actual2) = client.assertGet(m)
       assertSuccess(getSt2, v(m), actual2)
    }
 
    def testRemoveIfUnmodifiedNotExecuted(m: Method) {
-      doPut(m)
-      val (getSt, actual, version) = getWithVersion(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val (getSt, actual, version) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt, v(m), actual)
       assertTrue(version != 0)
-      var status = replaceIfUnmodified(ch, cacheName, k(m), 0, 0, v(m, "v1-"), version)
+      var status = client.replaceIfUnmodified(k(m), 0, 0, v(m, "v1-"), version)
       assertStatus(status, Success)
-      val (getSt2, actual2, version2) = getWithVersion(ch, cacheName, k(m), 0)
+      val (getSt2, actual2, version2) = client.getWithVersion(k(m), 0)
       assertSuccess(getSt2, v(m, "v1-"), actual2)
       assertTrue(version2 != 0)
       assertTrue(version != version2)
-      status = removeIfUnmodified(ch, cacheName, k(m), 0, 0, v(m, "v2-"), version)
+      status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), version)
       assertStatus(status, OperationNotExecuted)
-      status = removeIfUnmodified(ch, cacheName, k(m), 0, 0, v(m, "v2-"), version2)
+      status = client.removeIfUnmodified(k(m), 0, 0, v(m, "v2-"), version2)
       assertStatus(status, Success)
    }
 
    def testContainsKeyBasic(m: Method) {
-      doPut(m)
-      val status = containsKey(ch, cacheName, k(m), 0)
+      client.assertPut(m)
+      val status = client.containsKey(k(m), 0)
       assertStatus(status, Success)
    }
 
    def testContainsKeyDoesNotExist(m: Method) {
-      val status = containsKey(ch, cacheName, k(m), 0)
+      val status = client.containsKey(k(m), 0)
       assertStatus(status, KeyDoesNotExist)
    }
 
@@ -284,24 +275,24 @@
       for (i <- 1 to 5) {
          val key = k(m, "k" + i + "-");
          val value = v(m, "v" + i + "-");
-         var status = put(ch, cacheName, key , 0, 0, value)
+         var status = client.put(key , 0, 0, value)
          assertStatus(status, Success)
-         status = containsKey(ch, cacheName, key, 0)
+         status = client.containsKey(key, 0)
          assertStatus(status, Success)
       }
 
-      val status = clear(ch, cacheName)
+      val status = client.clear
       assertStatus(status, Success)
 
       for (i <- 1 to 5) {
          val key = k(m, "k" + i + "-")
-         val status = containsKey(ch, cacheName, key, 0)
+         val status = client.containsKey(key, 0)
          assertStatus(status, KeyDoesNotExist)
       }
    }
 
    def testStatsDisabled(m: Method) {
-      val s = stats(ch, cacheName)
+      val s = client.stats
       assertEquals(s.get("timeSinceStart").get, "-1")
       assertEquals(s.get("currentNumberOfEntries").get, "-1")
       assertEquals(s.get("totalNumberOfEntries").get, "-1")
@@ -311,20 +302,11 @@
       assertEquals(s.get("misses").get, "-1")
       assertEquals(s.get("removeHits").get, "-1")
       assertEquals(s.get("removeMisses").get, "-1")
-      assertEquals(s.get("evictions").get, "-1")
    }
 
    def testPing(m: Method) {
-      val status = ping(ch, cacheName)
+      val status = client.ping
       assertStatus(status, Success)
    }
 
-   private def doGet(m: Method): (OperationStatus, Array[Byte]) = {
-      doGet(m, 0)
-   }
-
-   private def doGet(m: Method, flags: Int): (OperationStatus, Array[Byte]) = {
-      get(ch, cacheName, k(m), flags)
-   }
-
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala	2010-03-26 15:00:30 UTC (rev 1627)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala	2010-03-26 15:41:13 UTC (rev 1628)
@@ -1,27 +1,27 @@
 package org.infinispan.server.hotrod
 
-import test.{Utils, Client}
+import test.HotRodClient
+import test.HotRodTestingUtil._
 import org.infinispan.test.SingleCacheManagerTest
 import org.testng.annotations.{AfterClass, Test}
 import org.infinispan.test.fwk.TestCacheManagerFactory
 import org.infinispan.server.core.CacheValue
 import org.infinispan.AdvancedCache
-import org.jboss.netty.channel.Channel
 import java.lang.reflect.Method
 import org.testng.Assert._
 import org.infinispan.server.hotrod.OperationStatus._
-import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+import org.infinispan.manager.CacheManager
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
- * @since
+ * @since 4.1
  */
 @Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
-class HotRodStatsTest extends SingleCacheManagerTest with Utils with Client {
+class HotRodStatsTest extends SingleCacheManagerTest {
    private val cacheName = "hotrod-cache"
    private var server: HotRodServer = _
-   private var ch: Channel = _
+   private var client: HotRodClient = _
    private var advancedCache: AdvancedCache[CacheKey, CacheValue] = _
    private var jmxDomain = classOf[HotRodStatsTest].getSimpleName
 
@@ -29,7 +29,7 @@
       val cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
       advancedCache = cacheManager.getCache[CacheKey, CacheValue](cacheName).getAdvancedCache
       server = startHotRodServer(cacheManager)
-      ch = connect("127.0.0.1", server.getPort)
+      client = new HotRodClient("127.0.0.1", server.getPort, cacheName)
       cacheManager
    }
 
@@ -37,12 +37,12 @@
    override def destroyAfterClass {
       super.destroyAfterClass
       log.debug("Test finished, close client and Hot Rod server", null)
-      ch.disconnect
+      client.stop
       server.stop
    }
 
    def testStats(m: Method) {
-      var s = stats(ch, cacheName)
+      var s = client.stats
       assertTrue(s.get("timeSinceStart") != 0)
       assertEquals(s.get("currentNumberOfEntries").get, "0")
       assertEquals(s.get("totalNumberOfEntries").get, "0")
@@ -52,37 +52,18 @@
       assertEquals(s.get("misses").get, "0")
       assertEquals(s.get("removeHits").get, "0")
       assertEquals(s.get("removeMisses").get, "0")
-      assertEquals(s.get("evictions").get, "0")
 
-      doPut(m)
-      s = stats(ch, cacheName)
+      client.assertPut(m)
+      s = client.stats
       assertEquals(s.get("currentNumberOfEntries").get, "1")
       assertEquals(s.get("totalNumberOfEntries").get, "1")
       assertEquals(s.get("stores").get, "1")
-      val (getSt, actual) = doGet(m)
+      val (getSt, actual) = client.assertGet(m)
       assertSuccess(getSt, v(m), actual)
-      s = stats(ch, cacheName)
+      s = client.stats
       assertEquals(s.get("hits").get, "1")
       assertEquals(s.get("misses").get, "0")
       assertEquals(s.get("retrievals").get, "1")
    }
 
-   // TODO: shared this private between tests by making client trait and object instead
-   private def doPut(m: Method) {
-      doPutWithLifespanMaxIdle(m, 0, 0)
-   }
-
-   private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int) {
-      val status = put(ch, cacheName, k(m) , lifespan, maxIdle, v(m))
-      assertStatus(status, Success)
-   }
-
-   private def doGet(m: Method): (OperationStatus, Array[Byte]) = {
-      doGet(m, 0)
-   }
-
-   private def doGet(m: Method, flags: Int): (OperationStatus, Array[Byte]) = {
-      get(ch, cacheName, k(m), flags)
-   }
-   
 }
\ No newline at end of file

Deleted: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-26 15:00:30 UTC (rev 1627)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-26 15:41:13 UTC (rev 1628)
@@ -1,308 +0,0 @@
-package org.infinispan.server.hotrod.test
-
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
-import org.jboss.netty.bootstrap.ClientBootstrap
-import java.net.InetSocketAddress
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
-import org.jboss.netty.channel._
-import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
-import org.jboss.netty.handler.codec.replay.ReplayingDecoder
-import org.testng.Assert._
-import org.infinispan.server.hotrod._
-import org.infinispan.server.hotrod.Response
-import org.infinispan.server.hotrod.OperationStatus._
-import org.infinispan.server.hotrod.OperationResponse._
-import java.util.concurrent.atomic.AtomicInteger
-import org.infinispan.server.core.transport.NoState
-import org.jboss.netty.channel.ChannelHandler.Sharable
-import java.util.Arrays
-import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
-import org.infinispan.server.core.transport.netty.{ChannelBufferAdapter}
-import org.infinispan.server.core.Logging
-import collection.mutable
-import collection.immutable
-
-/**
- * // TODO: Document this
- *
- * // TODO: Transform to Netty independent code
- * // TODO: maybe make it an object to be able to cache stuff without testng complaining
- *
- * @author Galder Zamarreño
- * @since 4.1
- */
-trait Client {
-
-   def connect(host: String, port: Int): Channel = {
-      // Set up.
-      val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
-      val bootstrap: ClientBootstrap = new ClientBootstrap(factory)
-      bootstrap.setPipelineFactory(ClientPipelineFactory)
-      bootstrap.setOption("tcpNoDelay", true)
-      bootstrap.setOption("keepAlive", true)
-      // Make a new connection.
-      val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
-      // Wait until the connection is made successfully.
-      val ch = connectFuture.awaitUninterruptibly.getChannel
-      // Ideally, I'd store channel as a var in this trait. However, this causes issues with TestNG, see:
-      // http://thread.gmane.org/gmane.comp.lang.scala.user/24317
-      assertTrue(connectFuture.isSuccess)
-      ch
-   }
-
-   def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus = {
-      put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v, 0, 0)
-   }
-
-   def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte],
-           flags: Int): OperationStatus = {
-      put(ch, 0xA0, 0x01, name, k, lifespan, maxIdle, v, flags, 0)
-   }
-
-   def putIfAbsent(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus = {
-      put(ch, 0xA0, 0x05, name, k, lifespan, maxIdle, v, 0, 0)
-   }
-
-   def replace(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus = {
-      put(ch, 0xA0, 0x07, name, k, lifespan, maxIdle, v, 0, 0)
-   }
-
-   def replaceIfUnmodified(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte],
-                           version: Long): OperationStatus = {
-      put(ch, 0xA0, 0x09, name, k, lifespan, maxIdle, v, 0, version)
-   }
-
-   def removeIfUnmodified(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte],
-                           version: Long): OperationStatus = {
-      put(ch, 0xA0, 0x0D, name, k, lifespan, maxIdle, v, 0, version)
-   }
-
-   // TODO: change name of this method since it's more polivalent than just a put
-   def put(ch: Channel, magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
-           v: Array[Byte], flags: Int, version: Long): OperationStatus = {
-      val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version))
-      writeFuture.awaitUninterruptibly
-      assertTrue(writeFuture.isSuccess)
-      // Get the handler instance to retrieve the answer.
-      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      handler.getResponse.status
-   }
-
-   def get(ch: Channel, name: String, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
-      val (getSt, actual, version) = get(ch, 0x03, name, k, 0)
-      (getSt, actual)
-   }
-
-   def containsKey(ch: Channel, name: String, k: Array[Byte], flags: Int): OperationStatus = {
-      val (containsKeySt, actual, version) = get(ch, 0x0F, name, k, 0)
-      containsKeySt
-   }
-
-   def getWithVersion(ch: Channel, name: String, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
-      get(ch, 0x11, name, k, 0)
-   }
-
-   def remove(ch: Channel, name: String, k: Array[Byte], flags: Int): OperationStatus = {
-      put(ch, 0xA0, 0x0B, name, k, 0, 0, null, 0, 0)
-   }
-
-   def get(ch: Channel, code: Byte, name: String, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
-      val writeFuture = ch.write(new Op(0xA0, code, name, k, 0, 0, null, flags, 0))
-      writeFuture.awaitUninterruptibly
-      assertTrue(writeFuture.isSuccess)
-      // Get the handler instance to retrieve the answer.
-      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      if (code == 0x03) {
-         val resp = handler.getResponse.asInstanceOf[GetResponse]
-         (resp.status, if (resp.data == None) null else resp.data.get, 0)
-      } else if (code == 0x11) {
-         val resp = handler.getResponse.asInstanceOf[GetWithVersionResponse]
-         (resp.status, if (resp.data == None) null else resp.data.get, resp.version)
-      } else if (code == 0x0F) {
-         (handler.getResponse.status, null, 0)
-      } else {
-         (OperationNotExecuted, null, 0)
-      }
-   }
-
-   def clear(ch: Channel, name: String): OperationStatus = {
-      put(ch, 0xA0, 0x13, name, null, 0, 0, null, 0, 0)
-   }
-
-   def stats(ch: Channel, name: String): Map[String, String] = {
-      val writeFuture = ch.write(new StatsOp(0xA0, 0x15, name, null))
-      writeFuture.awaitUninterruptibly
-      assertTrue(writeFuture.isSuccess)
-      // Get the handler instance to retrieve the answer.
-      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-      val resp = handler.getResponse.asInstanceOf[StatsResponse]
-      resp.stats
-   }
-
-//   def stats(ch: Channel, name: String, statName: String): Map[String, String] = {
-//      val writeFuture = ch.write(new StatsOp(0xA0, 0x15, name, statName))
-//      writeFuture.awaitUninterruptibly
-//      assertTrue(writeFuture.isSuccess)
-//      // Get the handler instance to retrieve the answer.
-//      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-//      val resp = handler.getResponse.asInstanceOf[StatsResponse]
-//      resp.stats
-//   }
-
-   def ping(ch: Channel, name: String): OperationStatus = {
-      put(ch, 0xA0, 0x17, name, null, 0, 0, null, 0, 0)
-   }
-
-   def assertStatus(status: OperationStatus, expected: OperationStatus): Boolean = {
-      val isSuccess = status == expected
-      assertTrue(isSuccess, "Status should have been '" + expected + "' but instead was: " + status)
-      isSuccess
-   }
-
-   def assertSuccess(status: OperationStatus, expected: Array[Byte], actual: Array[Byte]): Boolean = {
-      assertStatus(status, Success)
-      val isSuccess = Arrays.equals(expected, actual)
-      assertTrue(isSuccess)
-      isSuccess
-   }
-
-   def assertKeyDoesNotExist(status: OperationStatus, actual: Array[Byte]): Boolean = {
-      assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
-      assertNull(actual)
-      status == KeyDoesNotExist
-   }
-
-}
-
-@Sharable
-private object ClientPipelineFactory extends ChannelPipelineFactory {
-
-   override def getPipeline() = {
-      val pipeline = Channels.pipeline
-      pipeline.addLast("decoder", Decoder)
-      pipeline.addLast("encoder", Encoder)
-      pipeline.addLast("handler", new ClientHandler)
-      pipeline
-   }
-
-}
-
-@Sharable
-private object Encoder extends OneToOneEncoder {
-
-   private val idCounter: AtomicInteger = new AtomicInteger
-
-   override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
-      val ret =
-         msg match {
-            case op: Op => {
-               val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
-               buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
-               buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
-               buffer.writeByte(10) // version
-               buffer.writeByte(op.code) // opcode
-               buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
-               buffer.writeUnsignedInt(op.flags) // flags
-               buffer.writeByte(0) // client intelligence
-               buffer.writeUnsignedInt(0) // topology id
-               if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op... 
-                  buffer.writeRangedBytes(op.key) // key length + key
-                  if (op.value != null) {
-                     buffer.writeUnsignedInt(op.lifespan) // lifespan
-                     buffer.writeUnsignedInt(op.maxIdle) // maxIdle
-                     if (op.code == 0x09 || op.code == 0x0D) {
-                        buffer.writeLong(op.version)
-                     }
-                     buffer.writeRangedBytes(op.value) // value length + value
-                  }
-               }
-//               else if (op.code != 0x15) {
-//                  buffer.writeString(op.asInstanceOf[StatsOp].statName)
-//               }
-               buffer.getUnderlyingChannelBuffer
-            }
-      }
-      ret
-   }
-
-}
-
-private object Decoder extends ReplayingDecoder[NoState] with Logging {
-
-   override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
-      val buf = new ChannelBufferAdapter(buffer)
-      val magic = buf.readUnsignedByte
-      val id = buf.readUnsignedLong
-      // val opCode = OperationResolver.resolve(buf.readUnsignedByte)
-      val opCode = OperationResponse.apply(buf.readUnsignedByte)
-      val status = OperationStatus.apply(buf.readUnsignedByte)
-      val topologyChangeMarker = buf.readUnsignedByte
-      val resp: Response =
-         opCode match {
-            case StatsResponse => {
-               val size = buf.readUnsignedInt
-               val stats = mutable.Map.empty[String, String]
-               for (i <- 1 to size) {
-                  stats += (buf.readString -> buf.readString)
-               }
-               new StatsResponse(id, immutable.Map[String, String]() ++ stats)
-            }
-            case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
-                 | RemoveResponse | RemoveIfUnmodifiedResponse | ContainsKeyResponse | ClearResponse | PingResponse =>
-               new Response(id, opCode, status)
-            case GetWithVersionResponse  => {
-               if (status == Success) {
-                  val version = buf.readLong
-                  val data = Some(buf.readRangedBytes)
-                  new GetWithVersionResponse(id, opCode, status, data, version)
-               } else{
-                  new GetWithVersionResponse(id, opCode, status, None, 0)
-               }
-            }
-            case GetResponse => {
-               if (status == Success) {
-                  val data = Some(buf.readRangedBytes)
-                  new GetResponse(id, opCode, status, data)
-               } else{
-                  new GetResponse(id, opCode, status, None)
-               }
-            }
-            case ErrorResponse => new ErrorResponse(id, status, buf.readString)
-         }
-      resp
-   }
-
-   override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
-      error("Error", e.getCause)
-   }
-}
-
-private class ClientHandler extends SimpleChannelUpstreamHandler {
-
-   private val answer = new LinkedBlockingQueue[Response];
-
-   override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
-      val offered = answer.offer(e.getMessage.asInstanceOf[Response])
-      assertTrue(offered)
-   }
-
-   def getResponse: Response = {
-      answer.poll(60, TimeUnit.SECONDS)
-   }
-
-}
-
-private class Op(val magic: Int,
-                 val code: Byte,
-                 val cacheName: String,
-                 val key: Array[Byte],
-                 val lifespan: Int,
-                 val maxIdle: Int,
-                 val value: Array[Byte],
-                 val flags: Int,
-                 val version: Long)
-
-private class StatsOp(override val magic: Int,
-                 override val code: Byte,
-                 override val cacheName: String,
-                 val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0)
\ No newline at end of file

Copied: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala (from rev 1625, trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala)
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	                        (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-03-26 15:41:13 UTC (rev 1628)
@@ -0,0 +1,278 @@
+package org.infinispan.server.hotrod.test
+
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.bootstrap.ClientBootstrap
+import java.net.InetSocketAddress
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
+import org.jboss.netty.channel._
+import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder
+import org.testng.Assert._
+import org.infinispan.server.hotrod._
+import org.infinispan.server.hotrod.Response
+import org.infinispan.server.hotrod.OperationStatus._
+import org.infinispan.server.hotrod.OperationResponse._
+import java.util.concurrent.atomic.AtomicInteger
+import org.infinispan.server.core.transport.NoState
+import org.jboss.netty.channel.ChannelHandler.Sharable
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
+import org.infinispan.server.core.transport.netty.{ChannelBufferAdapter}
+import org.infinispan.server.core.Logging
+import collection.mutable
+import collection.immutable
+import java.lang.reflect.Method
+import test.HotRodTestingUtil._
+
+/**
+ * A very simple Hot Rod client for testing purposes
+ *
+ * Reasons why this should not really be a trait:
+ * Storing var instances in a trait causes issues with TestNG, see:
+ *   http://thread.gmane.org/gmane.comp.lang.scala.user/24317
+ *
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+class HotRodClient(host: String, port: Int, defaultCacheName: String) {
+
+   private lazy val ch: Channel = {
+      val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
+      val bootstrap: ClientBootstrap = new ClientBootstrap(factory)
+      bootstrap.setPipelineFactory(ClientPipelineFactory)
+      bootstrap.setOption("tcpNoDelay", true)
+      bootstrap.setOption("keepAlive", true)
+      // Make a new connection.
+      val connectFuture = bootstrap.connect(new InetSocketAddress(host, port))
+      // Wait until the connection is made successfully.
+      val ch = connectFuture.awaitUninterruptibly.getChannel
+      assertTrue(connectFuture.isSuccess)
+      ch
+   }
+   
+   def stop = ch.disconnect
+
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
+      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+
+   def assertPut(m: Method) {
+      val status = put(k(m) , 0, 0, v(m))
+      assertStatus(status, Success)
+   }
+
+   def assertPut(m: Method, lifespan: Int, maxIdle: Int) {
+      val status = put(k(m) , lifespan, maxIdle, v(m))
+      assertStatus(status, Success)
+   }
+
+   def put(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], flags: Int): OperationStatus =
+      execute(0xA0, 0x01, defaultCacheName, k, lifespan, maxIdle, v, flags, 0)
+
+   def putIfAbsent(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
+      execute(0xA0, 0x05, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+
+   def replace(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): OperationStatus =
+      execute(0xA0, 0x07, defaultCacheName, k, lifespan, maxIdle, v, 0, 0)
+
+   def replaceIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
+      execute(0xA0, 0x09, defaultCacheName, k, lifespan, maxIdle, v, 0, version)
+
+   def removeIfUnmodified(k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte], version: Long): OperationStatus =
+      execute(0xA0, 0x0D, defaultCacheName, k, lifespan, maxIdle, v, 0, version)
+
+   def execute(magic: Int, code: Byte, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int,
+               v: Array[Byte], flags: Int, version: Long): OperationStatus = {
+      val writeFuture = ch.write(new Op(magic, code, name, k, lifespan, maxIdle, v, flags, version))
+      writeFuture.awaitUninterruptibly
+      assertTrue(writeFuture.isSuccess)
+      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+      handler.getResponse.status
+   }
+
+   def get(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
+      val (getSt, actual, version) = get(0x03, k, 0)
+      (getSt, actual)
+   }
+
+   def assertGet(m: Method): (OperationStatus, Array[Byte]) = assertGet(m, 0)
+
+   def assertGet(m: Method, flags: Int): (OperationStatus, Array[Byte]) = get(k(m), flags)   
+
+   def containsKey(k: Array[Byte], flags: Int): OperationStatus = {
+      val (containsKeySt, actual, version) = get(0x0F, k, 0)
+      containsKeySt
+   }
+
+   def getWithVersion(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) =
+      get(0x11, k, 0)
+
+   def remove(k: Array[Byte], flags: Int): OperationStatus =
+      execute(0xA0, 0x0B, defaultCacheName, k, 0, 0, null, 0, 0)
+
+   def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
+      val writeFuture = ch.write(new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0))
+      writeFuture.awaitUninterruptibly
+      assertTrue(writeFuture.isSuccess)
+      // Get the handler instance to retrieve the answer.
+      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+      if (code == 0x03) {
+         val resp = handler.getResponse.asInstanceOf[GetResponse]
+         (resp.status, if (resp.data == None) null else resp.data.get, 0)
+      } else if (code == 0x11) {
+         val resp = handler.getResponse.asInstanceOf[GetWithVersionResponse]
+         (resp.status, if (resp.data == None) null else resp.data.get, resp.version)
+      } else if (code == 0x0F) {
+         (handler.getResponse.status, null, 0)
+      } else {
+         (OperationNotExecuted, null, 0)
+      }
+   }
+
+   def clear: OperationStatus = execute(0xA0, 0x13, defaultCacheName, null, 0, 0, null, 0, 0)
+
+   def stats: Map[String, String] = {
+      val writeFuture = ch.write(new StatsOp(0xA0, 0x15, defaultCacheName, null))
+      writeFuture.awaitUninterruptibly
+      assertTrue(writeFuture.isSuccess)
+      // Get the handler instance to retrieve the answer.
+      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+      val resp = handler.getResponse.asInstanceOf[StatsResponse]
+      resp.stats
+   }
+
+   def ping: OperationStatus = execute(0xA0, 0x17, defaultCacheName, null, 0, 0, null, 0, 0)
+
+}
+
+@Sharable
+private object ClientPipelineFactory extends ChannelPipelineFactory {
+
+   override def getPipeline() = {
+      val pipeline = Channels.pipeline
+      pipeline.addLast("decoder", Decoder)
+      pipeline.addLast("encoder", Encoder)
+      pipeline.addLast("handler", new ClientHandler)
+      pipeline
+   }
+
+}
+
+@Sharable
+private object Encoder extends OneToOneEncoder {
+
+   private val idCounter: AtomicInteger = new AtomicInteger
+
+   override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
+      val ret =
+         msg match {
+            case op: Op => {
+               val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
+               buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
+               buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
+               buffer.writeByte(10) // version
+               buffer.writeByte(op.code) // opcode
+               buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
+               buffer.writeUnsignedInt(op.flags) // flags
+               buffer.writeByte(0) // client intelligence
+               buffer.writeUnsignedInt(0) // topology id
+               if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op... 
+                  buffer.writeRangedBytes(op.key) // key length + key
+                  if (op.value != null) {
+                     if (op.code != 0x0D) { // If it's not removeIfUnmodified...
+                        buffer.writeUnsignedInt(op.lifespan) // lifespan
+                        buffer.writeUnsignedInt(op.maxIdle) // maxIdle
+                     }
+                     if (op.code == 0x09 || op.code == 0x0D) {
+                        buffer.writeLong(op.version)
+                     }
+                     if (op.code != 0x0D) { // If it's not removeIfUnmodified...
+                        buffer.writeRangedBytes(op.value) // value length + value
+                     }
+                  }
+               }
+               buffer.getUnderlyingChannelBuffer
+            }
+      }
+      ret
+   }
+
+}
+
+private object Decoder extends ReplayingDecoder[NoState] with Logging {
+
+   override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
+      val buf = new ChannelBufferAdapter(buffer)
+      val magic = buf.readUnsignedByte
+      val id = buf.readUnsignedLong
+      // val opCode = OperationResolver.resolve(buf.readUnsignedByte)
+      val opCode = OperationResponse.apply(buf.readUnsignedByte)
+      val status = OperationStatus.apply(buf.readUnsignedByte)
+      val topologyChangeMarker = buf.readUnsignedByte
+      val resp: Response =
+         opCode match {
+            case StatsResponse => {
+               val size = buf.readUnsignedInt
+               val stats = mutable.Map.empty[String, String]
+               for (i <- 1 to size) {
+                  stats += (buf.readString -> buf.readString)
+               }
+               new StatsResponse(id, immutable.Map[String, String]() ++ stats)
+            }
+            case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
+                 | RemoveResponse | RemoveIfUnmodifiedResponse | ContainsKeyResponse | ClearResponse | PingResponse =>
+               new Response(id, opCode, status)
+            case GetWithVersionResponse  => {
+               if (status == Success) {
+                  val version = buf.readLong
+                  val data = Some(buf.readRangedBytes)
+                  new GetWithVersionResponse(id, opCode, status, data, version)
+               } else{
+                  new GetWithVersionResponse(id, opCode, status, None, 0)
+               }
+            }
+            case GetResponse => {
+               if (status == Success) {
+                  val data = Some(buf.readRangedBytes)
+                  new GetResponse(id, opCode, status, data)
+               } else{
+                  new GetResponse(id, opCode, status, None)
+               }
+            }
+            case ErrorResponse => new ErrorResponse(id, status, buf.readString)
+         }
+      resp
+   }
+
+   override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+      error("Error", e.getCause)
+   }
+}
+
+private class ClientHandler extends SimpleChannelUpstreamHandler {
+
+   private val answer = new LinkedBlockingQueue[Response];
+
+   override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+      val offered = answer.offer(e.getMessage.asInstanceOf[Response])
+      assertTrue(offered)
+   }
+
+   def getResponse: Response = {
+      answer.poll(60, TimeUnit.SECONDS)
+   }
+
+}
+
+private class Op(val magic: Int,
+                 val code: Byte,
+                 val cacheName: String,
+                 val key: Array[Byte],
+                 val lifespan: Int,
+                 val maxIdle: Int,
+                 val value: Array[Byte],
+                 val flags: Int,
+                 val version: Long)
+
+private class StatsOp(override val magic: Int,
+                 override val code: Byte,
+                 override val cacheName: String,
+                 val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0)
\ No newline at end of file

Copied: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala (from rev 1625, trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala)
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	                        (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodTestingUtil.scala	2010-03-26 15:41:13 UTC (rev 1628)
@@ -0,0 +1,70 @@
+package org.infinispan.server.hotrod.test
+
+import java.util.concurrent.atomic.AtomicInteger
+import org.infinispan.manager.CacheManager
+import java.lang.reflect.Method
+import org.infinispan.server.hotrod.{HotRodServer}
+import org.infinispan.server.core.Logging
+import java.util.Arrays
+import org.infinispan.server.hotrod.OperationStatus._
+import org.testng.Assert._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+
+// TODO: convert to object so that mircea can use it
+object HotRodTestingUtil extends Logging {
+
+   import HotRodTestingUtil._
+
+   def host = "127.0.0.1"
+
+   def startHotRodServer(manager: CacheManager): HotRodServer =
+      startHotRodServer(manager, UniquePortThreadLocal.get.intValue)
+
+   def startHotRodServer(manager: CacheManager, port: Int): HotRodServer = {
+      val server = new HotRodServer
+      server.start(host, port, manager, 0, 0)
+      server
+   }
+
+   def k(m: Method, prefix: String): Array[Byte] = {
+      val bytes: Array[Byte] = (prefix + m.getName).getBytes
+      trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)
+      bytes
+   }
+
+   def v(m: Method, prefix: String): Array[Byte] = k(m, prefix)
+
+   def k(m: Method): Array[Byte] = k(m, "k-")
+
+   def v(m: Method): Array[Byte] = v(m, "v-")
+
+   def assertStatus(status: OperationStatus, expected: OperationStatus): Boolean = {
+      val isSuccess = status == expected
+      assertTrue(isSuccess, "Status should have been '" + expected + "' but instead was: " + status)
+      isSuccess
+   }
+
+   def assertSuccess(status: OperationStatus, expected: Array[Byte], actual: Array[Byte]): Boolean = {
+      assertStatus(status, Success)
+      val isSuccess = Arrays.equals(expected, actual)
+      assertTrue(isSuccess)
+      isSuccess
+   }
+
+   def assertKeyDoesNotExist(status: OperationStatus, actual: Array[Byte]): Boolean = {
+      assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
+      assertNull(actual)
+      status == KeyDoesNotExist
+   }
+   
+} 
+
+object UniquePortThreadLocal extends ThreadLocal[Int] {
+   private val uniqueAddr = new AtomicInteger(11311)
+   override def initialValue: Int = uniqueAddr.getAndAdd(100)
+}
\ No newline at end of file

Deleted: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala	2010-03-26 15:00:30 UTC (rev 1627)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala	2010-03-26 15:41:13 UTC (rev 1628)
@@ -1,50 +0,0 @@
-package org.infinispan.server.hotrod.test
-
-import java.util.concurrent.atomic.AtomicInteger
-import org.infinispan.manager.CacheManager
-import java.lang.reflect.Method
-import org.infinispan.server.hotrod.{HotRodServer}
-import org.infinispan.server.core.Logging
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since 4.1
- */
-
-// TODO: convert to object so that mircea can use it
-trait Utils {
-
-   import Utils._
-
-   def host = "127.0.0.1"
-
-   def startHotRodServer(manager: CacheManager): HotRodServer =
-      startHotRodServer(manager, UniquePortThreadLocal.get.intValue)
-
-   def startHotRodServer(manager: CacheManager, port: Int): HotRodServer = {
-      val server = new HotRodServer
-      server.start(host, port, manager, 0, 0)
-      server
-   }
-
-   def k(m: Method, prefix: String): Array[Byte] = {
-      val bytes: Array[Byte] = (prefix + m.getName).getBytes
-      trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)
-      bytes
-   }
-
-   def v(m: Method, prefix: String): Array[Byte] = k(m, prefix)
-
-   def k(m: Method): Array[Byte] = k(m, "k-")
-
-   def v(m: Method): Array[Byte] = v(m, "v-")
-
-}
-
-object Utils extends Logging
-
-object UniquePortThreadLocal extends ThreadLocal[Int] {
-   private val uniqueAddr = new AtomicInteger(11311)
-   override def initialValue: Int = uniqueAddr.getAndAdd(100)
-}
\ No newline at end of file



More information about the infinispan-commits mailing list