[infinispan-commits] Infinispan SVN: r1622 - in trunk/server: core/src/main/scala/org/infinispan/server/core/transport/netty and 5 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Mar 25 13:38:03 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-25 13:38:02 -0400 (Thu, 25 Mar 2010)
New Revision: 1622

Added:
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala
Removed:
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OperationResponse.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala
Modified:
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Log:
[ISPN-171] (Build a server module based on the HotRod protocol) Further consolidation of code shared between memcached and hot rod. Implemented parameterless stats comamnd for hot rod.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -38,64 +38,18 @@
                val k = readKey(header, buffer)
                val params = readParameters(header, buffer)
                header.op match {
-                  case PutRequest => {
-                     putInCache(header, k, params.get, cache)
-                     sendResponse(header, ctx, None, None, params, None)
-                  }
-                  case PutIfAbsentRequest => {
-                     val prev = cache.get(k)
-                     if (prev == null) putIfAbsentInCache(header, k, params.get, cache) // Generate new version only if key not present
-                     sendResponse(header, ctx, None, None, params, Some(prev))
-                  }
-                  case ReplaceRequest => {
-                     val prev = cache.get(k)
-                     if (prev != null) replaceInCache(header, k, params.get, cache) // Generate new version only if key present
-                     sendResponse(header, ctx, None, None, params, Some(prev))
-                  }
-                  case ReplaceIfUnmodifiedRequest => {
-                     val prev = cache.get(k)
-                     if (prev != null) {
-                        if (prev.version == params.get.streamVersion) {
-                           // Generate new version only if key present and version has not changed, otherwise it's wasteful
-                           val v = createValue(header, params.get, generateVersion(cache))
-                           val replaced = cache.replace(k, prev, v);
-                           if (replaced)
-                              sendResponse(header, ctx, None, Some(v), params, Some(prev))
-                           else
-                              sendResponse(header, ctx, None, None, params, Some(prev))
-                        } else {
-                           sendResponse(header, ctx, None, None, params, Some(prev))
-                        }
-                     } else {
-                        sendResponse(header, ctx, None, None, params, None)
-                     }
-                  }
-                  case RemoveRequest => {
-                     val prev = cache.remove(k)
-                     sendResponse(header, ctx, None, None, params, Some(prev))
-                  }
+                  case PutRequest => put(header, k, params.get, cache)
+                  case PutIfAbsentRequest => putIfAbsent(header, k, params.get, cache)
+                  case ReplaceRequest => replace(header, k, params.get, cache)
+                  case ReplaceIfUnmodifiedRequest => replaceIfUmodified(header, k, params.get, cache)
+                  case RemoveRequest => remove(header, k, params, cache)
                }
             }
-            case GetRequest | GetWithVersionRequest => {
-               val keys = readKeys(header, buffer)
-               if (keys.length > 1) {
-                  val map = new HashMap[K,V]()
-                  for (k <- keys) {
-                     val v = cache.get(k)
-                     if (v != null)
-                        map += (k -> v)
-                  }
-                  sendMultiGetResponse(header, ctx, new immutable.HashMap ++ map)
-               } else {
-                  sendResponse(header, ctx, Some(keys.head), Some(cache.get(keys.head)), None, None)
-               }
-            }
-            case StatsRequest => sendResponse(header, ctx, cache.getAdvancedCache.getStats)
+            case GetRequest | GetWithVersionRequest => get(header, buffer, ctx.getChannelBuffers, cache)
+            case StatsRequest => createStatsResponse(header, ctx.getChannelBuffers, cache.getAdvancedCache.getStats)
             case _ => handleCustomRequest(header, ctx, buffer, cache)
          }
-         // TODO: to avoid checking for null, make all send* methods return something, even the memcached ones,
-         // they could send back a buffer whereas hotrod would send back pojos.
-         if (ret != null) ctx.getChannel.write(ret)
+         writeResponse(ctx.getChannel, ctx.getChannelBuffers, ret)
          null
       } catch {
          case se: ServerException => throw se
@@ -104,83 +58,155 @@
       }
    }
 
