[infinispan-commits] Infinispan SVN: r1623 - in trunk/server: core/src/main/scala/org/infinispan/server/core/transport and 4 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Mar 25 16:46:13 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-25 16:46:12 -0400 (Thu, 25 Mar 2010)
New Revision: 1623

Modified:
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) NoReply has no meaning for Hot Rod, so it has been moved to the memcached module; the buffer creation code has also been tidied up.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -4,7 +4,6 @@
 import Operation._
 import scala.collection.mutable.HashMap
 import scala.collection.immutable
-import org.infinispan.remoting.transport.Address
 import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConversions._
 import java.util.concurrent.TimeUnit
@@ -12,6 +11,7 @@
 import org.infinispan.server.core.VersionGenerator._
 import java.io.StreamCorruptedException
 import transport._
+import transport.ChannelBuffers._
 
 /**
  * // TODO: Document this
@@ -38,18 +38,18 @@
                val k = readKey(header, buffer)
                val params = readParameters(header, buffer)
                header.op match {
-                  case PutRequest => put(header, k, params.get, cache)
-                  case PutIfAbsentRequest => putIfAbsent(header, k, params.get, cache)
-                  case ReplaceRequest => replace(header, k, params.get, cache)
-                  case ReplaceIfUnmodifiedRequest => replaceIfUmodified(header, k, params.get, cache)
+                  case PutRequest => put(header, k, params, cache)
+                  case PutIfAbsentRequest => putIfAbsent(header, k, params, cache)
+                  case ReplaceRequest => replace(header, k, params, cache)
+                  case ReplaceIfUnmodifiedRequest => replaceIfUmodified(header, k, params, cache)
                   case RemoveRequest => remove(header, k, params, cache)
                }
             }
-            case GetRequest | GetWithVersionRequest => get(header, buffer, ctx.getChannelBuffers, cache)
-            case StatsRequest => createStatsResponse(header, ctx.getChannelBuffers, cache.getAdvancedCache.getStats)
-            case _ => handleCustomRequest(header, ctx, buffer, cache)
+            case GetRequest | GetWithVersionRequest => get(header, buffer, cache)
+            case StatsRequest => createStatsResponse(header, cache.getAdvancedCache.getStats)
+            case _ => handleCustomRequest(header, buffer, cache)
          }
-         writeResponse(ctx.getChannel, ctx.getChannelBuffers, ret)
+         writeResponse(ctx.getChannel, ret)
          null
       } catch {
          case se: ServerException => throw se
@@ -58,88 +58,79 @@
       }
    }
 
-   private def writeResponse(ch: Channel, buffers: ChannelBuffers, response: AnyRef) {
+   private def writeResponse(ch: Channel, response: AnyRef) {
       if (response != null) {
          response match {
+            // We only expect Lists of ChannelBuffer instances, so don't worry about type erasure 
             case l: List[ChannelBuffer] => l.foreach(ch.write(_))
-            case a: Array[Byte] => ch.write(buffers.wrappedBuffer(a))
-            case sb: StringBuilder => ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
-            case s: String => ch.write(buffers.wrappedBuffer(s.getBytes))
+            case a: Array[Byte] => ch.write(wrappedBuffer(a))
+            case sb: StringBuilder => ch.write(wrappedBuffer(sb.toString.getBytes))
+            case s: String => ch.write(wrappedBuffer(s.getBytes))
             case _ => ch.write(response)
          }
       }
    }
 
-   private def put(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): AnyRef = {
-      val v = createValue(header, params, generateVersion(cache))
-      cache.put(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
-      if (!params.noReply) createSuccessResponse(header) else null
+   private def put(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+      val p = params.get
+      val v = createValue(header, p, generateVersion(cache))
+      cache.put(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
+      createSuccessResponse(header, params)
    }
 
-   private def putIfAbsent(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): AnyRef = {
+   private def putIfAbsent(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+      val p = params.get
       val prev = cache.get(k)
       if (prev == null) { // Generate new version only if key not present      
-         val v = createValue(header, params, generateVersion(cache))
-         cache.putIfAbsent(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+         val v = createValue(header, p, generateVersion(cache))
+         cache.putIfAbsent(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
       }
-      if (!params.noReply && prev == null)
-         createSuccessResponse(header)
-      else if (!params.noReply && prev != null)
-         createNotExecutedResponse(header)
+      if (prev == null)
+         createSuccessResponse(header, params)
       else
-         null
+         createNotExecutedResponse(header, params)
    }
 
-   private def replace(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): AnyRef = {
+   private def replace(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+      val p = params.get
       val prev = cache.get(k)
       if (prev != null) { // Generate new version only if key present
-         val v = createValue(header, params, generateVersion(cache))
-         cache.replace(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+         val v = createValue(header, p, generateVersion(cache))
+         cache.replace(k, v, toMillis(p.lifespan), DefaultTimeUnit, toMillis(p.maxIdle), DefaultTimeUnit)
       }
-      if (!params.noReply && prev != null)
-         createSuccessResponse(header)
-      else if (!params.noReply && prev == null)
-         createNotExecutedResponse(header)
+      if (prev != null)
+         createSuccessResponse(header, params)
       else
-         null
+         createNotExecutedResponse(header, params)
    }
 
-   private def replaceIfUmodified(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): AnyRef = {
+   private def replaceIfUmodified(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+      val p = params.get
       val prev = cache.get(k)
       if (prev != null) {
-         if (prev.version == params.streamVersion) {
+         if (prev.version == p.streamVersion) {
             // Generate new version only if key present and version has not changed, otherwise it's wasteful
-            val v = createValue(header, params, generateVersion(cache))
+            val v = createValue(header, p, generateVersion(cache))
             val replaced = cache.replace(k, prev, v);
-            if (!params.noReply && replaced)
-               createSuccessResponse(header)
-            else if (!params.noReply)
-               createNotExecutedResponse(header)
+            if (replaced)
+               createSuccessResponse(header, params)
             else
-               null
-         } else if (!params.noReply) {
-            createNotExecutedResponse(header)
+               createNotExecutedResponse(header, params)
          } else {
-            null
-         }
-      } else if(!params.noReply) {
-         createNotExistResponse(header)
-      } else {
-         null
-      }
+            createNotExecutedResponse(header, params)
+         }            
+      } else createNotExistResponse(header, params)
    }
 
    private def remove(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
       val prev = cache.remove(k)
-      if ((params == None || !params.get.noReply) && prev != null)
-         createSuccessResponse(header)
-      else if ((params == None || !params.get.noReply) && prev == null)
-         createNotExistResponse(header)
+      if (prev != null)
+         createSuccessResponse(header, params)
       else
-         null
+         createNotExistResponse(header, params)
    }
 
-   private def get(header: SuitableHeader, buffer: ChannelBuffer, buffers: ChannelBuffers, cache: Cache[K, V]): AnyRef = {
+   private def get(header: SuitableHeader, buffer: ChannelBuffer, cache: Cache[K, V]): AnyRef = {
       val keys = readKeys(header, buffer)
       if (keys.length > 1) {
          val map = new HashMap[K,V]()
@@ -148,54 +139,52 @@
             if (v != null)
                map += (k -> v)
          }
-         createMultiGetResponse(header, buffers, new immutable.HashMap ++ map)
+         createMultiGetResponse(header, new immutable.HashMap ++ map)
       } else {
-         createGetResponse(header, buffers, keys.head, cache.get(keys.head))
+         createGetResponse(header, keys.head, cache.get(keys.head))
       }
    }
 
    override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
       error("Exception reported", e.getCause)
       val ch = ctx.getChannel
-      val buffers = ctx.getChannelBuffers
       val errorResponse = createErrorResponse(e.getCause)
       if (errorResponse != null) {
          errorResponse match {
-            case a: Array[Byte] => ch.write(buffers.wrappedBuffer(a))
-            case sb: StringBuilder => ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+            case a: Array[Byte] => ch.write(wrappedBuffer(a))
+            case sb: StringBuilder => ch.write(wrappedBuffer(sb.toString.getBytes))
             case _ => ch.write(errorResponse)
          }
       }
    }
 
-   protected def readHeader(buffer: ChannelBuffer): SuitableHeader
+   protected def readHeader(b: ChannelBuffer): SuitableHeader
 
-   protected def getCache(header: SuitableHeader): Cache[K, V]
+   protected def getCache(h: SuitableHeader): Cache[K, V]
 
-   protected def readKey(header: SuitableHeader, buffer: ChannelBuffer): K
+   protected def readKey(h: SuitableHeader, b: ChannelBuffer): K
 
-   protected def readKeys(header: SuitableHeader, buffer: ChannelBuffer): Array[K]
+   protected def readKeys(h: SuitableHeader, b: ChannelBuffer): Array[K]
 
-   protected def readParameters(header: SuitableHeader, buffer: ChannelBuffer): Option[SuitableParameters]
+   protected def readParameters(h: SuitableHeader, b: ChannelBuffer): Option[SuitableParameters]
 
-   protected def createValue(header: SuitableHeader, params: SuitableParameters, nextVersion: Long): V
+   protected def createValue(h: SuitableHeader, p: SuitableParameters, nextVersion: Long): V
 
-   protected def createSuccessResponse(header: SuitableHeader): AnyRef
+   protected def createSuccessResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
 
-   protected def createNotExecutedResponse(header: SuitableHeader): AnyRef
+   protected def createNotExecutedResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
 
-   protected def createNotExistResponse(header: SuitableHeader): AnyRef
+   protected def createNotExistResponse(h: SuitableHeader, params: Option[SuitableParameters]): AnyRef
 
-   protected def createGetResponse(header: SuitableHeader, buffers: ChannelBuffers, k: K, v: V): AnyRef
+   protected def createGetResponse(h: SuitableHeader, k: K, v: V): AnyRef
 
-   protected def createMultiGetResponse(header: SuitableHeader, buffers: ChannelBuffers, pairs: Map[K, V]): AnyRef
+   protected def createMultiGetResponse(h: SuitableHeader, pairs: Map[K, V]): AnyRef
    
    protected def createErrorResponse(t: Throwable): AnyRef
 
-   protected def createStatsResponse(header: SuitableHeader, buffers: ChannelBuffers, stats: Stats): AnyRef
+   protected def createStatsResponse(h: SuitableHeader, stats: Stats): AnyRef
 
-   protected def handleCustomRequest(header: SuitableHeader, ctx: ChannelHandlerContext,
-                           buffer: ChannelBuffer, cache: Cache[K, V]): AnyRef
+   protected def handleCustomRequest(h: SuitableHeader, b: ChannelBuffer, cache: Cache[K, V]): AnyRef
 
    protected def generateVersion(cache: Cache[K, V]): Long = {
       val rpcManager = cache.getAdvancedCache.getRpcManager
@@ -234,7 +223,7 @@
 class RequestHeader(val op: Enumeration#Value)
 
 // TODO: NoReply could possibly be passed to subclass specific to memcached and make create* implementations use it
-class RequestParameters(val data: Array[Byte], val lifespan: Int, val maxIdle: Int, val streamVersion: Long, val noReply: Boolean)
+class RequestParameters(val data: Array[Byte], val lifespan: Int, val maxIdle: Int, val streamVersion: Long)
 
 class UnknownOperationException(reason: String) extends StreamCorruptedException(reason)
 

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -1,14 +1,21 @@
 package org.infinispan.server.core.transport
 
+import netty.ChannelBuffersAdapter
+
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since
  */
 
