[infinispan-commits] Infinispan SVN: r1575 - in trunk/server: hotrod/src/main/scala/org/infinispan/server/hotrod and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Mar 9 06:17:06 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-09 06:17:05 -0500 (Tue, 09 Mar 2010)
New Revision: 1575

Modified:
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelUpstreamHandler.java
   trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyEncoder.java
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Added handling of Hot Rod flags and passing them to the corresponding cache instances. Sorted out warnings as a result of Netty 3.2.0.Beta1 upgrade.  


Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelUpstreamHandler.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelUpstreamHandler.java	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyChannelUpstreamHandler.java	2010-03-09 11:17:05 UTC (rev 1575)
@@ -25,7 +25,6 @@
 import org.infinispan.CacheException;
 import org.infinispan.server.core.CommandHandler;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
@@ -37,7 +36,6 @@
  * @author Galder Zamarreño
  * @since 4.0
  */
- at ChannelPipelineCoverage("one")
 public class NettyChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
    final CommandHandler handler;
    final ChannelGroup group;

Modified: trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyEncoder.java
===================================================================
--- trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyEncoder.java	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/core/src/main/java/org/infinispan/server/core/transport/netty/NettyEncoder.java	2010-03-09 11:17:05 UTC (rev 1575)
@@ -25,7 +25,7 @@
 
 import org.infinispan.server.core.transport.ChannelBuffer;
 import org.infinispan.server.core.transport.Encoder;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelHandler;
 import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
 
 /**
@@ -34,7 +34,7 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
- at ChannelPipelineCoverage("all")
+ at ChannelHandler.Sharable
 public class NettyEncoder extends OneToOneEncoder {
    final Encoder encoder;
 

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/CallerCache.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -1,8 +1,9 @@
 package org.infinispan.server.hotrod
 
-import org.infinispan.{Cache => InfinispanCache}
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
 import java.util.concurrent.TimeUnit
+import org.infinispan.context.Flag
+import org.infinispan.{AdvancedCache, Cache => InfinispanCache}
 
 /**
  * // TODO: Document this
@@ -15,7 +16,7 @@
    import CallerCache._
 
    override def put(c: StorageCommand): Response = {
-      val cache = getCache(c.cacheName)
+      val cache = getCache(c.cacheName, c.flags)
       val k = new Key(c.key)
       val v = new Value(c.value)
       (c.lifespan, c.maxIdle) match {
@@ -27,7 +28,7 @@
    }
 
    override def get(c: RetrievalCommand): Response = {
-      val cache = getCache(c.cacheName)
+      val cache = getCache(c.cacheName, c.flags)
       val value = cache.get(new Key(c.key))
       if (value != null)
          new RetrievalResponse(OpCodes.GetResponse, c.id, Status.Success, value.v)
@@ -35,13 +36,24 @@
          new RetrievalResponse(OpCodes.GetResponse, c.id, Status.KeyDoesNotExist, null)
    }
 
-   private def getCache(cacheName: String): InfinispanCache[Key, Value] = {
-      if (cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME)
-         manager.getCache[Key, Value]
-      else
-         manager.getCache(cacheName)
+   private def getCache(cacheName: String, flags: Set[Flag]): InfinispanCache[Key, Value] = {
+      val isDefaultCache = cacheName == DefaultCacheManager.DEFAULT_CACHE_NAME
+      val isWithFlags = ! flags.isEmpty
+      (isDefaultCache, isWithFlags) match {
+         case (true, true) => getAdvancedCache.withFlags(flags.toSeq : _*)
+         case (true, false) => getAdvancedCache
+         case (false, true) => getAdvancedCache(cacheName).withFlags(flags.toSeq : _*)
+         case (false, false) => getAdvancedCache(cacheName)
+      }
    }
 
+   private def getAdvancedCache(): AdvancedCache[Key, Value] =
+      manager.getCache[Key, Value].getAdvancedCache
+   
+   private def getAdvancedCache(cacheName: String): AdvancedCache[Key, Value] =
+      manager.getCache[Key, Value](cacheName).getAdvancedCache
+
+
    /**
     * Transforms lifespan pass as seconds into milliseconds
     * following this rule:
@@ -60,6 +72,6 @@
    }
 }
 
-object CallerCache {
+object CallerCache extends Logging {
    private val SecondsInAMonth = 60 * 60 * 24 * 30
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Command.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -1,5 +1,7 @@
 package org.infinispan.server.hotrod
 
+import org.infinispan.context.Flag
+   
 /**
  * // TODO: Document this
  * 
@@ -7,15 +9,9 @@
  * @since 4.1
  */
 abstract class Command(val cacheName: String,
-                       val id: Long)
-//   type AnyCommand <: Command
-//
-{
-  def perform(cache: Cache): Response
-}
-//
-//}
-////{
-//////   type AnyCommand <: Command
-//////   def perform(op: Unit => Replies)
-////}
\ No newline at end of file
+                       val id: Long,
+                       val flags: Set[Flag]) {
+
+   def perform(cache: Cache): Response
+   
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder410.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -16,7 +16,7 @@
       val op = OpCodes.apply(buffer.readUnsignedByte)
       val cacheName = buffer.readString
       val id = buffer.readUnsignedLong
-      val flags = Flags.extract(buffer.readUnsignedInt)
+      val flags = Flags.toContextFlags(buffer.readUnsignedInt)
       val command: Command =
          op match {                                   
             case PutRequest => {
@@ -30,7 +30,7 @@
             }
             case GetRequest => {
                val key = buffer.readRangedBytes
-               new RetrievalCommand(cacheName, id, key)({
+               new RetrievalCommand(cacheName, id, key, flags)({
                   (cache: Cache, command: RetrievalCommand) => cache.get(command)
                })
             }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Flags.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -15,7 +15,7 @@
    private val ZeroLockAcquisitionTimeout = Value(1, Flag.ZERO_LOCK_ACQUISITION_TIMEOUT.toString)
    private val CacheModeLocal = Value(1 << 1, Flag.CACHE_MODE_LOCAL.toString)
    private val SkipLocking = Value(1 << 2, Flag.SKIP_LOCKING.toString)
-   private val ForceWriteLock = Value(1 << 3, Flag.FORCE_WRITE_LOCK.toString)
+   private val ForceWriteLock = Value(1 << 3, Flag.FORCE_WRITE_LOCK.toString) // TODO: Does it make sense? How start txs remotely?
    private val SkipCacheStatusCheck = Value(1 << 4, Flag.SKIP_CACHE_STATUS_CHECK.toString)
    private val ForceAsynchronous = Value(1 << 5, Flag.FORCE_ASYNCHRONOUS.toString)
    private val ForceSynchronous = Value(1 << 6, Flag.FORCE_SYNCHRONOUS.toString)
@@ -25,10 +25,22 @@
    private val SkipRemoteLookup = Value(1 << 10, Flag.SKIP_REMOTE_LOOKUP.toString)
    private val PutForExternalRead = Value(1 << 11, Flag.PUT_FOR_EXTERNAL_READ.toString)
 
-   def extract(bitFlags: Int): Set[Flag] = {
+   def toContextFlags(bitFlags: Int): Set[Flag] = {
       val s = new HashSet[Flag]
       Flags.values.filter(f => (bitFlags & f.id) > 0).foreach(f => s += Flag.valueOf(f.toString))
       new immutable.HashSet ++ s
    }
 
+   /**
+    * Takes a Set of Infinispan context Flag instances and converts
+    * it into a int that can be sent with Hot Rod protocol.
+    * 
+    * This method has merely been implemented for testing purposes,
+    * to make it easier to test clients sending different flags. 
+    */
+   def fromContextFlags(flags: Set[Flag]): Int = {
+      var bitFlags = 0
+      Flags.values.filter(f => flags.contains(Flag.valueOf(f.toString))).foreach(f => bitFlags = bitFlags | f.id)
+      bitFlags
+   }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/RetrievalCommand.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -1,5 +1,7 @@
 package org.infinispan.server.hotrod
 
+import org.infinispan.context.Flag
+
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
@@ -8,8 +10,9 @@
 
 class RetrievalCommand(override val cacheName: String,
                        override val id: Long,
-                       val key: Array[Byte])
-                      (val op: (Cache, RetrievalCommand) => Response) extends Command(cacheName, id) {
+                       val key: Array[Byte],
+                       override val flags: Set[Flag])
+                      (val op: (Cache, RetrievalCommand) => Response) extends Command(cacheName, id, flags) {
 
    override def perform(cache: Cache): Response = {
       op(cache, this)

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/StorageCommand.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -13,8 +13,8 @@
                      val lifespan: Int,
                      val maxIdle: Int,
                      val value: Array[Byte],
-                     val flags: Set[Flag])
-                    (val op: (Cache, StorageCommand) => Response) extends Command(cacheName, id) {
+                     override val flags: Set[Flag])
+                    (val op: (Cache, StorageCommand) => Response) extends Command(cacheName, id, flags) {
 
    override def perform(cache: Cache): Response = {
       op(cache, this)

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FlagsTest.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -56,16 +56,13 @@
       }
    }
 
-//   private def flag(bitFlags: Int)(size: Int)(p: Set[Flags.Value] => Boolean) {
-//      var flags = Flags.extract(bitFlags)
-//      assert { flags.size == size }
-//      assert { true == p(flags) }
-//   }
-
    private def flag(bitFlags: Int)(size: Int)(p: Set[Flag] => Boolean) {
-      val flags = Flags.extract(bitFlags)
-      assertEquals(flags.size, size)
-      assertTrue(p(flags))
+      val contextFlags = Flags.toContextFlags(bitFlags)
+      assertEquals(contextFlags.size, size)
+      assertTrue(p(contextFlags))
+
+      val fromFlags = Flags.fromContextFlags(contextFlags)
+      assertEquals(fromFlags, bitFlags)
    }
 
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/FunctionalTest.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -1,6 +1,5 @@
 package org.infinispan.server.hotrod
 
-import org.infinispan.test.SingleCacheManagerTest
 import org.infinispan.test.fwk.TestCacheManagerFactory
 import org.testng.annotations.{AfterClass, Test}
 import java.lang.reflect.Method
@@ -10,7 +9,11 @@
 import java.util.Arrays
 import org.jboss.netty.channel.Channel
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
-import org.infinispan.{Cache => InfinispanCache}
+import org.infinispan.context.Flag
+import org.infinispan.{AdvancedCache, Cache => InfinispanCache}
+import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
+import javax.transaction.TransactionManager
+import javax.transaction.{Status => TransactionStatus}
 
 /**
  * TODO: Document
@@ -27,20 +30,31 @@
  */
 @Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
 class FunctionalTest extends SingleCacheManagerTest with Utils with Client {
+   private val cacheName = "hotrod-cache"
    private var server: HotRodServer = _
    private var ch: Channel = _
+   private var advancedCache: AdvancedCache[Key, Value] = _
+//   private var tm: TransactionManager = _
 
    override def createCacheManager: CacheManager = {
-      val cacheManager = TestCacheManagerFactory.createLocalCacheManager
+      val cacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
+      advancedCache = cacheManager.getCache[Key, Value](cacheName).getAdvancedCache
+//      tm = TestingUtil.getTransactionManager(advancedCache)
       server = createHotRodServer(cacheManager)
       server.start
       ch = connect("127.0.0.1", server.port)
       cacheManager
    }
 
+   @AfterClass(alwaysRun = true)
+   override def destroyAfterClass {
+      super.destroyAfterClass
+      log.debug("Test finished, close Hot Rod server", null)
+      server.stop
+   }
+
    def testPutBasic(m: Method) {
-      val status = doPut(m)
-      assertSuccess(status)
+      doPut(m)
    }
 
    def testPutOnDefaultCache(m: Method) {
@@ -51,24 +65,21 @@
    }
 
    def testPutWithLifespan(m: Method) {
-      val status = doPutWithLifespanMaxIdle(m, 1, 0)
-      assertSuccess(status)
+      doPutWithLifespanMaxIdle(m, 1, 0)
       Thread.sleep(1100)
       val (getSt, actual) = doGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testPutWithMaxIdle(m: Method) {
-      val status = doPutWithLifespanMaxIdle(m, 0, 1)
-      assertSuccess(status)
+      doPutWithLifespanMaxIdle(m, 0, 1)
       Thread.sleep(1100)
       val (getSt, actual) = doGet(m)
       assertKeyDoesNotExist(getSt, actual)
    }
 
    def testGetBasic(m: Method) {
-      val putSt = doPut(m)
-      assertSuccess(putSt)
+      doPut(m)
       val (getSt, actual) = doGet(m)
       assertSuccess(getSt, v(m), actual)
    }
@@ -78,41 +89,59 @@
       assertKeyDoesNotExist(getSt, actual)
    }
 
-//   def testGetWithWriteLock(m: Method) {
-//      // TODO
+// Invalid test since starting transactions does not make sense
+// TODO: discuss flags with list
+// def testGetWithWriteLock(m: Method) {
+//      doPut(m)
+//      assertNotLocked(advancedCache, new Key(k(m)))
+//      tm.begin
+//      doGet(m, Set(Flag.FORCE_WRITE_LOCK))
+//      assertLocked(advancedCache, new Key(k(m)))
+//      tm.commit
+//      assertNotLocked(advancedCache, new Key(k(m)))
 //   }
 
-   private def assertSuccess(status: Status.Status) {
-      assertTrue(status == Success, "Status should have been 'Success' but instead was: " + status)
-   }
+//   private def transactional(op: Unit => Unit) {
+//      tm.begin
+//      try {
+//         op()
+//      } catch {
+//         case _ => tm.setRollbackOnly
+//      } finally {
+//         if (tm.getStatus == TransactionStatus.STATUS_ACTIVE) tm.commit
+//         else tm.rollback
+//      }
+//   }
 
-   private def assertSuccess(status: Status.Status, expected: Array[Byte], actual: Array[Byte]) {
-      assertSuccess(status)
-      assertTrue(Arrays.equals(expected, actual))
-   }
+//   private def assertSuccess(status: Status.Status) {
+//      assertTrue(status == Success, "Status should have been 'Success' but instead was: " + status)
+//   }
+//
+//   private def assertSuccess(status: Status.Status, expected: Array[Byte], actual: Array[Byte]) {
+//      assertSuccess(status)
+//      assertTrue(Arrays.equals(expected, actual))
+//   }
 
    private def assertKeyDoesNotExist(status: Status.Status, actual: Array[Byte]) {
       assertTrue(status == KeyDoesNotExist, "Status should have been 'KeyDoesNotExist' but instead was: " + status)
       assertNull(actual)
    }
 
-   private def doPut(m: Method): Status = {
+   private def doPut(m: Method) {
       doPutWithLifespanMaxIdle(m, 0, 0)
    }
 
-   private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int): Status = {
-      put(ch, "hotrod-cache", k(m) , lifespan, maxIdle, v(m))
+   private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int) {
+      val status = put(ch, cacheName, k(m) , lifespan, maxIdle, v(m))
+      assertSuccess(status)
    }
 
-   private def doGet(m: Method) = {
-      get(ch, "hotrod-cache", k(m))
+   private def doGet(m: Method): (Status.Status, Array[Byte]) = {
+      doGet(m, null)
    }
 
-   @AfterClass(alwaysRun = true)
-   override def destroyAfterClass {
-      super.destroyAfterClass
-      log.debug("Test finished, close memcached server", null)
-      server.stop
+   private def doGet(m: Method, flags: Set[Flag]): (Status.Status, Array[Byte]) = {
+      get(ch, cacheName, k(m), flags)
    }
 
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -16,6 +16,8 @@
 import java.util.concurrent.atomic.AtomicInteger
 import org.infinispan.server.core.transport.NoState
 import org.jboss.netty.channel.ChannelHandler.Sharable
+import org.infinispan.context.Flag
+import java.util.Arrays
 
 /**
  * // TODO: Document this
@@ -44,8 +46,8 @@
       ch
    }
 
-   def put(ch: Channel, cacheName: String, key: Array[Byte], lifespan: Int, maxIdle: Int, value: Array[Byte]): Status = {
-      val writeFuture = ch.write(new Op(0x01, cacheName, key, lifespan, maxIdle, value))
+   def put(ch: Channel, name: String, k: Array[Byte], lifespan: Int, maxIdle: Int, v: Array[Byte]): Status = {
+      val writeFuture = ch.write(new Op(0x01, name, k, lifespan, maxIdle, v, null))
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
@@ -53,8 +55,12 @@
       handler.getResponse.status
    }
 
-   def get(ch: Channel, cacheName: String, key: Array[Byte]) = {
-      val writeFuture = ch.write(new Op(0x03, cacheName, key, 0, 0, null))
+//   def get(ch: Channel, name: String, key: Array[Byte]): (Status.Status, Array[Byte]) = {
+//      get(ch, name, key, null)
+//   }
+
+   def get(ch: Channel, name: String, k: Array[Byte], flags: Set[Flag]): (Status.Status, Array[Byte]) = {
+      val writeFuture = ch.write(new Op(0x03, name, k, 0, 0, null, flags))
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
@@ -63,6 +69,19 @@
       (resp.status, resp.value)
    }
 
+   def assertSuccess(status: Status.Status): Boolean = {
+      val isSuccess = status == Success
+      assertTrue(isSuccess, "Status should have been 'Success' but instead was: " + status)
+      isSuccess
+   }
+
+   def assertSuccess(status: Status.Status, expected: Array[Byte], actual: Array[Byte]): Boolean = {
+      assertSuccess(status)
+      val isSuccess = Arrays.equals(expected, actual)
+      assertTrue(isSuccess)
+      isSuccess
+   }
+
 }
 
 @Sharable
@@ -93,7 +112,11 @@
                buffer.writeByte(op.code) // opcode
                buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
                buffer.writeUnsignedLong(idCounter.incrementAndGet) // message id
-               buffer.writeUnsignedInt(0) // flags
+               if (op.flags != null)
+                  buffer.writeUnsignedInt(Flags.fromContextFlags(op.flags)) // flags
+               else
+                  buffer.writeUnsignedInt(0) // flags
+
                buffer.writeRangedBytes(op.key) // key length + key
                if (op.value != null) {
                   buffer.writeUnsignedInt(op.lifespan) // lifespan
@@ -137,7 +160,6 @@
    }
 }
 
- at ChannelPipelineCoverage("one")
 private class ClientHandler extends SimpleChannelUpstreamHandler {
 
    private val answer = new LinkedBlockingQueue[Response]; 
@@ -158,4 +180,5 @@
                  val key: Array[Byte],
                  val lifespan: Int,
                  val maxIdle: Int,
-                 val value: Array[Byte])
\ No newline at end of file
+                 val value: Array[Byte],
+                 val flags: Set[Flag])
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala	2010-03-09 10:47:28 UTC (rev 1574)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Utils.scala	2010-03-09 11:17:05 UTC (rev 1575)
@@ -21,6 +21,10 @@
       new HotRodServer(host, UniquePortThreadLocal.get.intValue, manager, 0, 0)
    }
 
+   def createHotRodServer(manager: CacheManager, port: Int): HotRodServer = {
+      new HotRodServer(host, port, manager, 0, 0)
+   }
+
    def k(m: Method, prefix: String): Array[Byte] = {
       val bytes: Array[Byte] = (prefix + m.getName).getBytes
       trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)



More information about the infinispan-commits mailing list