-   private def putInCache(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
+   private def writeResponse(ch: Channel, buffers: ChannelBuffers, response: AnyRef) {
+      if (response != null) {
+         response match {
+            case l: List[ChannelBuffer] => l.foreach(ch.write(_))
+            case a: Array[Byte] => ch.write(buffers.wrappedBuffer(a))
+            case sb: StringBuilder => ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+            case s: String => ch.write(buffers.wrappedBuffer(s.getBytes))
+            case _ => ch.write(response)
+         }
+      }
+   }
+
+   private def put(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): AnyRef = {
       val v = createValue(header, params, generateVersion(cache))
       cache.put(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+      if (!params.noReply) createSuccessResponse(header) else null
    }
 
-   private def putIfAbsentInCache(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
-      val v = createValue(header, params, generateVersion(cache))
-      cache.putIfAbsent(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+   private def putIfAbsent(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): AnyRef = {
+      val prev = cache.get(k)
+      if (prev == null) { // Generate new version only if key not present      
+         val v = createValue(header, params, generateVersion(cache))
+         cache.putIfAbsent(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+      }
+      if (!params.noReply && prev == null)
+         createSuccessResponse(header)
+      else if (!params.noReply && prev != null)
+         createNotExecutedResponse(header)
+      else
+         null
    }
 
-   private def replaceInCache(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
-      val v = createValue(header, params, generateVersion(cache))
-      cache.replace(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+   private def replace(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): AnyRef = {
+      val prev = cache.get(k)
+      if (prev != null) { // Generate new version only if key present
+         val v = createValue(header, params, generateVersion(cache))
+         cache.replace(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+      }
+      if (!params.noReply && prev != null)
+         createSuccessResponse(header)
+      else if (!params.noReply && prev == null)
+         createNotExecutedResponse(header)
+      else
+         null
    }
 
-   override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
-      error("Exception reported", e.getCause)
-      sendResponse(ctx, e.getCause)
+   private def replaceIfUmodified(header: SuitableHeader, k: K, params: SuitableParameters, cache: Cache[K, V]): AnyRef = {
+      val prev = cache.get(k)
+      if (prev != null) {
+         if (prev.version == params.streamVersion) {
+            // Generate new version only if key present and version has not changed, otherwise it's wasteful
+            val v = createValue(header, params, generateVersion(cache))
+            val replaced = cache.replace(k, prev, v);
+            if (!params.noReply && replaced)
+               createSuccessResponse(header)
+            else if (!params.noReply)
+               createNotExecutedResponse(header)
+            else
+               null
+         } else if (!params.noReply) {
+            createNotExecutedResponse(header)
+         } else {
+            null
+         }
+      } else if(!params.noReply) {
+         createNotExistResponse(header)
+      } else {
+         null
+      }
    }
 
-   def readHeader(buffer: ChannelBuffer): SuitableHeader
+   private def remove(header: SuitableHeader, k: K, params: Option[SuitableParameters], cache: Cache[K, V]): AnyRef = {
+      val prev = cache.remove(k)
+      if ((params == None || !params.get.noReply) && prev != null)
+         createSuccessResponse(header)
+      else if ((params == None || !params.get.noReply) && prev == null)
+         createNotExistResponse(header)
+      else
+         null
+   }
 
-   def getCache(header: SuitableHeader): Cache[K, V]
+   private def get(header: SuitableHeader, buffer: ChannelBuffer, buffers: ChannelBuffers, cache: Cache[K, V]): AnyRef = {
+      val keys = readKeys(header, buffer)
+      if (keys.length > 1) {
+         val map = new HashMap[K,V]()
+         for (k <- keys) {
+            val v = cache.get(k)
+            if (v != null)
+               map += (k -> v)
+         }
+         createMultiGetResponse(header, buffers, new immutable.HashMap ++ map)
+      } else {
+         createGetResponse(header, buffers, keys.head, cache.get(keys.head))
+      }
+   }
 
-   // todo: probably remove in favour of readKeys
-   def readKey(header: SuitableHeader, buffer: ChannelBuffer): K
+   override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+      error("Exception reported", e.getCause)
+      val ch = ctx.getChannel
+      val buffers = ctx.getChannelBuffers
+      val errorResponse = createErrorResponse(e.getCause)
+      if (errorResponse != null) {
+         errorResponse match {
+            case a: Array[Byte] => ch.write(buffers.wrappedBuffer(a))
+            case sb: StringBuilder => ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+            case _ => ch.write(errorResponse)
+         }
+      }
+   }
 
-   def readKeys(header: SuitableHeader, buffer: ChannelBuffer): Array[K]
+   protected def readHeader(buffer: ChannelBuffer): SuitableHeader
 
-   def readParameters(header: SuitableHeader, buffer: ChannelBuffer): Option[SuitableParameters]
+   protected def getCache(header: SuitableHeader): Cache[K, V]
 
-   def createValue(header: SuitableHeader, params: SuitableParameters, nextVersion: Long): V 
+   protected def readKey(header: SuitableHeader, buffer: ChannelBuffer): K
 
-   def sendResponse(header: SuitableHeader, ctx: ChannelHandlerContext, k: Option[K], v: Option[V],
-                    params: Option[SuitableParameters], prev: Option[V]): AnyRef = {
-      val buffers = ctx.getChannelBuffers
-      val ch = ctx.getChannel
-      if (params == None || !params.get.noReply) {
-         // TODO consolidate this event further since both hotrod and memcached end up writing to a channel something,
-         // this methods here could just simply create the responses and let the common framework write them
-         val ret = header.op match {
-            case PutRequest => sendPutResponse(header, ch, buffers)
-            case GetRequest | GetWithVersionRequest => sendGetResponse(header, ch, buffers, k.get, v.get)
-            case PutIfAbsentRequest => sendPutIfAbsentResponse(header, ch, buffers, prev.get)
-            case ReplaceRequest => sendReplaceResponse(header, ch, buffers, prev.get)
-            case ReplaceIfUnmodifiedRequest => sendReplaceIfUnmodifiedResponse(header, ch, buffers, v, prev)
-            case RemoveRequest => sendRemoveResponse(header, ch, buffers, prev.get)
-//            case _ => sendCustomResponse(header, ch, buffers, v, prev)
-         }
-         ret
-      } else null
-   }
+   protected def readKeys(header: SuitableHeader, buffer: ChannelBuffer): Array[K]
 
-   def sendPutResponse(header: SuitableHeader, ch: Channel, buffers: ChannelBuffers): AnyRef
+   protected def readParameters(header: SuitableHeader, buffer: ChannelBuffer): Option[SuitableParameters]
 
-   def sendGetResponse(header: SuitableHeader, ch: Channel, buffers: ChannelBuffers, k: K, v: V): AnyRef
+   protected def createValue(header: SuitableHeader, params: SuitableParameters, nextVersion: Long): V
 
-   def sendPutIfAbsentResponse(header: SuitableHeader, ch: Channel, buffers: ChannelBuffers, prev: V): AnyRef
+   protected def createSuccessResponse(header: SuitableHeader): AnyRef
 
-   def sendReplaceResponse(header: SuitableHeader, ch: Channel, buffers: ChannelBuffers, prev: V): AnyRef
+   protected def createNotExecutedResponse(header: SuitableHeader): AnyRef
 
-   def sendReplaceIfUnmodifiedResponse(header: SuitableHeader, ch: Channel, buffers: ChannelBuffers,
-                                       v: Option[V], prev: Option[V]): AnyRef
+   protected def createNotExistResponse(header: SuitableHeader): AnyRef
 
-   def sendRemoveResponse(header: SuitableHeader, ch: Channel, buffers: ChannelBuffers, prev: V): AnyRef
+   protected def createGetResponse(header: SuitableHeader, buffers: ChannelBuffers, k: K, v: V): AnyRef
 
-   def sendMultiGetResponse(header: SuitableHeader, ctx: ChannelHandlerContext, pairs: Map[K, V]): AnyRef
+   protected def createMultiGetResponse(header: SuitableHeader, buffers: ChannelBuffers, pairs: Map[K, V]): AnyRef
    
-//   def sendCustomResponse(header: SuitableHeader, ch: Channel, buffers: ChannelBuffers, v: Option[V], prev: Option[V]): AnyRef
+   protected def createErrorResponse(t: Throwable): AnyRef
 
-   def sendResponse(ctx: ChannelHandlerContext, t: Throwable): AnyRef
+   protected def createStatsResponse(header: SuitableHeader, buffers: ChannelBuffers, stats: Stats): AnyRef
 
-   def sendResponse(header: SuitableHeader, ctx: ChannelHandlerContext, stats: Stats): AnyRef
-
-   def handleCustomRequest(header: SuitableHeader, ctx: ChannelHandlerContext,
+   protected def handleCustomRequest(header: SuitableHeader, ctx: ChannelHandlerContext,
                            buffer: ChannelBuffer, cache: Cache[K, V]): AnyRef
 
+   protected def generateVersion(cache: Cache[K, V]): Long = {
+      val rpcManager = cache.getAdvancedCache.getRpcManager
+      if (rpcManager != null) {
+         val transport = rpcManager.getTransport
+         newVersion(Some(transport.getAddress), Some(transport.getMembers), transport.getViewId)
+      } else {
+         newVersion(None, None, 0)
+      }
+   }
+
    /**
     * Transforms lifespan pass as seconds into milliseconds
     * following this rule:
@@ -198,16 +224,6 @@
       else TimeUnit.SECONDS.toMillis(lifespan)
    }
 
-   protected def generateVersion(cache: Cache[K, V]): Long = {
-      val rpcManager = cache.getAdvancedCache.getRpcManager
-      if (rpcManager != null) {
-         val transport = rpcManager.getTransport
-         newVersion(Some(transport.getAddress), Some(transport.getMembers), transport.getViewId)
-      } else {
-         newVersion(None, None, 0)
-      }
-   }
-
 }
 
 object AbstractProtocolDecoder extends Logging {
@@ -217,6 +233,7 @@
 
 class RequestHeader(val op: Enumeration#Value)
 
+// TODO: NoReply could possibly be passed to subclass specific to memcached and make create* implementations use it
 class RequestParameters(val data: Array[Byte], val lifespan: Int, val maxIdle: Int, val streamVersion: Long, val noReply: Boolean)
 
 class UnknownOperationException(reason: String) extends StreamCorruptedException(reason)

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -12,20 +12,6 @@
 
 object ChannelBuffersAdapter extends ChannelBuffers {
 
-//   override def wrappedBuffer(buffer: ChannelBuffer): ChannelBuffer = {
-////      val nettyBuffers = new Array[NettyChannelBuffer](buffers.length)
-////      val nettyBuffers = buffers.map(_.getUnderlyingChannelBuffer) : _*
-////      for (buffer <- buffers) {
-////         nettyBuffers + buffer.getUnderlyingChannelBuffer
-////      }
-//      val nettyBuffer = buffer.getUnderlyingChannelBuffer.asInstanceOf[NettyChannelBuffer]
-//      new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(nettyBuffer))
-//   }
-
-//   override def wrappedBuffer(array: Array[Byte]): ChannelBuffer = {
-//      new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(array));
-//   }
-
    override def wrappedBuffer(array: Array[Byte]*): ChannelBuffer = {
       new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(array : _*));
    }

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -13,6 +13,7 @@
    
    override def getChannel: Channel = new ChannelAdapter(ctx.getChannel)
 
+   // TODO: Remove this from here and make it available via an object, this would clean up unnecessary params 
    override def getChannelBuffers: ChannelBuffers = ChannelBuffersAdapter
    
 }
\ No newline at end of file

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -65,14 +65,6 @@
       var future = serverChannels.unbind().awaitUninterruptibly();
       if (!future.isCompleteSuccess()) {
          warn("Server channel group did not completely unbind");
-//         val iter = future.getGroup().iterator
-//         while (iter.hasNext) {
-//            val ch = iter.next
-//            if (ch.isBound()) {
-//               warn("{0} is still bound to {1}", ch, ch.getRemoteAddress());
-//            }
-//         }
-
          for (ch <- asIterator(future.getGroup().iterator)) {
             if (ch.isBound()) {
                warn("{0} is still bound to {1}", ch, ch.getRemoteAddress());
@@ -80,34 +72,17 @@
          }
       }
 
-      // TODO remove workaround when inteating Netty 3.2.x - https://jira.jboss.org/jira/browse/NETTY-256
-      masterExecutor.shutdown();
-      try {
-         masterExecutor.awaitTermination(30, TimeUnit.SECONDS);
-      } catch {
-         case ie: InterruptedException => ie.printStackTrace();
-      }
-
       workerExecutor.shutdown();
       serverChannels.close().awaitUninterruptibly();
       future = acceptedChannels.close().awaitUninterruptibly();
       if (!future.isCompleteSuccess()) {
          warn("Channel group did not completely close");
-//         val iter = future.getGroup().iterator
-//         while (iter.hasNext) {
-//            val ch = iter.next
-//            if (ch.isBound()) {
-//               warn(ch + " is still connected to " + ch.getRemoteAddress());
-//            }
-//         }
-         
          for (ch <- asIterator(future.getGroup().iterator)) {
             if (ch.isBound()) {
                warn(ch + " is still connected to " + ch.getRemoteAddress());
             }
          }
       }
-//      debug("Channel group completely closed");
       debug("Channel group completely closed, release external resources");
       factory.releaseExternalResources();
    }

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/AbstractVersionedDecoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -2,7 +2,7 @@
 
 import org.infinispan.server.core.RequestParameters
 import org.infinispan.server.core.CacheValue
-import org.infinispan.server.core.transport.{ChannelBuffers, Channel, ChannelBuffer}
+import org.infinispan.server.core.transport.{ChannelBuffer}
 import org.infinispan.Cache
 import org.infinispan.stats.Stats
 
@@ -23,20 +23,16 @@
 
    def createValue(params: RequestParameters, nextVersion: Long): CacheValue
 
-   def sendPutResponse(messageId: Long): AnyRef
+   def createSuccessResponse(header: HotRodHeader): AnyRef
 
-   def sendGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef
+   def createNotExecutedResponse(header: HotRodHeader): AnyRef
 
-   def sendPutIfAbsentResponse(messageId: Long, prev: CacheValue): AnyRef
+   def createNotExistResponse(header: HotRodHeader): AnyRef
 
-   def sendReplaceResponse(messageId: Long, prev: CacheValue): AnyRef
+   def createGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef
 
-   def sendReplaceIfUnmodifiedResponse(messageId: Long, v: Option[CacheValue], prev: Option[CacheValue]): AnyRef
-
-   def sendRemoveResponse(messageId: Long, prev: CacheValue): AnyRef
-
    def handleCustomRequest(header: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef
 
-   def sendStatsResponse(header: HotRodHeader, stats: Stats): AnyRef
+   def createStatsResponse(header: HotRodHeader, stats: Stats): AnyRef
 
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/Decoder10.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -2,15 +2,14 @@
 
 import org.infinispan.server.core.Operation._
 import HotRodOperation._
-import OperationResponse._
 import OperationStatus._
 import org.infinispan.manager.CacheManager
 import org.infinispan.server.core.transport.{ChannelBuffer}
-import org.infinispan.server.core.{UnknownOperationException, RequestParameters, Logging, CacheValue}
 import org.infinispan.Cache
+import org.infinispan.stats.Stats
+import org.infinispan.server.core._
 import collection.mutable
 import collection.immutable
-import org.infinispan.stats.Stats
 
 /**
  * // TODO: Document this
@@ -19,12 +18,14 @@
  */
 
 class Decoder10(cacheManager: CacheManager) extends AbstractVersionedDecoder {
-
+   import RequestResolver._
+   import ResponseResolver._
+   import OperationResponse._
    type SuitableHeader = HotRodHeader
 
    override def readHeader(buffer: ChannelBuffer, messageId: Long): HotRodHeader = {
       val streamOp = buffer.readUnsignedByte
-      val op = OperationResolver.resolve(streamOp)
+      val op = toRequest(streamOp)
       if (op == None) {
          throw new UnknownOperationException("Unknown operation: " + streamOp);
       }
@@ -60,11 +61,19 @@
       }
    }
 
-   override def createValue(params: RequestParameters, nextVersion: Long): CacheValue = new CacheValue(params.data, nextVersion)
+   override def createValue(params: RequestParameters, nextVersion: Long): CacheValue =
+      new CacheValue(params.data, nextVersion)
 
-   override def sendPutResponse(messageId: Long): AnyRef = new Response(messageId, PutResponse, Success)
+   override def createSuccessResponse(header: HotRodHeader): AnyRef =
+      new Response(header.messageId, toResponse(header.op), Success)
 
-   override def sendGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef = {
+   override def createNotExecutedResponse(header: HotRodHeader): AnyRef =
+      new Response(header.messageId, toResponse(header.op), OperationNotExecuted)
+
+   override def createNotExistResponse(header: HotRodHeader): AnyRef =
+      new Response(header.messageId, toResponse(header.op), KeyDoesNotExist)   
+
+   override def createGetResponse(messageId: Long, v: CacheValue, op: Enumeration#Value): AnyRef = {
       if (v != null && op == GetRequest)
          new GetResponse(messageId, GetResponse, Success, Some(v.data))
       else if (v != null && op == GetWithVersionRequest)
@@ -75,37 +84,6 @@
          new GetWithVersionResponse(messageId, GetWithVersionResponse, KeyDoesNotExist, None, 0)
    }
 
-   override def sendPutIfAbsentResponse(messageId: Long, prev: CacheValue): AnyRef = {
-      if (prev == null)
-         new Response(messageId, PutIfAbsentResponse, Success)
-      else
-         new Response(messageId, PutIfAbsentResponse, OperationNotExecuted)
-   }
-
-   def sendReplaceResponse(messageId: Long, prev: CacheValue): AnyRef = {
-      if (prev != null)
-         new Response(messageId, ReplaceResponse, Success)
-      else
-         new Response(messageId, ReplaceResponse, OperationNotExecuted)
-   }
-
-   override def sendReplaceIfUnmodifiedResponse(messageId: Long, v: Option[CacheValue],
-                                                prev: Option[CacheValue]): AnyRef = {
-      if (v != None && prev != None)
-         new Response(messageId, ReplaceIfUnmodifiedResponse, Success)
-      else if (v == None && prev != None)
-         new Response(messageId, ReplaceIfUnmodifiedResponse, OperationNotExecuted)
-      else
-         new Response(messageId, ReplaceIfUnmodifiedResponse, KeyDoesNotExist)
-   }
-
-   override def sendRemoveResponse(messageId: Long, prev: CacheValue): AnyRef = {
-      if (prev != null)
-         new Response(messageId, ReplaceResponse, Success)
-      else
-         new Response(messageId, ReplaceResponse, KeyDoesNotExist)
-   }
-
    override def handleCustomRequest(header: HotRodHeader, buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef = {
       val messageId = header.messageId
       header.op match {
@@ -142,26 +120,25 @@
       }
    }
 
-   override def sendStatsResponse(header: HotRodHeader, stats: Stats): AnyRef = {
-      null
-//      val cacheStats = cache.getAdvancedCache.getStats
-//      val stats = mutable.Map[String, String]
-//      stats += ("timeSinceStart", cacheStats.getTimeSinceStart)
-//      stats += ("currentNumberOfEntries", cacheStats.getCurrentNumberOfEntries)
-//      stats += ("totalNumberOfEntries", cacheStats.getTotalNumberOfEntries)
-//      stats += ("stores", cacheStats.getStores)
-//      stats += ("retrievals", cacheStats.getRetrievals)
-//      stats += ("hits", cacheStats.getHits)
-//      stats += ("misses", cacheStats.getMisses)
-//      stats += ("removeHits", cacheStats.getRemoveHits)
-//      stats += ("removeMisses", cacheStats.getRemoveMisses)
-//      stats += ("evictions", cacheStats.getEvictions)
-//      new StatsResponse(header.messageId, immutable.Map ++ stats)
+   override def createStatsResponse(header: HotRodHeader, cacheStats: Stats): AnyRef = {
+      val stats = mutable.Map.empty[String, String]
+      stats += ("timeSinceStart" -> cacheStats.getTimeSinceStart.toString)
+      stats += ("currentNumberOfEntries" -> cacheStats.getCurrentNumberOfEntries.toString)
+      stats += ("totalNumberOfEntries" -> cacheStats.getTotalNumberOfEntries.toString)
+      stats += ("stores" -> cacheStats.getStores.toString)
+      stats += ("retrievals" -> cacheStats.getRetrievals.toString)
+      stats += ("hits" -> cacheStats.getHits.toString)
+      stats += ("misses" -> cacheStats.getMisses.toString)
+      stats += ("removeHits" -> cacheStats.getRemoveHits.toString)
+      stats += ("removeMisses" -> cacheStats.getRemoveMisses.toString)
+      stats += ("evictions" -> cacheStats.getEvictions.toString)
+      new StatsResponse(header.messageId, immutable.Map[String, String]() ++ stats)
    }
+
 }
 
-object OperationResolver extends Logging {
-   private val operations = Map[Int, Enumeration#Value](
+object RequestResolver extends Logging {
+   private val requests = Map[Int, Enumeration#Value](
       0x01 -> PutRequest,
       0x03 -> GetRequest,
       0x05 -> PutIfAbsentRequest,
@@ -176,8 +153,8 @@
       0x17 -> PingRequest 
    )
 
-   def resolve(streamOp: Short): Option[Enumeration#Value] = {
-      val op = operations.get(streamOp)
+   def toRequest(streamOp: Short): Option[Enumeration#Value] = {
+      val op = requests.get(streamOp)
       if (op == None)
          trace("Operation code: {0} was unmatched", streamOp)
       else
@@ -185,4 +162,43 @@
       op
    }
 
+}
+
+object OperationResponse extends Enumeration {
+   type OperationResponse = Enumeration#Value
+   val PutResponse = Value(0x02)
+   val GetResponse = Value(0x04)
+   val PutIfAbsentResponse = Value(0x06)
+   val ReplaceResponse = Value(0x08)
+   val ReplaceIfUnmodifiedResponse = Value(0x0A)
+   val RemoveResponse = Value(0x0C)
+   val RemoveIfUnmodifiedResponse = Value(0x0E)
+   val ContainsKeyResponse = Value(0x10)
+   val GetWithVersionResponse = Value(0x12)
+   val ClearResponse = Value(0x14)
+   val StatsResponse = Value(0x16)
+   val PingResponse = Value(0x18)
+   val ErrorResponse = Value(0x50)
+}
+
+object ResponseResolver {
+   import OperationResponse._
+   private val responses = Map[Enumeration#Value, OperationResponse](
+      PutRequest -> PutResponse,
+      GetRequest -> GetResponse,
+      PutIfAbsentRequest -> PutIfAbsentResponse,
+      ReplaceRequest -> ReplaceResponse,
+      ReplaceIfUnmodifiedRequest -> ReplaceIfUnmodifiedResponse,
+      RemoveRequest -> RemoveResponse,
+      RemoveIfUnmodifiedRequest -> RemoveIfUnmodifiedResponse,
+      ContainsKeyRequest -> ContainsKeyResponse,
+      GetWithVersionRequest -> GetWithVersionResponse,
+      ClearRequest -> ClearResponse,
+      StatsRequest -> StatsResponse,
+      PingRequest -> PingResponse
+   )
+
+   def toResponse(request: Enumeration#Value): OperationResponse = {
+      responses.get(request).get
+   }
 }
\ No newline at end of file

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -4,10 +4,8 @@
 import org.infinispan.stats.Stats
 import java.io.StreamCorruptedException
 import org.infinispan.server.core._
-import OperationStatus._
-import HotRodOperation._
-import ProtocolFlag._
 import transport._
+import OperationStatus._
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
 
 /**
@@ -22,6 +20,7 @@
    type SuitableHeader = HotRodHeader
    type SuitableParameters = RequestParameters
 
+   // TODO: Ask trustin whether this needs to be a volatile or not, depends on how decoders are shared
    @volatile private var isError = false
 
    override def readHeader(buffer: ChannelBuffer): HotRodHeader = {
@@ -78,43 +77,28 @@
    override def createValue(header: HotRodHeader, params: RequestParameters, nextVersion: Long): CacheValue =
       header.decoder.createValue(params, nextVersion)
 
-   override def sendPutResponse(header: HotRodHeader, ch: Channel, buffers: ChannelBuffers): AnyRef =
-      header.decoder.sendPutResponse(header.messageId)
+   override def createSuccessResponse(header: HotRodHeader): AnyRef = header.decoder.createSuccessResponse(header)
 
-   override def sendGetResponse(header: HotRodHeader, ch: Channel, buffers: ChannelBuffers,
-                                k: CacheKey, v: CacheValue): AnyRef =
-      header.decoder.sendGetResponse(header.messageId, v, header.op)
+   override def createNotExecutedResponse(header: HotRodHeader): AnyRef = header.decoder.createNotExecutedResponse(header)
 
-   override def sendPutIfAbsentResponse(header: HotRodHeader, ch: Channel, buffers: ChannelBuffers,
-                                        prev: CacheValue): AnyRef =
-      header.decoder.sendPutIfAbsentResponse(header.messageId, prev)
+   override def createNotExistResponse(header: HotRodHeader): AnyRef = header.decoder.createNotExistResponse(header)
 
-   override def sendReplaceResponse(header: HotRodHeader, ch: Channel, buffers: ChannelBuffers,
-                                    prev: CacheValue): AnyRef =
-      header.decoder.sendReplaceResponse(header.messageId, prev)
+   override def createGetResponse(header: HotRodHeader, buffers: ChannelBuffers,
+                                k: CacheKey, v: CacheValue): AnyRef =
+      header.decoder.createGetResponse(header.messageId, v, header.op)
 
-   override def sendReplaceIfUnmodifiedResponse(header: HotRodHeader, ch: Channel, buffers: ChannelBuffers,
-                                                v: Option[CacheValue], prev: Option[CacheValue]): AnyRef =
-      header.decoder.sendReplaceIfUnmodifiedResponse(header.messageId, v, prev)
+   override def createMultiGetResponse(header: HotRodHeader, buffers: ChannelBuffers,
+                                       pairs: Map[CacheKey, CacheValue]): AnyRef = null // Unsupported
 
-   override def sendRemoveResponse(header: HotRodHeader, ch: Channel, buffers: ChannelBuffers,
-                                   prev: CacheValue): AnyRef =
-      header.decoder.sendRemoveResponse(header.messageId, prev)
-
-   override def sendMultiGetResponse(header: HotRodHeader, ctx: ChannelHandlerContext,
-                                     pairs: Map[CacheKey, CacheValue]): AnyRef = null // Unsupported
-
    override def handleCustomRequest(header: HotRodHeader, ctx: ChannelHandlerContext,
                                     buffer: ChannelBuffer, cache: Cache[CacheKey, CacheValue]): AnyRef =
       header.decoder.handleCustomRequest(header, buffer, cache)
 
-   override def sendResponse(header: HotRodHeader, ctx: ChannelHandlerContext, stats: Stats): AnyRef =
-      header.decoder.sendStatsResponse(header, stats)
+   override def createStatsResponse(header: HotRodHeader, buffers: ChannelBuffers, stats: Stats): AnyRef =
+      header.decoder.createStatsResponse(header, stats)
 
-   override def sendResponse(ctx: ChannelHandlerContext, t: Throwable): AnyRef = {
-      val ch = ctx.getChannel
-      val buffers = ctx.getChannelBuffers
-      val errorResponse = t match {
+   override def createErrorResponse(t: Throwable): AnyRef = {
+      t match {
          case se: ServerException => {
             val messageId = se.header.asInstanceOf[HotRodHeader].messageId
             se.getCause match {
@@ -122,12 +106,10 @@
                case uoe: UnknownOperationException => new ErrorResponse(messageId, UnknownOperation, uoe.toString)
                case uve: UnknownVersionException => new ErrorResponse(messageId, UnknownVersion, uve.toString)
                // TODO add more cases
-               case e: Exception => new ErrorResponse(messageId, ServerError, e.toString)  
+               case e: Exception => new ErrorResponse(messageId, ServerError, e.toString)
             }
          }
       }
-      ch.write(errorResponse)
-      null
    }
 
    override def start {}

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodEncoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -16,15 +16,17 @@
    override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
       trace("Encode msg {0}", msg)
       val buffer: ChannelBuffer = msg match {
+         case s: StatsResponse => {
+            val buffer = ctx.getChannelBuffers.dynamicBuffer
+            writeHeader(buffer, s)
+            buffer.writeUnsignedInt(s.stats.size)
+            for ((key, value) <- s.stats) {
+               buffer.writeString(key)
+               buffer.writeString(value)
+            }
+            buffer
+         }
          case r: Response => writeHeader(ctx.getChannelBuffers.dynamicBuffer, r)
-//         case s: StatsResponse => {
-//            val buffer = ctx.getChannelBuffers.dynamicBuffer
-//            for ((key, value) <- s.stats) {
-//               writeHeader(buffer, s)
-//               buffer.writeString(key)
-//               buffer.writeString(value)
-//            }
-//         }
       }
       msg match {
          case g: GetWithVersionResponse => {

Deleted: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OperationResponse.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OperationResponse.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/OperationResponse.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -1,25 +0,0 @@
-package org.infinispan.server.hotrod
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since
- */
-
-object OperationResponse extends Enumeration {
-   type OperationResponse = Value
-
-   val PutResponse = Value(0x02)
-   val GetResponse = Value(0x04)
-   val PutIfAbsentResponse = Value(0x06)
-   val ReplaceResponse = Value(0x08)
-   val ReplaceIfUnmodifiedResponse = Value(0x0A)
-   val RemoveResponse = Value(0x0C)
-   val RemoveIfUnmodifiedResponse = Value(0x0E)
-   val ContainsKeyResponse = Value(0x10)
-   val GetWithVersionResponse = Value(0x12)
-   val ClearResponse = Value(0x14)
-   val StatsResponse = Value(0x16)
-   val PingResponse = Value(0x18)
-   val ErrorResponse = Value(0x50)
-}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -8,8 +8,7 @@
 import java.util.Arrays
 import org.jboss.netty.channel.Channel
 import org.infinispan.manager.{DefaultCacheManager, CacheManager}
-import org.infinispan.context.Flag
-import org.infinispan.{AdvancedCache, Cache => InfinispanCache}
+import org.infinispan.{AdvancedCache}
 import org.infinispan.test.{SingleCacheManagerTest}
 import org.infinispan.server.core.CacheValue
 import org.infinispan.server.hotrod.OperationStatus._
@@ -27,7 +26,7 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
- at Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
+ at Test(groups = Array("functional"), testName = "server.hotrod.HotRodFunctionalTest")
 class HotRodFunctionalTest extends SingleCacheManagerTest with Utils with Client {
    private val cacheName = "hotrod-cache"
    private var server: HotRodServer = _
@@ -74,6 +73,11 @@
       doPutWithLifespanMaxIdle(m, 0, 0)
    }
 
+   private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int) {
+      val status = put(ch, cacheName, k(m) , lifespan, maxIdle, v(m))
+      assertStatus(status, Success)
+   }
+
    def testPutOnDefaultCache(m: Method) {
       val status = put(ch, DefaultCacheManager.DEFAULT_CACHE_NAME, k(m) , 0, 0, v(m))
       assertStatus(status, Success)
@@ -96,11 +100,6 @@
       assertKeyDoesNotExist(getSt, actual)
    }
 
-   private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int) {
-      val status = put(ch, cacheName, k(m) , lifespan, maxIdle, v(m))
-      assertStatus(status, Success)
-   }
-
    def testGetBasic(m: Method) {
       doPut(m)
       val (getSt, actual) = doGet(m)
@@ -301,6 +300,20 @@
       }
    }
 
+   def testStatsDisabled(m: Method) {
+      val s = stats(ch, cacheName)
+      assertEquals(s.get("timeSinceStart").get, "-1")
+      assertEquals(s.get("currentNumberOfEntries").get, "-1")
+      assertEquals(s.get("totalNumberOfEntries").get, "-1")
+      assertEquals(s.get("stores").get, "-1")
+      assertEquals(s.get("retrievals").get, "-1")
+      assertEquals(s.get("hits").get, "-1")
+      assertEquals(s.get("misses").get, "-1")
+      assertEquals(s.get("removeHits").get, "-1")
+      assertEquals(s.get("removeMisses").get, "-1")
+      assertEquals(s.get("evictions").get, "-1")
+   }
+
    def testPing(m: Method) {
       val status = ping(ch, cacheName)
       assertStatus(status, Success)

Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala	                        (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodStatsTest.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -0,0 +1,88 @@
+package org.infinispan.server.hotrod
+
+import test.{Utils, Client}
+import org.infinispan.test.SingleCacheManagerTest
+import org.testng.annotations.{AfterClass, Test}
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import org.infinispan.server.core.CacheValue
+import org.infinispan.AdvancedCache
+import org.jboss.netty.channel.Channel
+import java.lang.reflect.Method
+import org.testng.Assert._
+import org.infinispan.server.hotrod.OperationStatus._
+import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+ at Test(groups = Array("functional"), testName = "server.hotrod.FunctionalTest")
+class HotRodStatsTest extends SingleCacheManagerTest with Utils with Client {
+   private val cacheName = "hotrod-cache"
+   private var server: HotRodServer = _
+   private var ch: Channel = _
+   private var advancedCache: AdvancedCache[CacheKey, CacheValue] = _
+   private var jmxDomain = classOf[HotRodStatsTest].getSimpleName
+
+   override def createCacheManager: CacheManager = {
+      val cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
+      advancedCache = cacheManager.getCache[CacheKey, CacheValue](cacheName).getAdvancedCache
+      server = startHotRodServer(cacheManager)
+      ch = connect("127.0.0.1", server.getPort)
+      cacheManager
+   }
+
+   @AfterClass(alwaysRun = true)
+   override def destroyAfterClass {
+      super.destroyAfterClass
+      log.debug("Test finished, close client and Hot Rod server", null)
+      ch.disconnect
+      server.stop
+   }
+
+   def testStats(m: Method) {
+      var s = stats(ch, cacheName)
+      assertTrue(s.get("timeSinceStart") != 0)
+      assertEquals(s.get("currentNumberOfEntries").get, "0")
+      assertEquals(s.get("totalNumberOfEntries").get, "0")
+      assertEquals(s.get("stores").get, "0")
+      assertEquals(s.get("retrievals").get, "0")
+      assertEquals(s.get("hits").get, "0")
+      assertEquals(s.get("misses").get, "0")
+      assertEquals(s.get("removeHits").get, "0")
+      assertEquals(s.get("removeMisses").get, "0")
+      assertEquals(s.get("evictions").get, "0")
+
+      doPut(m)
+      s = stats(ch, cacheName)
+      assertEquals(s.get("currentNumberOfEntries").get, "1")
+      assertEquals(s.get("totalNumberOfEntries").get, "1")
+      assertEquals(s.get("stores").get, "1")
+      val (getSt, actual) = doGet(m)
+      assertSuccess(getSt, v(m), actual)
+      s = stats(ch, cacheName)
+      assertEquals(s.get("hits").get, "1")
+      assertEquals(s.get("misses").get, "0")
+      assertEquals(s.get("retrievals").get, "1")
+   }
+
+   // TODO: shared this private between tests by making client trait and object instead
+   private def doPut(m: Method) {
+      doPutWithLifespanMaxIdle(m, 0, 0)
+   }
+
+   private def doPutWithLifespanMaxIdle(m: Method, lifespan: Int, maxIdle: Int) {
+      val status = put(ch, cacheName, k(m) , lifespan, maxIdle, v(m))
+      assertStatus(status, Success)
+   }
+
+   private def doGet(m: Method): (OperationStatus, Array[Byte]) = {
+      doGet(m, 0)
+   }
+
+   private def doGet(m: Method, flags: Int): (OperationStatus, Array[Byte]) = {
+      get(ch, cacheName, k(m), flags)
+   }
+   
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/Client.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -19,6 +19,8 @@
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, Executors}
 import org.infinispan.server.core.transport.netty.{ChannelBufferAdapter}
 import org.infinispan.server.core.Logging
+import collection.mutable
+import collection.immutable
 
 /**
  * // TODO: Document this
@@ -125,25 +127,22 @@
 
    def clear(ch: Channel, name: String): OperationStatus = {
       put(ch, 0xA0, 0x13, name, null, 0, 0, null, 0, 0)
-//      val writeFuture = ch.write(new Op(0xA0, , name, null, 0, 0, null, 0, 0))
-//      writeFuture.awaitUninterruptibly
-//      assertTrue(writeFuture.isSuccess)
-//      // Get the handler instance to retrieve the answer.
-//      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-//      handler.getResponse.status
    }
 
+   def stats(ch: Channel, name: String): Map[String, String] = {
+      val writeFuture = ch.write(new Op(0xA0, 0x15, name, null, 0, 0, null, 0, 0))
+      writeFuture.awaitUninterruptibly
+      assertTrue(writeFuture.isSuccess)
+      // Get the handler instance to retrieve the answer.
+      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
+      val resp = handler.getResponse.asInstanceOf[StatsResponse]
+      resp.stats
+   }
+
    def ping(ch: Channel, name: String): OperationStatus = {
       put(ch, 0xA0, 0x17, name, null, 0, 0, null, 0, 0)
-//      val writeFuture = ch.write(new Op(0xA0, 0x13, name, null, 0, 0, null, 0, 0))
-//      writeFuture.awaitUninterruptibly
-//      assertTrue(writeFuture.isSuccess)
-//      // Get the handler instance to retrieve the answer.
-//      var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
-//      handler.getResponse.status
    }
 
-
    def assertStatus(status: OperationStatus, expected: OperationStatus): Boolean = {
       val isSuccess = status == expected
       assertTrue(isSuccess, "Status should have been '" + expected + "' but instead was: " + status)
@@ -196,7 +195,7 @@
                buffer.writeUnsignedInt(op.flags) // flags
                buffer.writeByte(0) // client intelligence
                buffer.writeUnsignedInt(0) // topology id
-               if (op.code != 0x13 && op.code != 0x17) { // if it's a key based op... 
+               if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op... 
                   buffer.writeRangedBytes(op.key) // key length + key
                   if (op.value != null) {
                      buffer.writeUnsignedInt(op.lifespan) // lifespan
@@ -227,9 +226,14 @@
       val topologyChangeMarker = buf.readUnsignedByte
       val resp: Response =
          opCode match {
-//            case StatsResponse => {
-//               // TODO!!! Wait for outcome of mail
-//            }
+            case StatsResponse => {
+               val size = buf.readUnsignedInt
+               val stats = mutable.Map.empty[String, String]
+               for (i <- 1 to size) {
+                  stats += (buf.readString -> buf.readString)
+               }
+               new StatsResponse(id, immutable.Map[String, String]() ++ stats)
+            }
             case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
                  | RemoveResponse | RemoveIfUnmodifiedResponse | ContainsKeyResponse | ClearResponse | PingResponse =>
                new Response(id, opCode, status)

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -12,6 +12,7 @@
 import org.infinispan.server.core.transport.{Channel, ChannelBuffers, ChannelHandlerContext, ChannelBuffer}
 import org.infinispan.server.core._
 import org.infinispan.{AdvancedCache, Version, CacheException, Cache}
+import collection.mutable.ListBuffer
 
 /**
  * // TODO: Document this
@@ -20,7 +21,7 @@
  */
 
 class MemcachedDecoder(cacheManager: CacheManager) extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
-   import MemcachedDecoder._
+   import RequestResolver._
 
    type SuitableParameters = MemcachedParameters
    type SuitableHeader = RequestHeader
@@ -38,20 +39,7 @@
 
    override def readHeader(buffer: ChannelBuffer): RequestHeader = {
       val streamOp = readElement(buffer)
-//      val op = {
-//         try {
-//            MemcachedOperation.withName(streamOp)
-//         }
-//         catch {
-//            case nsee: NoSuchElementException => {
-//               val line = readLine(buffer) // Read rest of line to clear the operation
-//               throw new UnknownOperationException("Unknown operation: " + streamOp);
-//            }
-//         }
-//      }
-//      val op = MemcachedOperation.withName(streamOp)
-
-      val op = OperationResolver.resolve(streamOp)
+      val op = toRequest(streamOp)
       if (op == None) {
          val line = readLine(buffer) // Read rest of line to clear the operation
          throw new UnknownOperationException("Unknown operation: " + streamOp);
@@ -186,11 +174,11 @@
                val next = createValue(concatenated, generateVersion(cache), params.get.flags)
                val replaced = cache.replace(k, prev, next);
                if (replaced)
-                  if (!params.get.noReply) sendReplaceResponse(header, ch, buffers, prev) else null
+                  if (!params.get.noReply) STORED else null
                else // If there's a concurrent modification on this key, treat it as we couldn't replace it
-                  if (!params.get.noReply) sendReplaceResponse(header, ch, buffers, null) else null
+                  if (!params.get.noReply) NOT_STORED else null
             } else {
-               if (!params.get.noReply) sendReplaceResponse(header, ch, buffers, null) else null
+               if (!params.get.noReply) NOT_STORED else null
             }
          }
          case IncrementRequest | DecrementRequest => {
@@ -211,14 +199,14 @@
                var replaced = cache.replace(k, prev, next)
                if (replaced) {
                   if (isStatsEnabled) if (header.op == IncrementRequest) incrHits.incrementAndGet() else decrHits.incrementAndGet
-                  if (!params.get.noReply) ch.write(buffers.wrappedBuffer((new String(next.data) + CRLF).getBytes))
+                  if (!params.get.noReply) new String(next.data) + CRLF else null
                } else {
                   // If there's a concurrent modification on this key, the spec does not say what to do, so treat it as exceptional
                   throw new CacheException("Value modified since we retrieved from the cache, old value was " + prevCounter)
                }
             } else {
                if (isStatsEnabled) if (header.op == IncrementRequest) incrMisses.incrementAndGet() else decrMisses.incrementAndGet
-               if (!params.get.noReply) ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
+               if (!params.get.noReply) NOT_FOUND else null
             }
          }
          case FlushAllRequest => {
@@ -229,189 +217,139 @@
                flushFunction(cache.getAdvancedCache)
             else
                scheduler.schedule(new DelayedFlushAll(cache, flushFunction), flushDelay, TimeUnit.SECONDS)
-            if (params == None || !params.get.noReply) ch.write(buffers.wrappedBuffer("OK\r\n".getBytes))
+            if (params == None || !params.get.noReply) OK else null
          }
-         case VersionRequest => {
-            val sb = new StringBuilder
-            sb.append("VERSION ").append(Version.version).append(CRLF)
-            ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
-         }
+         case VersionRequest => new StringBuilder().append("VERSION ").append(Version.version).append(CRLF)
       }
-      null
    }
 
-   override def sendPutResponse(header: RequestHeader, ch: Channel, buffers: ChannelBuffers): AnyRef = {
-      ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
-      null
-   }
+//   override def createPutResponse(header: RequestHeader): AnyRef = STORED
 
-   override def sendGetResponse(header: RequestHeader, ch: Channel, buffers: ChannelBuffers,
-                                k: String, v: MemcachedValue): AnyRef = {
-      if (v != null)
-         ch.write(buildGetResponse(header.op, buffers, k, v))
-      ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
-      null
+   override def createSuccessResponse(header: RequestHeader): AnyRef = {
+      if (isStatsEnabled) {
+         header.op match {
+            case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedHits.incrementAndGet
+            case _ => // No-op
+         }
+      }
+      header.op match {
+         case RemoveRequest => DELETED
+         case _ => STORED
+      }
    }
 
-   override def sendPutIfAbsentResponse(header: RequestHeader, ch: Channel, buffers: ChannelBuffers,
-                                        prev: MemcachedValue): AnyRef = {
-      if (prev == null)
-         ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
-      else
-         ch.write(buffers.wrappedBuffer("NOT_STORED\r\n".getBytes))
-      null
+   override def createNotExecutedResponse(header: RequestHeader): AnyRef = {
+      if (isStatsEnabled) {
+         header.op match {
+            case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedBadval.incrementAndGet
+            case _ => // No-op
+         }
+      }
+      header.op match {
+         case ReplaceIfUnmodifiedRequest => EXISTS
+         case _ => NOT_STORED
+      }
    }
 
-   override def sendReplaceResponse(header: RequestHeader, ch: Channel, buffers: ChannelBuffers,
-                                    prev: MemcachedValue): AnyRef = {
-      if (prev == null)
-         ch.write(buffers.wrappedBuffer("NOT_STORED\r\n".getBytes))
-      else
-         ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
-      null
+   override def createNotExistResponse(header: SuitableHeader): AnyRef = {
+      if (isStatsEnabled) {
+         header.op match {
+            case ReplaceIfUnmodifiedRequest => replaceIfUnmodifiedMisses.incrementAndGet
+            case _ => // No-op
+         }
+      }      
+      NOT_FOUND
    }
 
-   override def sendReplaceIfUnmodifiedResponse(header: RequestHeader, ch: Channel, buffers: ChannelBuffers,
-                                                v: Option[MemcachedValue], prev: Option[MemcachedValue]): AnyRef = {
-      if (v != None && prev != None) {
-         if (isStatsEnabled) replaceIfUnmodifiedHits.incrementAndGet
-         ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
-      } else if (v == None && prev != None) {
-         if (isStatsEnabled) replaceIfUnmodifiedBadval.incrementAndGet
-         ch.write(buffers.wrappedBuffer("EXISTS\r\n".getBytes))
-      } else {
-         if (isStatsEnabled) replaceIfUnmodifiedMisses.incrementAndGet
-         ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
-      }
-      null
-   }
-
-   override def sendRemoveResponse(header: RequestHeader, ch: Channel, buffers: ChannelBuffers,
-                                   prev: MemcachedValue): AnyRef = {
-      if (prev == null)
-         ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
+   override def createGetResponse(header: RequestHeader, buffers: ChannelBuffers,
+                                  k: String, v: MemcachedValue): AnyRef = {
+      if (v != null)
+         List(buildGetResponse(header.op, buffers, k, v), buffers.wrappedBuffer(END))
       else
-         ch.write(buffers.wrappedBuffer("DELETED\r\n".getBytes))
-      null
+         END
    }
 
-   override def sendMultiGetResponse(header: RequestHeader, ctx: ChannelHandlerContext,
-                                     pairs: Map[String, MemcachedValue]): AnyRef = {
-      val buffers = ctx.getChannelBuffers
-      val ch = ctx.getChannel
+   override def createMultiGetResponse(header: RequestHeader, buffers: ChannelBuffers,
+                                       pairs: Map[String, MemcachedValue]): AnyRef = {
+      val elements = new ListBuffer[ChannelBuffer]
       header.op match {
          case GetRequest | GetWithVersionRequest => {
             for ((k, v) <- pairs)
-               ch.write(buildGetResponse(header.op, buffers, k, v))
-            ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
+               elements += buildGetResponse(header.op, buffers, k, v)
+            elements += buffers.wrappedBuffer("END\r\n".getBytes)
          }
       }
-      null
+      elements.toList
    }
-   
-//   override def sendCustomResponse(header: RequestHeader, ch: Channel, buffers: ChannelBuffers,
-//                                   v: Option[MemcachedValue], prev: Option[MemcachedValue]): AnyRef = {
-//      header.op match {
-//         case AppendRequest | PrependRequest => {
-//            sendReplaceResponse(header, ch, buffers, prev.get)
-//         }
-//         case IncrementRequest | DecrementRequest => {
-//            if (prev.get == null) {
-//               if (isStatsEnabled) if (header.op == IncrementRequest) incrMisses.incrementAndGet() else decrMisses.incrementAndGet
-//               ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
-//            } else {
-//               if (isStatsEnabled) if (header.op == IncrementRequest) incrHits.incrementAndGet() else decrHits.incrementAndGet
-//               ch.write(buffers.wrappedBuffer((new String(v.get.data) + CRLF).getBytes))
-//            }
-//         }
-//         case FlushAllRequest => {
-//            ch.write(buffers.wrappedBuffer("OK\r\n".getBytes))
-//         }
-//         case VersionRequest => {
-//            val sb = new StringBuilder
-//            sb.append("VERSION ").append(Version.version).append(CRLF)
-//            ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
-//         }
-//      }
-//      null
-//   }
 
-   override def sendResponse(ctx: ChannelHandlerContext, t: Throwable): AnyRef = {
-      val ch = ctx.getChannel
-      val buffers = ctx.getChannelBuffers
+   override def createErrorResponse(t: Throwable): AnyRef = {
       val sb = new StringBuilder
       t match {
          case se: ServerException => {
             se.getCause match {
-               case uoe: UnknownOperationException => ch.write(buffers.wrappedBuffer("ERROR\r\n".getBytes))
-               case cce: ClosedChannelException => // no-op, only log
+               case uoe: UnknownOperationException => ERROR
+               case cce: ClosedChannelException => null// no-op, only log
                case _ => {
                   t match {
                      case ioe: IOException => sb.append("CLIENT_ERROR ")
                      case _ => sb.append("SERVER_ERROR ")
                   }
                   sb.append(t).append(CRLF)
-                  ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
                }
             }
          }
-         case _ => {
-            sb.append("SERVER_ERROR ").append(t).append(CRLF)
-            ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
-         }
+         case _ => sb.append("SERVER_ERROR ").append(t).append(CRLF)
       }
-      null
    }
 
-   def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext, stats: Stats): AnyRef = {
-      var buffers = ctx.getChannelBuffers
-      var ch = ctx.getChannel
+   def createStatsResponse(header: RequestHeader, buffers: ChannelBuffers, stats: Stats): AnyRef = {
       var sb = new StringBuilder
-
-      writeStat("pid", 0, sb, buffers, ch) // Unsupported
-      writeStat("uptime", stats.getTimeSinceStart, sb, buffers, ch)
-      writeStat("time", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis), sb, buffers, ch)
-      writeStat("version", cache.getVersion, sb, buffers, ch)
-      writeStat("pointer_size", 0, sb, buffers, ch) // Unsupported
-      writeStat("rusage_user", 0, sb, buffers, ch) // Unsupported
-      writeStat("rusage_system", 0, sb, buffers, ch) // Unsupported
-      writeStat("curr_items", stats.getCurrentNumberOfEntries, sb, buffers, ch)
-      writeStat("total_items", stats.getTotalNumberOfEntries, sb, buffers, ch)
-      writeStat("bytes", 0, sb, buffers, ch) // Unsupported
-      writeStat("curr_connections", 0, sb, buffers, ch) // TODO: Through netty?
-      writeStat("total_connections", 0, sb, buffers, ch) // TODO: Through netty?
-      writeStat("connection_structures", 0, sb, buffers, ch) // Unsupported
-      writeStat("cmd_get", stats.getRetrievals, sb, buffers, ch)
-      writeStat("cmd_set", stats.getStores, sb, buffers, ch)
-      writeStat("get_hits", stats.getHits, sb, buffers, ch)
-      writeStat("get_misses", stats.getMisses, sb, buffers, ch)
-      writeStat("delete_misses", stats.getRemoveMisses, sb, buffers, ch)
-      writeStat("delete_hits", stats.getRemoveHits, sb, buffers, ch)
-      writeStat("incr_misses", incrMisses, sb, buffers, ch)
-      writeStat("incr_hits", incrHits, sb, buffers, ch)
-      writeStat("decr_misses", decrMisses, sb, buffers, ch)
-      writeStat("decr_hits", decrHits, sb, buffers, ch)
-      writeStat("cas_misses", replaceIfUnmodifiedMisses, sb, buffers, ch)
-      writeStat("cas_hits", replaceIfUnmodifiedHits, sb, buffers, ch)
-      writeStat("cas_badval", replaceIfUnmodifiedBadval, sb, buffers, ch)
-      writeStat("auth_cmds", 0, sb, buffers, ch) // Unsupported
-      writeStat("auth_errors", 0, sb, buffers, ch) // Unsupported
-      //TODO: Evictions are measure by evict calls, but not by nodes are that are expired after the entry's lifespan has expired.
-      writeStat("evictions", stats.getEvictions, sb, buffers, ch)
-      writeStat("bytes_read", 0, sb, buffers, ch) // TODO: Through netty?
-      writeStat("bytes_written", 0, sb, buffers, ch) // TODO: Through netty?
-      writeStat("limit_maxbytes", 0, sb, buffers, ch) // Unsupported
-      writeStat("threads", 0, sb, buffers, ch) // TODO: Through netty?
-      writeStat("conn_yields", 0, sb, buffers, ch) // Unsupported
-
-      ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
-      null
+      List[ChannelBuffer] (
+         buildStat("pid", 0, sb, buffers),
+         buildStat("uptime", stats.getTimeSinceStart, sb, buffers),
+         buildStat("uptime", stats.getTimeSinceStart, sb, buffers),
+         buildStat("time", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis), sb, buffers),
+         buildStat("version", cache.getVersion, sb, buffers),
+         buildStat("pointer_size", 0, sb, buffers), // Unsupported
+         buildStat("rusage_user", 0, sb, buffers), // Unsupported
+         buildStat("rusage_system", 0, sb, buffers), // Unsupported
+         buildStat("curr_items", stats.getCurrentNumberOfEntries, sb, buffers),
+         buildStat("total_items", stats.getTotalNumberOfEntries, sb, buffers),
+         buildStat("bytes", 0, sb, buffers), // Unsupported
+         buildStat("curr_connections", 0, sb, buffers), // TODO: Through netty?
+         buildStat("total_connections", 0, sb, buffers), // TODO: Through netty?
+         buildStat("connection_structures", 0, sb, buffers), // Unsupported
+         buildStat("cmd_get", stats.getRetrievals, sb, buffers),
+         buildStat("cmd_set", stats.getStores, sb, buffers),
+         buildStat("get_hits", stats.getHits, sb, buffers),
+         buildStat("get_misses", stats.getMisses, sb, buffers),
+         buildStat("delete_misses", stats.getRemoveMisses, sb, buffers),
+         buildStat("delete_hits", stats.getRemoveHits, sb, buffers),
+         buildStat("incr_misses", incrMisses, sb, buffers),
+         buildStat("incr_hits", incrHits, sb, buffers),
+         buildStat("decr_misses", decrMisses, sb, buffers),
+         buildStat("decr_hits", decrHits, sb, buffers),
+         buildStat("cas_misses", replaceIfUnmodifiedMisses, sb, buffers),
+         buildStat("cas_hits", replaceIfUnmodifiedHits, sb, buffers),
+         buildStat("cas_badval", replaceIfUnmodifiedBadval, sb, buffers),
+         buildStat("auth_cmds", 0, sb, buffers), // Unsupported
+         buildStat("auth_errors", 0, sb, buffers), // Unsupported
+         //TODO: Evictions are measure by evict calls, but not by nodes are that are expired after the entry's lifespan has expired.
+         buildStat("evictions", stats.getEvictions, sb, buffers),
+         buildStat("bytes_read", 0, sb, buffers), // TODO: Through netty?
+         buildStat("bytes_written", 0, sb, buffers), // TODO: Through netty?
+         buildStat("limit_maxbytes", 0, sb, buffers), // Unsupported
+         buildStat("threads", 0, sb, buffers), // TODO: Through netty?
+         buildStat("conn_yields", 0, sb, buffers), // Unsupported
+         buffers.wrappedBuffer(END)
+      )
    }
 
-   private def writeStat(stat: String, value: Any, sb: StringBuilder, buffers: ChannelBuffers, ch: Channel) {
+   private def buildStat(stat: String, value: Any, sb: StringBuilder, buffers: ChannelBuffers): ChannelBuffer = {
       sb.append("STAT").append(' ').append(stat).append(' ').append(value).append(CRLF)
-      ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+      val buffer = buffers.wrappedBuffer(sb.toString.getBytes)
       sb.setLength(0)
+      buffer
    }
 
    override def start {
@@ -430,7 +368,7 @@
    private def buildGetResponse(op: Enumeration#Value, buffers: ChannelBuffers,
                                 k: String, v: MemcachedValue): ChannelBuffer = {
       val header = buildGetResponseHeader(k, v, op)
-      buffers.wrappedBuffer(header.getBytes, v.data, CRLF.getBytes)
+      buffers.wrappedBuffer(header.getBytes, v.data, CRLFBytes)
    }
 
    private def buildGetResponseHeader(k: String, v: MemcachedValue, op: Enumeration#Value): String = {
@@ -444,31 +382,38 @@
 
 }
 
-object MemcachedDecoder extends Logging
-
 class MemcachedParameters(override val data: Array[Byte], override val lifespan: Int,
                           override val maxIdle: Int, override val streamVersion: Long,
                           override val noReply: Boolean, val flags: Int, val delta: String,
                           val flushDelay: Int) extends RequestParameters(data, lifespan, maxIdle, streamVersion, noReply)
 
-//object MemcachedOperation extends Enumeration(10) {
-//   val PutRequest = new Value("set")
-//   val PutIfAbsentRequest = new Value("add")
-//   val ReplaceRequest = new Value("replace")
-//   val ReplaceIfUnmodifiedRequest = new Value("cas")
-//   val AppendRequest = new Value("append")
-//   val PrependRequest = new Value("prepend")
-//   val GetRequest = new Value("get")
-//   val GetWithVersionRequest = new Value("gets")
-//   val DeleteRequest = new Value("delete")
-//   val IncrementRequest = new Value("incr")
-//   val DecrementRequest = new Value("decr")
-//   val FlushAllRequest = new Value("flush_all")
-//   val VersionRequest = new Value("version")
-//   val StatsRequest = new Value("stats")
-//}
-
 private class DelayedFlushAll(cache: Cache[String, MemcachedValue],
                               flushFunction: AdvancedCache[String, MemcachedValue] => Unit) extends Runnable {
    override def run() = flushFunction(cache.getAdvancedCache)
-}
\ No newline at end of file
+}
+
+private object RequestResolver extends Logging {
+   private val operations = Map[String, Enumeration#Value](
+      "set" -> PutRequest,
+      "add" -> PutIfAbsentRequest,
+      "replace" -> ReplaceRequest,
+      "cas" -> ReplaceIfUnmodifiedRequest,
+      "append" -> AppendRequest,
+      "prepend" -> PrependRequest,
+      "get" -> GetRequest,
+      "gets" -> GetWithVersionRequest,
+      "delete" -> RemoveRequest,
+      "incr" -> IncrementRequest,
+      "decr" -> DecrementRequest,
+      "flush_all" -> FlushAllRequest,
+      "version" -> VersionRequest,
+      "stats" -> StatsRequest
+   )
+
+   def toRequest(commandName: String): Option[Enumeration#Value] = {
+      trace("Operation: {0}", commandName)
+      val op = operations.get(commandName)
+      op
+   }
+}
+

Deleted: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -1,39 +0,0 @@
-package org.infinispan.server.memcached
-
-import org.infinispan.server.core.Operation._
-import org.infinispan.server.memcached.MemcachedOperation._
-import org.infinispan.server.core.Logging
-
-/**
- * // TODO: Document this
- * @author Galder Zamarreño
- * @since
- */
-// todo: maybe try to abstract this into something that can be shared betwen hr and memcached
-// todo(2): Do I need this at all? Simply move it to decoder!
-object OperationResolver extends Logging {
-   // TODO: Rather than holding a map, check if the String could be passed as part of the Enumeration and whether this could be retrieved in a O(1) op
-   private val operations = Map[String, Enumeration#Value](
-      "set" -> PutRequest,
-      "add" -> PutIfAbsentRequest,
-      "replace" -> ReplaceRequest,
-      "cas" -> ReplaceIfUnmodifiedRequest,
-      "append" -> AppendRequest,
-      "prepend" -> PrependRequest,
-      "get" -> GetRequest,
-      "gets" -> GetWithVersionRequest,
-      "delete" -> RemoveRequest,
-      "incr" -> IncrementRequest,
-      "decr" -> DecrementRequest,
-      "flush_all" -> FlushAllRequest,
-      "version" -> VersionRequest,
-      "stats" -> StatsRequest
-   )
-
-   def resolve(commandName: String): Option[Enumeration#Value] = {
-      trace("Operation: {0}", commandName)
-      val op = operations.get(commandName)
-      op
-   }
-}
-

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -11,6 +11,16 @@
 trait TextProtocolUtil {
 
    final val CRLF = "\r\n"
+   final val CRLFBytes = "\r\n".getBytes
+   final val END = "END\r\n".getBytes
+   final val DELETED = "DELETED\r\n".getBytes
+   final val NOT_FOUND = "NOT_FOUND\r\n".getBytes
+   final val EXISTS = "EXISTS\r\n".getBytes
+   final val STORED = "STORED\r\n".getBytes
+   final val NOT_STORED = "NOT_STORED\r\n".getBytes
+   final val OK = "OK\r\n".getBytes
+   final val ERROR = "ERROR\r\n".getBytes
+
    final val CR = 13
    final val LF = 10
 

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-03-25 13:22:15 UTC (rev 1621)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-03-25 17:38:02 UTC (rev 1622)
@@ -4,12 +4,10 @@
 import net.spy.memcached.{DefaultConnectionFactory, MemcachedClient}
 import java.util.Arrays
 import java.net.InetSocketAddress
-import org.infinispan.Cache
 import java.util.concurrent.atomic.AtomicInteger
 import org.infinispan.manager.CacheManager
 import org.infinispan.server.core.transport.Decoder
 import org.infinispan.server.memcached.{MemcachedDecoder, MemcachedValue, MemcachedServer}
-import org.infinispan.server.core.RequestHeader
 
 /**
  * // TODO: Document this
@@ -20,18 +18,6 @@
 trait MemcachedTestingUtil {
    def host = "127.0.0.1"
 
-//   def k(m: Method, prefix: String): Array[Byte] = {
-//      val bytes: Array[Byte] = (prefix + m.getName).getBytes
-//      trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)
-//      bytes
-//   }
-//
-//   def v(m: Method, prefix: String): Array[Byte] = k(m, prefix)
-//
-//   def k(m: Method): Array[Byte] = k(m, "k-")
-//
-//   def v(m: Method): Array[Byte] = v(m, "v-")
-
    def k(m: Method, prefix: String): String = prefix + m.getName
 
    def v(m: Method, prefix: String): String = prefix + m.getName
@@ -58,15 +44,6 @@
 
    def startMemcachedTextServer(cacheManager: CacheManager, cacheName: String): MemcachedServer = {
       startMemcachedTextServer(cacheManager, UniquePortThreadLocal.get.intValue, cacheName)
-//      val server = new MemcachedServer {
-//         protected override def getDecoder(cacheManager: CacheManager): Decoder = {
-//            new MemcachedDecoder(cacheManager) {
-//               override def getCache(header: RequestHeader) = cacheManager.getCache[String, MemcachedValue](cacheName)
-//            }
-//         }
-//      }
-//      server.start(host, UniquePortThreadLocal.get.intValue, cacheManager, 0, 0)
-//      server
    }
 
    def startMemcachedTextServer(cacheManager: CacheManager, port: Int, cacheName: String): MemcachedServer = {



More information about the infinispan-commits mailing list