-abstract class ChannelBuffers {
-//   def wrappedBuffer(buffers: ChannelBuffer*): ChannelBuffer
-//   def wrappedBuffer(buffer: ChannelBuffer): ChannelBuffer
-   def wrappedBuffer(array: Array[Byte]*): ChannelBuffer
-   def dynamicBuffer(): ChannelBuffer
+object ChannelBuffers {
+   
+   def wrappedBuffer(array: Array[Byte]*): ChannelBuffer = {
+      ChannelBuffersAdapter.wrappedBuffer(array : _*)
+   }
+
+   def dynamicBuffer(): ChannelBuffer = {
+      ChannelBuffersAdapter.dynamicBuffer
+   }
+
 }
\ No newline at end of file

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -8,5 +8,4 @@
 
 abstract class ChannelHandlerContext {
    def getChannel: Channel
-   def getChannelBuffers: ChannelBuffers
 }
\ No newline at end of file

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -1,8 +1,7 @@
 package org.infinispan.server.core.transport.netty
 
-import org.infinispan.server.core.transport.{ChannelBuffer, ChannelBuffers}
+import org.infinispan.server.core.transport.{ChannelBuffer}
 import org.jboss.netty.buffer.{ChannelBuffers => NettyChannelBuffers}
-import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
 
 /**
  * // TODO: Document this
@@ -10,13 +9,13 @@
  * @since
  */
 
-object ChannelBuffersAdapter extends ChannelBuffers {
+object ChannelBuffersAdapter {
 
-   override def wrappedBuffer(array: Array[Byte]*): ChannelBuffer = {
+   def wrappedBuffer(array: Array[Byte]*): ChannelBuffer = {
       new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(array : _*));
    }
    
-   override def dynamicBuffer(): ChannelBuffer = {
+   def dynamicBuffer(): ChannelBuffer = {
       new ChannelBufferAdapter(NettyChannelBuffers.dynamicBuffer());
    }
 

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -13,7 +13,4 @@
    
    override def getChannel: Channel = new ChannelAdapter(ctx.getChannel)
 
-   // TODO: Remove this from here and make it available via an object, this would clean up unnecessary params 
-   override def getChannelBuffers: ChannelBuffers = ChannelBuffersAdapter
-   
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -55,7 +55,7 @@
             case _ => -1
          }
          val data = buffer.readRangedBytes
-         Some(new RequestParameters(data, lifespan, maxIdle, version, false))
+         Some(new RequestParameters(data, lifespan, maxIdle, version))
       } else {
          None
       }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -66,36 +66,38 @@
       else cacheManager.getCache(header.cacheName)
    }
 
-   override def readKey(header: HotRodHeader, buffer: ChannelBuffer): CacheKey = header.decoder.readKey(buffer)
+   override def readKey(h: HotRodHeader, b: ChannelBuffer): CacheKey =
+      h.decoder.readKey(b)
 
-   override def readKeys(header: HotRodHeader, buffer: ChannelBuffer): Array[CacheKey] =
-      header.decoder.readKeys(buffer)
+   override def readKeys(h: HotRodHeader, b: ChannelBuffer): Array[CacheKey] =
+      h.decoder.readKeys(b)
 
-   override def readParameters(header: HotRodHeader, buffer: ChannelBuffer): Option[RequestParameters] =
-      header.decoder.readParameters(header, buffer)
+   override def readParameters(h: HotRodHeader, b: ChannelBuffer): Option[RequestParameters] =
+      h.decoder.readParameters(h, b)
 
-   override def createValue(header: HotRodHeader, params: RequestParameters, nextVersion: Long): CacheValue =
-      header.decoder.createValue(params, nextVersion)
+   override def createValue(h: HotRodHeader, p: RequestParameters, nextVersion: Long): CacheValue =
+      h.decoder.createValue(p, nextVersion)
 
-   override def createSuccessResponse(header: HotRodHeader): AnyRef = header.decoder.createSuccessResponse(header)
+   override def createSuccessResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
+      h.decoder.createSuccessResponse(h)
 
-   override def createNotExecutedResponse(header: HotRodHeader): AnyRef = header.decoder.createNotExecutedResponse(header)
+   override def createNotExecutedResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
+      h.decoder.createNotExecutedResponse(h)
 
-   override def createNotExistResponse(header: HotRodHeader): AnyRef = header.decoder.createNotExistResponse(header)
+   override def createNotExistResponse(h: HotRodHeader, p: Option[RequestParameters]): AnyRef =
+      h.decoder.createNotExistResponse(h)
 
-   override def createGetResponse(header: HotRodHeader, buffers: ChannelBuffers,
-                                k: CacheKey, v: CacheValue): AnyRef =
-      header.decoder.createGetResponse(header.messageId, v, header.op)
+   override def createGetResponse(h: HotRodHeader, k: CacheKey, v: CacheValue): AnyRef =
+      h.decoder.createGetResponse(h.messageId, v, h.op)
 
-   override def createMultiGetResponse(header: HotRodHeader, buffers: ChannelBuffers,
-                                       pairs: Map[CacheKey, CacheValue]): AnyRef = null // Unsupported
+   override def createMultiGetResponse(h: HotRodHeader, pairs: Map[CacheKey, CacheValue]): AnyRef =
+      null // Unsupported
 
-   override def handleCustomRequest(header: HotRodHeader, ctx: ChannelHandlerContext,
-                                    buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef =
-      header.decoder.handleCustomRequest(header, buffer, cache)
+   override def handleCustomRequest(h: HotRodHeader, b: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef =
+      h.decoder.handleCustomRequest(h, b, cache)
 
-   override def createStatsResponse(header: HotRodHeader, buffers: ChannelBuffers, stats: Stats): AnyRef =
-      header.decoder.createStatsResponse(header, stats)
+   override def createStatsResponse(h: HotRodHeader, stats: Stats): AnyRef =
+      h.decoder.createStatsResponse(h, stats)
 
    override def createErrorResponse(t: Throwable): AnyRef = {
       t match {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -3,6 +3,7 @@
 import org.infinispan.server.core.Logging
 import org.infinispan.server.core.transport.{ChannelBuffer, ChannelHandlerContext, Channel, Encoder}
 import OperationStatus._
+import org.infinispan.server.core.transport.ChannelBuffers._
 
 /**
  * // TODO: Document this
@@ -17,7 +18,7 @@
       trace("Encode msg {0}", msg)
       val buffer: ChannelBuffer = msg match {
          case s: StatsResponse => {
-            val buffer = ctx.getChannelBuffers.dynamicBuffer
+            val buffer = dynamicBuffer
             writeHeader(buffer, s)
             buffer.writeUnsignedInt(s.stats.size)
             for ((key, value) <- s.stats) {
@@ -26,7 +27,7 @@
             }
             buffer
          }
-         case r: Response => writeHeader(ctx.getChannelBuffers.dynamicBuffer, r)
+         case r: Response => writeHeader(dynamicBuffer, r)
       }
       msg match {
          case g: GetWithVersionResponse => {

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -130,7 +130,7 @@
    }
 
    def stats(ch: Channel, name: String): Map[String, String] = {
-      val writeFuture = ch.write(new Op(0xA0, 0x15, name, null, 0, 0, null, 0, 0))
+      val writeFuture = ch.write(new StatsOp(0xA0, 0x15, name, null))
       writeFuture.awaitUninterruptibly
       assertTrue(writeFuture.isSuccess)
       // Get the handler instance to retrieve the answer.
@@ -139,6 +139,16 @@
       resp.stats
    }
 
+//   def stats(ch: Channel, name: String, statName: String): Map[String, String] = {
+//      val writeFuture = ch.write(new StatsOp(0xA0, 0x15, name, statName))
+//      writeFuture.awaitUninterruptibly
+//      assertTrue(writeFuture.isSuccess)
+//      // Get the handler instance to retrieve the answer.
+//      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+//      val resp = handler.getResponse.asInstanceOf[StatsResponse]
+//      resp.stats
+//   }
+
    def ping(ch: Channel, name: String): OperationStatus = {
       put(ch, 0xA0, 0x17, name, null, 0, 0, null, 0, 0)
    }
@@ -206,6 +216,9 @@
                      buffer.writeRangedBytes(op.value) // value length + value
                   }
                }
+//               else if (op.code != 0x15) {
+//                  buffer.writeString(op.asInstanceOf[StatsOp].statName)
+//               }
                buffer.getUnderlyingChannelBuffer
             }
       }
@@ -287,4 +300,9 @@
                  val maxIdle: Int,
                  val value: Array[Byte],
                  val flags: Int,
-                 val version: Long)
\ No newline at end of file
+                 val version: Long)
+
+private class StatsOp(override val magic: Int,
+                 override val code: Byte,
+                 override val cacheName: String,
+                 val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0)
\ No newline at end of file

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-25 20:46:12 UTC (rev 1623)
@@ -13,6 +13,7 @@
 import org.infinispan.server.core._
 import org.infinispan.{AdvancedCache, Version, CacheException, Cache}
 import collection.mutable.ListBuffer
+import org.infinispan.server.core.transport.ChannelBuffers._
 
 /**
  * // TODO: Document this
@@ -47,22 +48,20 @@
       new RequestHeader(op.get)
    }
 
-   override def readKey(header: RequestHeader, buffer: ChannelBuffer): String = {
-      readElement(buffer)
-   }
+   override def readKey(h: RequestHeader, b: ChannelBuffer): String = readElement(b)
 
-   override def readKeys(header: RequestHeader, buffer: ChannelBuffer): Array[String] = {
-      val line = readLine(buffer)
+   override def readKeys(h: RequestHeader, b: ChannelBuffer): Array[String] = {
+      val line = readLine(b)
       line.trim.split(" +")
    }
 
-   override def readParameters(header: RequestHeader, buffer: ChannelBuffer): Option[MemcachedParameters] = {
-      val line = readLine(buffer)
+   override def readParameters(h: RequestHeader, b: ChannelBuffer): Option[MemcachedParameters] = {
+      val line = readLine(b)
       if (!line.isEmpty) {
          trace("Operation parameters: {0}", line)
          val args = line.trim.split(" +")
          var index = 0
-         header.op match {
+         h.op match {
             case RemoveRequest => {
                val delayedDeleteTime = parseDelayedDeleteTime(index, args)
                val noReply = if (delayedDeleteTime == -1) parseNoReply(index, args) else false
@@ -87,7 +86,7 @@
                }
                index += 1
                val length = getLength(args(index))
-               val streamVersion = header.op match {
+               val streamVersion = h.op match {
                   case ReplaceIfUnmodifiedRequest => {
                      index += 1
                      getVersion(args(index))
@@ -97,8 +96,8 @@
                index += 1
                val noReply = parseNoReply(index, args)
                val data = new Array[Byte](length)
-               buffer.readBytes(data, 0, data.length)
-               readLine(buffer) // read the rest of line to clear CRLF after value Byte[]
+               b.readBytes(data, 0, data.length)
+               readLine(b) // read the rest of line to clear CRLF after value Byte[]
                Some(new MemcachedParameters(data, lifespan, -1, streamVersion, noReply, flags, "", 0))
             }
          }
@@ -107,8 +106,8 @@
       }
    }
 
-   override def createValue(header: SuitableHeader, params: MemcachedParameters, nextVersion: Long): MemcachedValue = {
-      new MemcachedValue(params.data, nextVersion, params.flags)
+   override def createValue(h: SuitableHeader, p: MemcachedParameters, nextVersion: Long): MemcachedValue = {
+      new MemcachedValue(p.data, nextVersion, p.flags)
    }
 
    private def getFlags(flags: String): Int = {
@@ -153,21 +152,18 @@
       else 0
    }
 
-   override def getCache(header: RequestHeader): Cache[String, MemcachedValue] = cache
+   override def getCache(h: RequestHeader): Cache[String, MemcachedValue] = cache
 
    protected def createCache: Cache[String, MemcachedValue] = cacheManager.getCache[String, MemcachedValue]
 
-   override def handleCustomRequest(header: RequestHeader, ctx: ChannelHandlerContext,
-                                    buffer: ChannelBuffer, cache: Cache[String, MemcachedValue]): AnyRef = {
-      val ch = ctx.getChannel
-      val buffers = ctx.getChannelBuffers
-      header.op match {
+   override def handleCustomRequest(h: RequestHeader, b: ChannelBuffer, cache: Cache[String, MemcachedValue]): AnyRef = {
+      h.op match {
          case AppendRequest | PrependRequest => {
-            val k = readKey(header, buffer)
-            val params = readParameters(header, buffer)
+            val k = readKey(h, b)
+            val params = readParameters(h, b)
             val prev = cache.get(k)
             if (prev != null) {
-               val concatenated = header.op match {
+               val concatenated = h.op match {
                   case AppendRequest => concat(prev.data, params.get.data);
                   case PrependRequest => concat(params.get.data, prev.data);
                }
@@ -182,13 +178,13 @@
             }
          }
          case IncrementRequest | DecrementRequest => {
-            val k = readKey(header, buffer)
-            val params = readParameters(header, buffer)
+            val k = readKey(h, b)
+            val params = readParameters(h, b)
             val prev = cache.get(k)
             if (prev != null) {
                val prevCounter = new String(prev.data)
                val newCounter =
-                  header.op match {
+                  h.op match {
                      case IncrementRequest => prevCounter.toLong + params.get.delta.toLong
                      case DecrementRequest => {
                         val candidateCounter = prevCounter.toLong - params.get.delta.toLong
@@ -198,19 +194,19 @@
                val next = createValue(newCounter.toString.getBytes, generateVersion(cache), params.get.flags)
                var replaced = cache.replace(k, prev, next)
                if (replaced) {
-                  if (isStatsEnabled) if (header.op == IncrementRequest) incrHits.incrementAndGet() else decrHits.incrementAndGet
+                  if (isStatsEnabled) if (h.op == IncrementRequest) incrHits.incrementAndGet() else decrHits.incrementAndGet
                   if (!params.get.noReply) new String(next.data) + CRLF else null
                } else {
                   // If there's a concurrent modification on this key, the spec does not say what to do, so treat it as exceptional
                   throw new CacheException("Value modified since we retrieved from the cache, old value was " + prevCounter)
                }
             } else {
-               if (isStatsEnabled) if (header.op == IncrementRequest) incrMisses.incrementAndGet() else decrMisses.incrementAndGet
+               if (isStatsEnabled) if (h.op == IncrementRequest) incrMisses.incrementAndGet() else decrMisses.incrementAndGet
                if (!params.get.noReply) NOT_FOUND else null
             }
          }
          case FlushAllRequest => {
-            val params = readParameters(header, buffer)
+            val params = readParameters(h, b)
             val flushFunction = (cache: AdvancedCache[String, MemcachedValue]) => cache.withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STORE).clear
             val flushDelay = if (params == None) 0 else params.get.flushDelay
             if (flushDelay == 0)
@@ -223,60 +219,63 @@
       }
    }
 
-//   override def createPutResponse(header: RequestHeader): AnyRef = STORED
-
-   override def createSuccessResponse(header: RequestHeader): AnyRef = {
+   override def createSuccessResponse(h: RequestHeader, params: Option[MemcachedParameters]): AnyRef = {
       if (isStatsEnabled) {
-         header.op match {
+         h.op match {
             case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedHits.incrementAndGet
             case _ => // No-op
          }
       }
-      header.op match {
-         case RemoveRequest => DELETED
-         case _ => STORED
-      }
+      if (params == None || !params.get.noReply) {
+         h.op match {
+            case RemoveRequest => DELETED
+            case _ => STORED
+         }
+      } else null
    }
 
-   override def createNotExecutedResponse(header: RequestHeader): AnyRef = {
+   override def createNotExecutedResponse(h: RequestHeader, params: Option[MemcachedParameters]): AnyRef = {
       if (isStatsEnabled) {
-         header.op match {
+         h.op match {
             case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedBadval.incrementAndGet
             case _ => // No-op
          }
       }
-      header.op match {
-         case ReplaceIfUnmodifiedRequest => EXISTS
-         case _ => NOT_STORED
-      }
+      if (params == None || !params.get.noReply) {
+         h.op match {
+            case ReplaceIfUnmodifiedRequest => EXISTS
+            case _ => NOT_STORED
+         }
+      } else null
    }
 
-   override def createNotExistResponse(header: SuitableHeader): AnyRef = {
+   override def createNotExistResponse(h: SuitableHeader, params: Option[MemcachedParameters]): AnyRef = {
       if (isStatsEnabled) {
-         header.op match {
+         h.op match {
             case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedMisses.incrementAndGet
             case _ => // No-op
          }
-      }      
-      NOT_FOUND
+      }
+      if (params == None || !params.get.noReply)
+         NOT_FOUND
+      else
+         null
    }
 
-   override def createGetResponse(header: RequestHeader, buffers: ChannelBuffers,
-                                  k: String, v: MemcachedValue): AnyRef = {
+   override def createGetResponse(h: RequestHeader, k: String, v: MemcachedValue): AnyRef = {
       if (v != null)
-         List(buildGetResponse(header.op, buffers, k, v), buffers.wrappedBuffer(END))
+         List(buildGetResponse(h.op, k, v), wrappedBuffer(END))
       else
          END
    }
 
-   override def createMultiGetResponse(header: RequestHeader, buffers: ChannelBuffers,
-                                       pairs: Map[String, MemcachedValue]): AnyRef = {
+   override def createMultiGetResponse(h: RequestHeader, pairs: Map[String, MemcachedValue]): AnyRef = {
       val elements = new ListBuffer[ChannelBuffer]
-      header.op match {
+      h.op match {
          case GetRequest | GetWithVersionRequest => {
             for ((k, v) <- pairs)
-               elements += buildGetResponse(header.op, buffers, k, v)
-            elements += buffers.wrappedBuffer("END\r\n".getBytes)
+               elements += buildGetResponse(h.op, k, v)
+            elements += wrappedBuffer("END\r\n".getBytes)
          }
       }
       elements.toList
@@ -302,52 +301,52 @@
       }
    }
 
-   def createStatsResponse(header: RequestHeader, buffers: ChannelBuffers, stats: Stats): AnyRef = {
+   def createStatsResponse(header: RequestHeader, stats: Stats): AnyRef = {
       var sb = new StringBuilder
       List[ChannelBuffer] (
-         buildStat("pid", 0, sb, buffers),
-         buildStat("uptime", stats.getTimeSinceStart, sb, buffers),
-         buildStat("uptime", stats.getTimeSinceStart, sb, buffers),
-         buildStat("time", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis), sb, buffers),
-         buildStat("version", cache.getVersion, sb, buffers),
-         buildStat("pointer_size", 0, sb, buffers), // Unsupported
-         buildStat("rusage_user", 0, sb, buffers), // Unsupported
-         buildStat("rusage_system", 0, sb, buffers), // Unsupported
-         buildStat("curr_items", stats.getCurrentNumberOfEntries, sb, buffers),
-         buildStat("total_items", stats.getTotalNumberOfEntries, sb, buffers),
-         buildStat("bytes", 0, sb, buffers), // Unsupported
-         buildStat("curr_connections", 0, sb, buffers), // TODO: Through netty?
-         buildStat("total_connections", 0, sb, buffers), // TODO: Through netty?
-         buildStat("connection_structures", 0, sb, buffers), // Unsupported
-         buildStat("cmd_get", stats.getRetrievals, sb, buffers),
-         buildStat("cmd_set", stats.getStores, sb, buffers),
-         buildStat("get_hits", stats.getHits, sb, buffers),
-         buildStat("get_misses", stats.getMisses, sb, buffers),
-         buildStat("delete_misses", stats.getRemoveMisses, sb, buffers),
-         buildStat("delete_hits", stats.getRemoveHits, sb, buffers),
-         buildStat("incr_misses", incrMisses, sb, buffers),
-         buildStat("incr_hits", incrHits, sb, buffers),
-         buildStat("decr_misses", decrMisses, sb, buffers),
-         buildStat("decr_hits", decrHits, sb, buffers),
-         buildStat("cas_misses", replaceIfUnmodifiedMisses, sb, buffers),
-         buildStat("cas_hits", replaceIfUnmodifiedHits, sb, buffers),
-         buildStat("cas_badval", replaceIfUnmodifiedBadval, sb, buffers),
-         buildStat("auth_cmds", 0, sb, buffers), // Unsupported
-         buildStat("auth_errors", 0, sb, buffers), // Unsupported
+         buildStat("pid", 0, sb),
+         buildStat("uptime", stats.getTimeSinceStart, sb),
+         buildStat("uptime", stats.getTimeSinceStart, sb),
+         buildStat("time", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis), sb),
+         buildStat("version", cache.getVersion, sb),
+         buildStat("pointer_size", 0, sb), // Unsupported
+         buildStat("rusage_user", 0, sb), // Unsupported
+         buildStat("rusage_system", 0, sb), // Unsupported
+         buildStat("curr_items", stats.getCurrentNumberOfEntries, sb),
+         buildStat("total_items", stats.getTotalNumberOfEntries, sb),
+         buildStat("bytes", 0, sb), // Unsupported
+         buildStat("curr_connections", 0, sb), // TODO: Through netty?
+         buildStat("total_connections", 0, sb), // TODO: Through netty?
+         buildStat("connection_structures", 0, sb), // Unsupported
+         buildStat("cmd_get", stats.getRetrievals, sb),
+         buildStat("cmd_set", stats.getStores, sb),
+         buildStat("get_hits", stats.getHits, sb),
+         buildStat("get_misses", stats.getMisses, sb),
+         buildStat("delete_misses", stats.getRemoveMisses, sb),
+         buildStat("delete_hits", stats.getRemoveHits, sb),
+         buildStat("incr_misses", incrMisses, sb),
+         buildStat("incr_hits", incrHits, sb),
+         buildStat("decr_misses", decrMisses, sb),
+         buildStat("decr_hits", decrHits, sb),
+         buildStat("cas_misses", replaceIfUnmodifiedMisses, sb),
+         buildStat("cas_hits", replaceIfUnmodifiedHits, sb),
+         buildStat("cas_badval", replaceIfUnmodifiedBadval, sb),
+         buildStat("auth_cmds", 0, sb), // Unsupported
+         buildStat("auth_errors", 0, sb), // Unsupported
          //TODO: Evictions are measure by evict calls, but not by nodes are that are expired after the entry's lifespan has expired.
-         buildStat("evictions", stats.getEvictions, sb, buffers),
-         buildStat("bytes_read", 0, sb, buffers), // TODO: Through netty?
-         buildStat("bytes_written", 0, sb, buffers), // TODO: Through netty?
-         buildStat("limit_maxbytes", 0, sb, buffers), // Unsupported
-         buildStat("threads", 0, sb, buffers), // TODO: Through netty?
-         buildStat("conn_yields", 0, sb, buffers), // Unsupported
-         buffers.wrappedBuffer(END)
+         buildStat("evictions", stats.getEvictions, sb),
+         buildStat("bytes_read", 0, sb), // TODO: Through netty?
+         buildStat("bytes_written", 0, sb), // TODO: Through netty?
+         buildStat("limit_maxbytes", 0, sb), // Unsupported
+         buildStat("threads", 0, sb), // TODO: Through netty?
+         buildStat("conn_yields", 0, sb), // Unsupported
+         wrappedBuffer(END)
       )
    }
 
-   private def buildStat(stat: String, value: Any, sb: StringBuilder, buffers: ChannelBuffers): ChannelBuffer = {
+   private def buildStat(stat: String, value: Any, sb: StringBuilder): ChannelBuffer = {
       sb.append("STAT").append(' ').append(stat).append(' ').append(value).append(CRLF)
-      val buffer = buffers.wrappedBuffer(sb.toString.getBytes)
+      val buffer = wrappedBuffer(sb.toString.getBytes)
       sb.setLength(0)
       buffer
    }
@@ -365,10 +364,9 @@
       new MemcachedValue(data, nextVersion, flags)
    }   
 
-   private def buildGetResponse(op: Enumeration#Value, buffers: ChannelBuffers,
-                                k: String, v: MemcachedValue): ChannelBuffer = {
+   private def buildGetResponse(op: Enumeration#Value, k: String, v: MemcachedValue): ChannelBuffer = {
       val header = buildGetResponseHeader(k, v, op)
-      buffers.wrappedBuffer(header.getBytes, v.data, CRLFBytes)
+      wrappedBuffer(header.getBytes, v.data, CRLFBytes)
    }
 
    private def buildGetResponseHeader(k: String, v: MemcachedValue, op: Enumeration#Value): String = {
@@ -384,8 +382,8 @@
 
 class MemcachedParameters(override val data: Array[Byte], override val lifespan: Int,
                           override val maxIdle: Int, override val streamVersion: Long,
-                          override val noReply: Boolean, val flags: Int, val delta: String,
-                          val flushDelay: Int) extends RequestParameters(data, lifespan, maxIdle, streamVersion, noReply)
+                          val noReply: Boolean, val flags: Int, val delta: String,
+                          val flushDelay: Int) extends RequestParameters(data, lifespan, maxIdle, streamVersion)
 
 private class DelayedFlushAll(cache: Cache[String, MemcachedValue],
                               flushFunction: AdvancedCache[String, MemcachedValue] => Unit) extends Runnable {



More information about the infinispan-commits mailing list