[infinispan-commits] Infinispan SVN: r1613 - in trunk/server: core/src/main and 27 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Mar 23 13:59:59 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-03-23 13:59:55 -0400 (Tue, 23 Mar 2010)
New Revision: 1613

Added:
   trunk/server/core/src/main/scala/
   trunk/server/core/src/main/scala/org/
   trunk/server/core/src/main/scala/org/infinispan/
   trunk/server/core/src/main/scala/org/infinispan/server/
   trunk/server/core/src/main/scala/org/infinispan/server/core/
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/Operation.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/Value.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Channel.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelFuture.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Encoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ExceptionEvent.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/NoState.java
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Transport.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelFutureAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/EncoderAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ExceptionEventAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/core/src/test/scala/
   trunk/server/core/src/test/scala/org/
   trunk/server/core/src/test/scala/org/infinispan/
   trunk/server/core/src/test/scala/org/infinispan/server/
   trunk/server/core/src/test/scala/org/infinispan/server/core/
   trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala
   trunk/server/memcached/src/main/scala/
   trunk/server/memcached/src/main/scala/org/
   trunk/server/memcached/src/main/scala/org/infinispan/
   trunk/server/memcached/src/main/scala/org/infinispan/server/
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedOperation.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala
   trunk/server/memcached/src/test/scala/
   trunk/server/memcached/src/test/scala/org/
   trunk/server/memcached/src/test/scala/org/infinispan/
   trunk/server/memcached/src/test/scala/org/infinispan/server/
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Removed:
   trunk/server/core/src/main/java/
   trunk/server/core/src/test/java/
   trunk/server/memcached/src/main/java/
   trunk/server/memcached/src/test/java/
Modified:
   trunk/server/core/pom.xml
   trunk/server/memcached/pom.xml
Log:
Refactored memcached and core module so that core does more work and memcached only does protocol specific work. Also converted both modules to scala. 


Modified: trunk/server/core/pom.xml
===================================================================
--- trunk/server/core/pom.xml	2010-03-22 20:12:52 UTC (rev 1612)
+++ trunk/server/core/pom.xml	2010-03-23 17:59:55 UTC (rev 1613)
@@ -18,6 +18,7 @@
       <version.netty>3.2.0.BETA1</version.netty>      
       <version.gnu.getopt>1.0.13</version.gnu.getopt>
       <version.apache.log4j>1.2.15</version.apache.log4j>
+      <version.scala>2.8.0.Beta1</version.scala>
    </properties>
 
    <dependencies>
@@ -39,6 +40,49 @@
          <version>${version.apache.log4j}</version>
          <scope>test</scope>
       </dependency>
+
+      <dependency>
+         <groupId>org.scala-lang</groupId>
+         <artifactId>scala-library</artifactId>
+         <version>${version.scala}</version>
+      </dependency>
+      
    </dependencies>
 
+   <build>
+      <finalName>infinispan</finalName>
+      <sourceDirectory>src/main/scala</sourceDirectory>
+      <testSourceDirectory>src/test/scala</testSourceDirectory>
+
+      <plugins>
+         <plugin>
+            <groupId>org.scala-tools</groupId>
+            <artifactId>maven-scala-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>compile</id>
+                  <goals>
+                     <goal>compile</goal>
+                  </goals>
+                  <phase>compile</phase>
+               </execution>
+               <execution>
+                  <id>test-compile</id>
+                  <goals>
+                     <goal>testCompile</goal>
+                  </goals>
+                  <phase>test-compile</phase>
+               </execution>
+               <execution>
+                  <phase>process-resources</phase>
+                  <goals>
+                     <goal>compile</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+      </plugins>
+
+   </build>
+   
 </project>

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,179 @@
+package org.infinispan.server.core
+
+import org.infinispan.Cache
+import Operation._
+import transport.{ExceptionEvent, Decoder, ChannelHandlerContext, ChannelBuffer}
+import scala.collection.mutable.HashMap
+import scala.collection.immutable
+import org.infinispan.remoting.transport.Address
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConversions._
+import java.util.concurrent.TimeUnit
+import org.infinispan.stats.Stats
+import org.infinispan.server.core.VersionGenerator._
+import java.io.StreamCorruptedException
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class AbstractProtocolDecoder[K, V <: Value] extends Decoder {
+   import AbstractProtocolDecoder._
+
+   type SuitableParameters <: RequestParameters
+
+   private val versionCounter = new AtomicInteger
+
+   override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): AnyRef = {
+      if (buffer.readableBytes < 1) return null
+      val header = readHeader(buffer)
+      val cache = getCache(header)
+      header.op match {
+         case PutRequest | PutIfAbsentRequest | ReplaceRequest | ReplaceIfUnmodifiedRequest | DeleteRequest => {
+            val k = readKey(buffer)
+            val params = readParameters(header.op, buffer)
+            header.op match {
+               case PutRequest => {
+                  putInCache(k, params, cache)
+                  sendResponse(header, ctx, None, None, Some(params), None)
+               }
+               case PutIfAbsentRequest => {
+                  val prev = cache.get(k)
+                  if (prev == null) putIfAbsentInCache(k, params, cache) // Generate new version only if key not present
+                  sendResponse(header, ctx, None, None, Some(params), Some(prev))
+               }
+               case ReplaceRequest => {
+                  val prev = cache.get(k)
+                  if (prev != null) replaceInCache(k, params, cache) // Generate new version only if key present
+                  sendResponse(header, ctx, None, None, Some(params), Some(prev))
+               }
+               case ReplaceIfUnmodifiedRequest => {
+                  val prev = cache.get(k)
+                  if (prev != null) {
+                     if (prev.version == params.version) {
+                        // Generate new version only if key present and version has not changed, otherwise it's wasteful
+                        val v = createValue(params, generateVersion(cache))
+                        val replaced = cache.replace(k, prev, v);
+                        if (replaced)
+                           sendResponse(header, ctx, None, Some(v), Some(params), Some(prev))
+                        else
+                           sendResponse(header, ctx, None, None, Some(params), Some(prev))
+                     } else {
+                        sendResponse(header, ctx, None, None, Some(params), Some(prev))
+                     }
+                  } else {
+                     sendResponse(header, ctx, None, None, Some(params), None)
+                  }
+               }
+               case DeleteRequest => {
+                  val prev = cache.remove(k)
+                  sendResponse(header, ctx, None, None, Some(params), Some(prev))
+               }
+            }
+         }
+         case GetRequest | GetWithVersionRequest => {
+            val keys = readKeys(buffer)
+            if (keys.length > 1) {
+               val map = new HashMap[K,V]()
+               for (k <- keys) {
+                  val v = cache.get(k)
+                  if (v != null)
+                     map += (k -> v)
+               }
+               sendResponse(header, ctx, new immutable.HashMap ++ map)
+            } else {
+               sendResponse(header, ctx, Some(keys.head), Some(cache.get(keys.head)), None, None)
+            }
+         }
+         case StatsRequest => sendResponse(ctx, cache.getAdvancedCache.getStats)
+         case _ => handleCustomRequest(header, ctx, buffer, cache)
+      }
+      null
+   }
+
+   private def putInCache(k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
+      val v = createValue(params, generateVersion(cache))
+      cache.put(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+   }
+
+   private def putIfAbsentInCache(k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
+      val v = createValue(params, generateVersion(cache))
+      cache.putIfAbsent(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+   }
+
+   private def replaceInCache(k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
+      val v = createValue(params, generateVersion(cache))
+      cache.replace(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+   }
+
+   override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+      error("Exception reported", e.getCause)
+      sendResponse(ctx, e.getCause)
+   }
+
+   def readHeader(buffer: ChannelBuffer): RequestHeader
+
+   def getCache(header: RequestHeader): Cache[K, V]
+
+   // todo: probably remove in favour of readKeys
+   def readKey(buffer: ChannelBuffer): K
+
+   def readKeys(buffer: ChannelBuffer): Array[K]
+
+   def readParameters(op: Enumeration#Value, buffer: ChannelBuffer): SuitableParameters
+
+   def createValue(params: SuitableParameters, nextVersion: Long): V 
+
+   def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext, k: Option[K], v: Option[V], params: Option[SuitableParameters], prev: Option[V])
+
+   def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext, pairs: Map[K, V])
+
+   def sendResponse(ctx: ChannelHandlerContext, t: Throwable)
+
+   def sendResponse(ctx: ChannelHandlerContext, stats: Stats)
+
+   def handleCustomRequest(header: RequestHeader, ctx: ChannelHandlerContext, buffer: ChannelBuffer, cache: Cache[K, V])
+
+   /**
+    * Transforms lifespan pass as seconds into milliseconds
+    * following this rule:
+    *
+    * If lifespan is bigger than number of seconds in 30 days,
+    * then it is considered unix time. After converting it to
+    * milliseconds, we substract the current time in and the
+    * result is returned.
+    *
+    * Otherwise it's just considered number of seconds from
+    * now and it's returned in milliseconds unit.
+    */
+   private def toMillis(lifespan: Int): Long = {
+      if (lifespan > SecondsInAMonth) TimeUnit.SECONDS.toMillis(lifespan) - System.currentTimeMillis
+      else TimeUnit.SECONDS.toMillis(lifespan)
+   }
+
+   protected def generateVersion(cache: Cache[K, V]): Long = {
+      val rpcManager = cache.getAdvancedCache.getRpcManager
+      if (rpcManager != null) {
+         val transport = rpcManager.getTransport
+         newVersion(Some(transport.getAddress), Some(transport.getMembers), transport.getViewId)
+      } else {
+         newVersion(None, None, 0)
+      }
+   }
+
+}
+
+object AbstractProtocolDecoder extends Logging {
+   private val SecondsInAMonth = 60 * 60 * 24 * 30
+   private val DefaultTimeUnit = TimeUnit.MILLISECONDS 
+}
+
+// todo: once I implement the hotrod see, revisit to see whether this class is still necessary,
+// todo: ...I suspect so cos I'd need a place to hold stuff that appears before the operation
+class RequestHeader(val op: Enumeration#Value)
+
+class RequestParameters(val data: Array[Byte], val lifespan: Int, val maxIdle: Int, val version: Long)
+
+class UnknownOperationException(reason: String) extends StreamCorruptedException(reason)
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,55 @@
+package org.infinispan.server.core
+
+import java.net.InetSocketAddress
+import transport.netty.{EncoderAdapter, DecoderAdapter, NettyTransport}
+import transport.{Decoder, Encoder, Transport}
+import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class AbstractProtocolServer extends ProtocolServer {
+   private var host: String = _
+   private var port: Int = _
+   private var masterThreads: Int = _
+   private var workerThreads: Int = _
+   private var transport: Transport = _
+   private var cacheManager: CacheManager = _
+   private var decoder: Decoder = _
+   private var encoder: Encoder = _
+
+   override def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int) {
+      this.host = host
+      this.port = port
+      this.masterThreads = masterThreads
+      this.workerThreads = workerThreads
+
+      decoder = getDecoder(cacheManager)
+      decoder.start
+      encoder = getEncoder
+      val nettyDecoder = if (decoder != null) new DecoderAdapter(decoder) else null
+      val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
+      val address =  new InetSocketAddress(host, port)
+      // TODO change cache name 'default' to something more meaningful and dependent of protocol
+      transport = new NettyTransport(nettyDecoder, nettyEncoder, address, masterThreads, workerThreads, "default")
+      transport.start
+   }
+
+   override def stop {
+      if (transport != null)
+         transport.stop
+      if (decoder != null)
+         decoder.stop
+//      cacheManager.stop
+   }
+
+   def getPort = port
+
+   protected def getEncoder: Encoder
+
+   protected def getDecoder(cacheManager: CacheManager): Decoder
+
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,43 @@
+package org.infinispan.server.core
+
+import org.infinispan.util.logging.{LogFactory, Log}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+trait Logging {
+   private lazy val log: Log = LogFactory.getLog(getClass)
+
+//   def info(msg: => String) = if (log.isInfoEnabled) log.info(msg, null)
+
+   // params.map(_.asInstanceOf[AnyRef]) => returns a Seq[AnyRef]
+   // the ': _*' part tells the compiler to pass it as varargs
+   def info(msg: => String, params: Any*) = if (log.isInfoEnabled) log.info(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+//   def debug(msg: => String) = log.debug(msg, null)
+
+   def debug(msg: => String, params: Any*) = if (log.isDebugEnabled) log.debug(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+//   def trace(msg: => String) = log.trace(msg, null)
+
+   def trace(msg: => String, params: Any*) = if (log.isTraceEnabled) log.trace(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+   def warn(msg: => String, params: Any*) = log.warn(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+   def warn(msg: => String, t: Throwable) = log.warn(msg, t, null)
+
+   def error(msg: => String) = log.error(msg, null)
+
+   def error(msg: => String, t: Throwable) = log.error(msg, t, null)
+
+   // TODO: Sort out the other error methods that take both Throwable and varargs
+
+//   def error(msg: => String, params: Any*) =
+//      if (log.isErrorEnabled) log.error(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+//
+//   def error(msg: => String, t: Throwable, params: Any*) =
+//      if (log.isErrorEnabled) log.error(msg, t, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,214 @@
+package org.infinispan.server.core
+
+import scala.collection.JavaConversions._
+import collection.mutable.HashMap
+import scala.collection.mutable
+import org.infinispan.util.Util
+import java.io.IOException
+import java.security.{PrivilegedAction, AccessController}
+import java.util.concurrent.{ThreadFactory, ExecutionException, Callable, Executors}
+import gnu.getopt.{Getopt, LongOpt}
+import org.infinispan.Version
+import org.infinispan.manager.{CacheManager, DefaultCacheManager}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object Main extends Logging {
+   
+   val PROP_KEY_PORT = "infinispan.server.port"
+   val PROP_KEY_HOST = "infinispan.server.host"
+   val PROP_KEY_MASTER_THREADS = "infinispan.server.master.threads"
+   val PROP_KEY_WORKER_THREADS = "infinispan.server.worker.threads"
+   val PROP_KEY_CACHE_CONFIG = "infinispan.server.cache.config"
+   val PROP_KEY_PROTOCOL = "infinispan.server.protocol"
+   val PORT_DEFAULT = 11211
+   val HOST_DEFAULT = "127.0.0.1"
+   val MASTER_THREADS_DEFAULT = 0
+   val WORKER_THREADS_DEFAULT = 0
+
+   /**
+    * Server properties.  This object holds all of the required
+    * information to get the server up and running. Use System
+    * properties for defaults.
+    */
+   private val props: mutable.Map[String, String] = {
+      // Set default properties
+      val properties = new HashMap[String, String]()
+      val sysProps = System.getProperties
+      for (property <- asIterator(sysProps.iterator))
+         properties.put(property._1, property._2)
+      properties
+   }
+   
+   private var programName: String = _
+   private var server: ProtocolServer = _
+
+   def main(args: Array[String]) {
+       info("Start main with args: {0}", args.mkString(", "))
+       var worker = new Callable[Void] {
+          override def call = {
+             try {
+                boot(args)
+             }
+             catch {
+                case e: Exception => {
+                   System.err.println("Failed to boot JBoss:")
+                   e.printStackTrace
+                   throw e
+                }
+             }
+             null
+          }
+       }
+       var f = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
+          override def newThread(r: Runnable): Thread = {
+             // TODO Maybe create thread names based on the protocol run
+             return new Thread(r, "InfinispanServer-Main")
+          }
+       }).submit(worker)
+       f.get
+    }
+
+    def boot(args: Array[String]) {
+      // First process the command line to pickup custom props/settings
+      processCommandLine(args)
+
+      val host = if (props.get(PROP_KEY_HOST) == None) HOST_DEFAULT else props.get(PROP_KEY_HOST).get
+      val masterThreads = if (props.get(PROP_KEY_MASTER_THREADS) == None) MASTER_THREADS_DEFAULT else props.get(PROP_KEY_MASTER_THREADS).get.toInt
+      if (masterThreads < 0) {
+         throw new IllegalArgumentException("Master threads can't be lower than 0: " + masterThreads)
+      }
+      val workerThreads = if (props.get(PROP_KEY_WORKER_THREADS) == None) WORKER_THREADS_DEFAULT else props.get(PROP_KEY_WORKER_THREADS).get.toInt
+      if (workerThreads < 0) {
+         throw new IllegalArgumentException("Worker threads can't be lower than 0: " + masterThreads)
+      }
+      val configFile = props.get(PROP_KEY_CACHE_CONFIG)
+      var protocol = props.get(PROP_KEY_PROTOCOL)
+      if (protocol == None) {
+         System.err.println("ERROR: Please indicate protocol to run with -r parameter")
+         showAndExit
+      }
+
+      // TODO: move class name and protocol number to a resource file under the corresponding project
+      val clazz = protocol.get match {
+         case "memcached" => "org.infinispan.server.memcached.MemcachedServer"
+         case "hotrod" => "org.infinispan.server.hotrod.HotRodServer"
+      }
+      val port = {
+         if (props.get(PROP_KEY_PORT) == None) {
+            protocol.get match {
+               case "memcached" => 11211
+               case "hotrod" => 11311
+            }
+         } else {
+            props.get(PROP_KEY_PORT).get.toInt
+         }
+      }
+
+      var server = Util.getInstance(clazz).asInstanceOf[ProtocolServer]
+      val cacheManager = if (configFile == None) new DefaultCacheManager else new DefaultCacheManager(configFile.get)
+      addShutdownHook(new ShutdownHook(server, cacheManager))
+      server.start(host, port, cacheManager, masterThreads, workerThreads)
+   }
+
+   private def processCommandLine(args: Array[String]) {
+      programName = System.getProperty("program.name", "startServer")
+      var sopts = "-:hD:Vp:l:m:t:c:r:"
+      var lopts = Array(
+         new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
+         new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
+         new LongOpt("port", LongOpt.REQUIRED_ARGUMENT, null, 'p'),
+         new LongOpt("host", LongOpt.REQUIRED_ARGUMENT, null, 'l'),
+         new LongOpt("master_threads", LongOpt.REQUIRED_ARGUMENT, null, 'm'),
+         new LongOpt("worker_threads", LongOpt.REQUIRED_ARGUMENT, null, 't'),
+         new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),
+         new LongOpt("protocol", LongOpt.REQUIRED_ARGUMENT, null, 'r'))
+      var getopt = new Getopt(programName, args, sopts, lopts)
+      var code: Int = 0
+      while ((({code = getopt.getopt; code})) != -1) {
+         code match {
+            case ':' | '?' => System.exit(1)
+            case 1 => System.err.println(programName + ": unused non-option argument: " + getopt.getOptarg)
+            case 'h' => showAndExit
+            case 'V' => {
+               Version.printFullVersionInformation
+               System.exit(0)
+            }
+            case 'p' => props.put(PROP_KEY_PORT, getopt.getOptarg)
+            case 'l' => props.put(PROP_KEY_HOST, getopt.getOptarg)
+            case 'm' => props.put(PROP_KEY_MASTER_THREADS, getopt.getOptarg)
+            case 't' => props.put(PROP_KEY_WORKER_THREADS, getopt.getOptarg)
+            case 'c' => props.put(PROP_KEY_CACHE_CONFIG, getopt.getOptarg)
+            case 'r' => props.put(PROP_KEY_PROTOCOL, getopt.getOptarg)
+            case 'D' => {
+               val arg = getopt.getOptarg
+               var name = ""
+               var value = ""
+               var i = arg.indexOf("=")
+               if (i == -1) {
+                  name = arg
+                  value = "true"
+               } else {
+                  name = arg.substring(0, i)
+                  value = arg.substring(i + 1, arg.length)
+               }
+               System.setProperty(name, value)
+            }
+            case _ => throw new Exception("unhandled option code: " + code)
+         }
+      }
+   }
+
+   private def addShutdownHook(shutdownHook: Thread): Unit = {
+      AccessController.doPrivileged(new PrivilegedAction[Void] {
+         override def run = {
+            Runtime.getRuntime.addShutdownHook(shutdownHook)
+            null
+         }
+      })
+   }
+
+   private def showAndExit {
+      println("usage: " + programName + " [options]")
+      println
+      println("options:")
+      println("    -h, --help                         Show this help message")
+      println("    -V, --version                      Show version information")
+      println("    --                                 Stop processing options")
+      println("    -p, --port=<num>                   TCP port number to listen on (default: 11211)")
+      println("    -l, --host=<host or ip>            Interface to listen on (default: 127.0.0.1, localhost)")
+      println("    -m, --master_threads=<num>         Number of threads accepting incoming connections (default: unlimited while resources are available)")
+      println("    -t, --work_threads=<num>           Number of threads processing incoming requests and sending responses (default: unlimited while resources are available)")
+      println("    -c, --cache_config=<filename>      Cache configuration file (default: creates cache with default values)")
+      println("    -r, --protocol=[memcached|hotrod]  Protocol to understand by the server. This is a mandatory option and you should choose one of the two options")
+      println("    -D<name>[=<value>]                 Set a system property")
+      println
+      System.exit(0)
+   }
+}
+
+private class ShutdownHook(server: ProtocolServer, cacheManager: CacheManager) extends Thread {
+   override def run {
+      if (server != null) {
+         System.out.println("Posting Shutdown Request to the server...")
+         var f = Executors.newSingleThreadExecutor.submit(new Callable[Void] {
+            override def call = {
+               server.stop
+               cacheManager.stop
+               null
+            }
+         })
+         try {
+            f.get
+         }
+         catch {
+            case ie: IOException => Thread.interrupted
+            case e: ExecutionException => throw new RuntimeException("Exception encountered in shutting down the server", e)
+         }
+      }
+   }
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/Operation.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Operation.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Operation.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,14 @@
+package org.infinispan.server.core
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object Operation extends Enumeration {
+   type Operation = Value
+   val PutRequest, PutIfAbsentRequest, ReplaceRequest, ReplaceIfUnmodifiedRequest = Value
+   val GetRequest, GetWithVersionRequest = Value
+   val DeleteRequest, StatsRequest = Value
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,14 @@
+package org.infinispan.server.core
+
+import org.infinispan.manager.CacheManager
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+trait ProtocolServer {
+   def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int)
+   def stop 
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/Value.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Value.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Value.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,32 @@
+package org.infinispan.server.core
+
+import org.infinispan.util.Util
+import java.io.{Serializable, ObjectOutput, ObjectInput, Externalizable}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+// TODO: Make it a hardcoded Externalizer
+class Value(val v: Array[Byte], val version: Long) extends Serializable {
+
+   override def toString = {
+      new StringBuilder().append("Value").append("{")
+         .append("v=").append(Util.printArray(v, false))
+         .append(", version=").append(version)
+         .append("}").toString
+   }
+
+//   override def readExternal(in: ObjectInput) {
+//      v = new Array[Byte](in.read())
+//      in.read(v)
+//      version = in.readLong
+//   }
+//
+//   override def writeExternal(out: ObjectOutput) {
+//      out.write(v.length)
+//      out.write(v)
+//      out.writeLong(version)
+//   }
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,34 @@
+package org.infinispan.server.core
+
+import org.infinispan.remoting.transport.Address
+import org.infinispan.Cache
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object VersionGenerator {
+
+   private val versionCounter = new AtomicInteger
+
+   def newVersion(address: Option[Address], members: Option[Iterable[Address]], viewId: Long): Long = {
+      val counter = versionCounter.incrementAndGet
+      var version: Long = counter
+      if (address != None && members != None) {
+         // todo: perf tip: cache rank and viewId as a volatile and recalculate with each view change
+         val rank: Long = findAddressRank(address.get, members.get, 1)
+         version = (rank << 32) | version
+         version = (viewId << 48) | version
+      }
+      version
+   }
+
+   private def findAddressRank(address: Address, members: Iterable[Address], rank: Int): Int = {
+      if (address.equals(members.head)) rank
+      else findAddressRank(address, members.tail, rank + 1)
+   }
+   
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Channel.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Channel.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Channel.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,12 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class Channel {
+   def write(message: Any): ChannelFuture
+   def disconnect: ChannelFuture
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,42 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ChannelBuffer {
+   def readByte: Byte
+   def readBytes(dst: Array[Byte], dstIndex: Int, length: Int)
+   def readUnsignedByte: Float
+   def readUnsignedInt: Int
+   def readUnsignedLong: Long
+   def readBytes(length: Int): ChannelBuffer
+   def readerIndex: Int
+   def readBytes(dst: Array[Byte]): Unit
+   def readRangedBytes: Array[Byte]
+   def readableBytes: Int
+
+   /**
+    * Reads length of String and then returns an UTF-8 formatted String of such length.
+    */
+   def readString: String
+   def writeByte(value: Byte)
+   def writeBytes(src: Array[Byte])
+
+   /**
+    * Writes the length of the byte array and transfers the specified source array's data to this buffer
+   */
+   def writeRangedBytes(src: Array[Byte])
+   def writeUnsignedInt(i: Int)
+   def writeUnsignedLong(l: Long)
+   def writerIndex: Int
+
+   /**
+    * Writes the length of the String followed by the String itself. This methods expects String not to be null.
+    */
+   def writeString(msg: String)
+
+   def getUnderlyingChannelBuffer: AnyRef
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,14 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ChannelBuffers {
+//   def wrappedBuffer(buffers: ChannelBuffer*): ChannelBuffer
+//   def wrappedBuffer(buffer: ChannelBuffer): ChannelBuffer
+   def wrappedBuffer(array: Array[Byte]*): ChannelBuffer
+   def dynamicBuffer(): ChannelBuffer
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelFuture.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelFuture.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelFuture.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,23 @@
+package org.infinispan.server.core.transport
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ChannelFuture {
+   def getChannel: Channel
+   def isDone: Boolean
+   def isCancelled: Boolean
+   def setSuccess: Boolean
+   def setFailure(cause: Throwable): Boolean
+   def await: ChannelFuture
+   def awaitUninterruptibly: ChannelFuture
+   def await(timeout: Long, unit: TimeUnit): Boolean
+   def await(timeoutMillis: Long): Boolean
+   def awaitUninterruptibly(timeout: Long, unit: TimeUnit): Boolean
+   def awaitUninterruptibly(timeoutMillis: Long): Boolean
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,12 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ChannelHandlerContext {
+   def getChannel: Channel
+   def getChannelBuffers: ChannelBuffers
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,19 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class Decoder {
+
+   def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): AnyRef
+   
+   def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)
+
+   def start
+
+   def stop
+   
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Encoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Encoder.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Encoder.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,11 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class Encoder {
+   def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ExceptionEvent.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ExceptionEvent.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ExceptionEvent.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,11 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ExceptionEvent {
+   def getCause: Throwable
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/NoState.java
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/NoState.java	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/NoState.java	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport;
+
+/**
+ * // TODO: Document this
+ *
+ * @author Galder Zamarreño
+ */
+public enum NoState {
+}

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Transport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Transport.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Transport.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,17 @@
+package org.infinispan.server.core.transport
+
+import java.net.SocketAddress
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class Transport {
+
+   def start
+   
+   def stop
+   
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelAdapter.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelAdapter.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,24 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel.{Channel => NettyChannel}
+import org.infinispan.server.core.transport.{ChannelBuffer, ChannelFuture, Channel}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ChannelAdapter(val ch: NettyChannel) extends Channel {
+
+   override def disconnect: ChannelFuture = new ChannelFutureAdapter(ch.disconnect());
+
+   override def write(message: Any): ChannelFuture = {
+      val toWrite = message match {
+         case buffer: ChannelBuffer => buffer.getUnderlyingChannelBuffer
+         case _ => message
+      }
+      new ChannelFutureAdapter(ch.write(toWrite));
+   }
+
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,60 @@
+package org.infinispan.server.core.transport.netty
+
+import org.infinispan.server.core.transport.ChannelBuffer
+import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ChannelBufferAdapter(val buffer: NettyChannelBuffer) extends ChannelBuffer {
+   
+   override def readByte: Byte = buffer.readByte
+   override def readBytes(dst: Array[Byte], dstIndex: Int, length: Int) = buffer.readBytes(dst, dstIndex, length)
+   override def readUnsignedByte: Float = buffer.readUnsignedByte
+   override def readUnsignedInt: Int = { // TODO
+      0
+      }
+   override def readUnsignedLong: Long = { // TODO
+      0
+      }
+   override def readBytes(length: Int): ChannelBuffer = new ChannelBufferAdapter(buffer.readBytes(length))
+   override def readerIndex: Int = readerIndex
+   override def readBytes(dst: Array[Byte]) = buffer.readBytes(dst) 
+   override def readRangedBytes: Array[Byte] = { // TODO
+      null
+      }
+   override def readableBytes = buffer.writerIndex - buffer.readerIndex
+
+   /**
+    * Reads length of String and then returns an UTF-8 formatted String of such length.
+    */
+   override def readString: String = { // TODO
+      null
+      }
+   override def writeByte(value: Byte) = buffer.writeByte(value)
+   override def writeBytes(src: Array[Byte]) = buffer.writeBytes(src)
+
+   /**
+    * Writes the length of the byte array and transfers the specified source array's data to this buffer
+   */
+   override def writeRangedBytes(src: Array[Byte]) { // TODO
+      0
+      }
+   override def writeUnsignedInt(i: Int) { // TODO
+      }
+   override def writeUnsignedLong(l: Long) { // TODO
+      }
+   override def writerIndex: Int = buffer.writerIndex
+
+   /**
+    * Writes the length of the String followed by the String itself. This methods expects String not to be null.
+    */
+   override def writeString(msg: String) { // TODO
+      }
+
+   override def getUnderlyingChannelBuffer: AnyRef = buffer
+
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,37 @@
+package org.infinispan.server.core.transport.netty
+
+import org.infinispan.server.core.transport.{ChannelBuffer, ChannelBuffers}
+import org.jboss.netty.buffer.{ChannelBuffers => NettyChannelBuffers}
+import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object ChannelBuffersAdapter extends ChannelBuffers {
+
+//   override def wrappedBuffer(buffer: ChannelBuffer): ChannelBuffer = {
+////      val nettyBuffers = new Array[NettyChannelBuffer](buffers.length)
+////      val nettyBuffers = buffers.map(_.getUnderlyingChannelBuffer) : _*
+////      for (buffer <- buffers) {
+////         nettyBuffers + buffer.getUnderlyingChannelBuffer
+////      }
+//      val nettyBuffer = buffer.getUnderlyingChannelBuffer.asInstanceOf[NettyChannelBuffer]
+//      new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(nettyBuffer))
+//   }
+
+//   override def wrappedBuffer(array: Array[Byte]): ChannelBuffer = {
+//      new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(array));
+//   }
+
+   override def wrappedBuffer(array: Array[Byte]*): ChannelBuffer = {
+      new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(array : _*));
+   }
+   
+   override def dynamicBuffer(): ChannelBuffer = {
+      new ChannelBufferAdapter(NettyChannelBuffers.dynamicBuffer());
+   }
+
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelFutureAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelFutureAdapter.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelFutureAdapter.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,43 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel.{ChannelFuture => NettyChannelFuture}
+import org.infinispan.server.core.transport.{Channel, ChannelFuture}
+import java.util.concurrent.TimeUnit
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ChannelFutureAdapter(val future: NettyChannelFuture) extends ChannelFuture {
+   
+   override def getChannel: Channel = new ChannelAdapter(future.getChannel())
+
+   override def isDone: Boolean = future.isDone
+
+   override def isCancelled: Boolean = future.isCancelled
+
+   override def setSuccess: Boolean = future.setSuccess
+
+   override def setFailure(cause: Throwable): Boolean = future.setFailure(cause)
+
+   override def await: ChannelFuture = {
+      future.await
+      this
+   }
+
+   override def awaitUninterruptibly: ChannelFuture = {
+      future.awaitUninterruptibly
+      this
+   }
+
+   override def await(timeout: Long, unit: TimeUnit): Boolean = future.await(timeout, unit)
+
+   override def await(timeoutMillis: Long): Boolean = future.await(timeoutMillis)
+   
+   override def awaitUninterruptibly(timeout: Long, unit: TimeUnit): Boolean = future.awaitUninterruptibly(timeout, unit)
+   
+   override def awaitUninterruptibly(timeoutMillis: Long): Boolean = future.awaitUninterruptibly(timeoutMillis)
+   
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,18 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel.{ChannelHandlerContext => NettyChannelHandlerContext}
+import org.infinispan.server.core.transport.{ChannelBuffers, Channel, ChannelHandlerContext}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ChannelHandlerContextAdapter(val ctx: NettyChannelHandlerContext) extends ChannelHandlerContext {
+   
+   override def getChannel: Channel = new ChannelAdapter(ctx.getChannel)
+
+   override def getChannelBuffers: ChannelBuffers = ChannelBuffersAdapter
+   
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,27 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder
+import org.jboss.netty.channel.{ExceptionEvent => NettyExceptionEvent, ChannelHandlerContext => NettyChannelHandlerContext, Channel => NettyChannel}
+import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
+import org.infinispan.server.core.transport._;
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class DecoderAdapter(decoder: Decoder) extends ReplayingDecoder[NoState](true) {
+
+   override def decode(nCtx: NettyChannelHandlerContext, channel: NettyChannel,
+                       nBuffer: NettyChannelBuffer, passedState: NoState): AnyRef = {
+      val ctx = new ChannelHandlerContextAdapter(nCtx);
+      val buffer = new ChannelBufferAdapter(nBuffer);
+      decoder.decode(ctx, buffer);
+   }
+
+   override def exceptionCaught(ctx: NettyChannelHandlerContext, e: NettyExceptionEvent) {
+      decoder.exceptionCaught(new ChannelHandlerContextAdapter(ctx), new ExceptionEventAdapter(e));
+   }
+
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/EncoderAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/EncoderAdapter.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/EncoderAdapter.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,27 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel.ChannelHandler
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
+import org.jboss.netty.channel.{ChannelHandlerContext => NettyChannelHandlerContext}
+import org.jboss.netty.channel.{Channel => NettyChannel}
+import org.infinispan.server.core.transport.{ChannelBuffer, Encoder}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+ at ChannelHandler.Sharable
+class EncoderAdapter(encoder: Encoder) extends OneToOneEncoder {
+
+   protected override def encode(nCtx: NettyChannelHandlerContext, ch: NettyChannel, msg: AnyRef): AnyRef = {
+      var ret = encoder.encode(new ChannelHandlerContextAdapter(nCtx), new ChannelAdapter(ch), msg);
+      ret = ret match {
+         case cb: ChannelBuffer => cb.getUnderlyingChannelBuffer
+         case _ => ret
+      }
+      ret
+   }
+   
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ExceptionEventAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ExceptionEventAdapter.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ExceptionEventAdapter.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,16 @@
+package org.infinispan.server.core.transport.netty
+
+import org.infinispan.server.core.transport.ExceptionEvent
+import org.jboss.netty.channel.{ExceptionEvent => NettyExceptionEvent}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ExceptionEventAdapter(event: NettyExceptionEvent) extends ExceptionEvent {
+
+   override def getCause: Throwable = return event.getCause
+
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,25 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class NettyChannelPipelineFactory(decoder: ChannelUpstreamHandler, encoder: ChannelDownstreamHandler)
+      extends ChannelPipelineFactory {
+
+   override def getPipeline: ChannelPipeline = {
+      val pipeline = Channels.pipeline
+      if (decoder != null) {
+         pipeline.addLast("decoder", decoder);
+      }
+      if (encoder != null) {
+         pipeline.addLast("encoder", encoder);
+      }
+      return pipeline;
+   }
+
+}
\ No newline at end of file

Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	                        (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,127 @@
+package org.infinispan.server.core.transport.netty
+
+import java.net.SocketAddress
+import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup}
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import java.util.concurrent.atomic.AtomicInteger
+import org.jboss.netty.channel.{ChannelUpstreamHandler, ChannelDownstreamHandler, ChannelFactory, ChannelPipelineFactory}
+import org.jboss.netty.bootstrap.ServerBootstrap
+import java.util.concurrent.{TimeUnit, Executors, ThreadFactory, ExecutorService}
+import org.infinispan.server.core.transport.Transport
+import org.infinispan.server.core.Logging
+import scala.collection.JavaConversions._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class NettyTransport(decoder: ChannelUpstreamHandler, encoder: ChannelDownstreamHandler,
+                  address: SocketAddress, masterThreads: Int, workerThreads: Int, cacheName: String) extends Transport {
+   import NettyTransport._
+
+   val serverChannels = new DefaultChannelGroup(cacheName + "-channels")
+   val acceptedChannels = new DefaultChannelGroup(cacheName + "-accepted")
+   val pipeline = new NettyChannelPipelineFactory(decoder, encoder)
+   val factory = {
+      if (workerThreads == 0)
+         new NioServerSocketChannelFactory(masterExecutor, workerExecutor)
+      else
+         new NioServerSocketChannelFactory(masterExecutor, workerExecutor, workerThreads)
+   }
+   
+   lazy val masterExecutor = {
+      val tf = new NamedThreadFactory(cacheName + '-' + "Master")
+      if (masterThreads == 0) {
+         debug("Configured unlimited threads for master thread pool")
+         Executors.newCachedThreadPool(tf)
+      } else {
+         debug("Configured {0} threads for master thread pool", masterThreads)
+         Executors.newFixedThreadPool(masterThreads, tf)
+      }
+   }
+   lazy val workerExecutor = {
+      val tf = new NamedThreadFactory(cacheName + '-' + "Worker")
+      if (workerThreads == 0) {
+         debug("Configured unlimited threads for worker thread pool")
+         Executors.newCachedThreadPool(tf)
+      }
+      else {
+         debug("Configured {0} threads for worker thread pool", workerThreads)
+         Executors.newFixedThreadPool(workerThreads, tf)
+      }      
+   }
+
+   override def start {
+      val bootstrap = new ServerBootstrap(factory);
+      bootstrap.setPipelineFactory(pipeline);
+      val ch = bootstrap.bind(address);
+      serverChannels.add(ch);
+   }
+
+   override def stop {
+      // We *pause* the acceptor so no new connections are made
+      var future = serverChannels.unbind().awaitUninterruptibly();
+      if (!future.isCompleteSuccess()) {
+         warn("Server channel group did not completely unbind");
+//         val iter = future.getGroup().iterator
+//         while (iter.hasNext) {
+//            val ch = iter.next
+//            if (ch.isBound()) {
+//               warn("{0} is still bound to {1}", ch, ch.getRemoteAddress());
+//            }
+//         }
+
+         for (ch <- asIterator(future.getGroup().iterator)) {
+            if (ch.isBound()) {
+               warn("{0} is still bound to {1}", ch, ch.getRemoteAddress());
+            }
+         }
+      }
+
+      // TODO remove workaround when inteating Netty 3.2.x - https://jira.jboss.org/jira/browse/NETTY-256
+      masterExecutor.shutdown();
+      try {
+         masterExecutor.awaitTermination(30, TimeUnit.SECONDS);
+      } catch {
+         case ie: InterruptedException => ie.printStackTrace();
+      }
+
+      workerExecutor.shutdown();
+      serverChannels.close().awaitUninterruptibly();
+      future = acceptedChannels.close().awaitUninterruptibly();
+      if (!future.isCompleteSuccess()) {
+         warn("Channel group did not completely close");
+//         val iter = future.getGroup().iterator
+//         while (iter.hasNext) {
+//            val ch = iter.next
+//            if (ch.isBound()) {
+//               warn(ch + " is still connected to " + ch.getRemoteAddress());
+//            }
+//         }
+         
+         for (ch <- asIterator(future.getGroup().iterator)) {
+            if (ch.isBound()) {
+               warn(ch + " is still connected to " + ch.getRemoteAddress());
+            }
+         }
+      }
+//      debug("Channel group completely closed");
+      debug("Channel group completely closed, release external resources");
+      factory.releaseExternalResources();
+   }
+
+}
+
+object NettyTransport extends Logging
+
+private class NamedThreadFactory(val name: String) extends ThreadFactory {
+   val threadCounter = new AtomicInteger
+
+   override def newThread(r: Runnable): Thread = {
+      var t = new Thread(r, System.getProperty("program.name") + "-" + name + '-' + threadCounter.incrementAndGet)
+      t.setDaemon(true)
+      t
+   }
+}

Added: trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala
===================================================================
--- trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala	                        (rev 0)
+++ trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,35 @@
+package org.infinispan.server.core
+
+import org.testng.annotations.Test
+import org.infinispan.remoting.transport.Address
+import org.testng.Assert._
+import org.infinispan.server.core.VersionGenerator._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+ at Test(groups = Array("functional"), testName = "server.core.VersionGeneratorTest")
+class VersionGeneratorTest {
+
+   def testGenerateVersion {
+      val addr1 = new TestAddress(1)
+      val addr2 = new TestAddress(2)
+      val addr3 = new TestAddress(1)
+      val members = List(addr1, addr2, addr3)
+      assertEquals(newVersion(Some(addr2), Some(members), 1), 0x1000200000001L)
+   }
+}
+
+class TestAddress(val addressNum: Int) extends Address {
+   override def equals(o: Any): Boolean = {
+      o match {
+         case ta: TestAddress => ta.addressNum == addressNum
+         case _ => false
+      }
+   }
+   override def hashCode = addressNum
+   override def toString = "TestAddress#" + addressNum
+}
\ No newline at end of file

Modified: trunk/server/memcached/pom.xml
===================================================================
--- trunk/server/memcached/pom.xml	2010-03-22 20:12:52 UTC (rev 1612)
+++ trunk/server/memcached/pom.xml	2010-03-23 17:59:55 UTC (rev 1613)
@@ -33,6 +33,42 @@
       </dependency>
    </dependencies>
 
+      <build>
+      <finalName>infinispan</finalName>
+      <sourceDirectory>src/main/scala</sourceDirectory>
+      <testSourceDirectory>src/test/scala</testSourceDirectory>
+
+      <plugins>
+         <plugin>
+            <groupId>org.scala-tools</groupId>
+            <artifactId>maven-scala-plugin</artifactId>
+            <executions>
+               <execution>
+                  <id>compile</id>
+                  <goals>
+                     <goal>compile</goal>
+                  </goals>
+                  <phase>compile</phase>
+               </execution>
+               <execution>
+                  <id>test-compile</id>
+                  <goals>
+                     <goal>testCompile</goal>
+                  </goals>
+                  <phase>test-compile</phase>
+               </execution>
+               <execution>
+                  <phase>process-resources</phase>
+                  <goals>
+                     <goal>compile</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+      </plugins>
+
+   </build>
+   
   <repositories>
     <repository>
       <id>spy</id>

Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	                        (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,407 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.manager.{CacheManager}
+import org.infinispan.server.core.Operation._
+import org.infinispan.server.memcached.MemcachedOperation._
+import org.infinispan.context.Flag
+import java.util.concurrent.{TimeUnit, Executors, ScheduledExecutorService}
+import org.infinispan.{Version, CacheException, Cache}
+import java.io.{IOException, EOFException, StreamCorruptedException}
+import java.nio.channels.ClosedChannelException
+import java.util.concurrent.atomic.AtomicLong
+import org.infinispan.stats.Stats
+import org.infinispan.server.core.transport.{Channel, ChannelBuffers, ChannelHandlerContext, ChannelBuffer}
+import org.infinispan.server.core._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class MemcachedDecoder(cacheManager: CacheManager) extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
+   import MemcachedDecoder._
+
+   type SuitableParameters = MemcachedParameters
+
+   private var scheduler: ScheduledExecutorService = _
+   private var cache: Cache[String, MemcachedValue] = _
+   private lazy val isStatsEnabled = cache.getConfiguration.isExposeJmxStatistics
+   private final val incrMisses = new AtomicLong(0)
+   private final val incrHits = new AtomicLong(0)
+   private final val decrMisses = new AtomicLong(0)
+   private final val decrHits = new AtomicLong(0)
+   private final val replaceIfUnmodifiedMisses = new AtomicLong(0)
+   private final val replaceIfUnmodifiedHits = new AtomicLong(0)
+   private final val replaceIfUnmodifiedBadval = new AtomicLong(0)
+
+   override def readHeader(buffer: ChannelBuffer): RequestHeader = {
+      val streamOp = readElement(buffer)
+      val op = OperationResolver.resolve(streamOp)
+      if (op == None) {
+         val line = readLine(buffer) // Read rest of line to clear the operation
+         throw new UnknownOperationException("Unknown operation: " + streamOp);
+      }
+      new RequestHeader(op.get)
+   }
+
+   override def readKey(buffer: ChannelBuffer): String = {
+      readElement(buffer)
+   }
+
+   override def readKeys(buffer: ChannelBuffer): Array[String] = {
+      val line = readLine(buffer)
+      line.trim.split(" +")
+   }
+
+   override def readParameters(op: Enumeration#Value, buffer: ChannelBuffer): MemcachedParameters = {
+      val line = readLine(buffer)
+      if (!line.isEmpty) {
+         trace("Operation parameters: {0}", line)
+         val args = line.trim.split(" +")
+         var index = 0
+         op match {
+            case DeleteRequest => {
+               val delayedDeleteTime = parseDelayedDeleteTime(index, args)
+               val noReply = if (delayedDeleteTime == -1) parseNoReply(index, args) else false
+               new MemcachedParameters(null, -1, -1, -1, noReply, 0, "", 0)
+            }
+            case IncrementRequest | DecrementRequest => {
+               val delta = args(index)
+               index += 1
+               new MemcachedParameters(null, -1, -1, -1, parseNoReply(index, args), 0, delta, 0) 
+            }
+            case FlushAllRequest => {
+               val flushDelay = args(index).toInt
+               index += 1
+               new MemcachedParameters(null, -1, -1, -1, parseNoReply(index, args), 0, "", flushDelay)
+            }
+            case _ => {
+               val flags = getFlags(args(index))
+               index += 1
+               val lifespan = {
+                  val streamLifespan = getLifespan(args(index))
+                  if (streamLifespan <= 0) -1 else streamLifespan
+               }
+               index += 1
+               val length = getLength(args(index))
+               val streamVersion = op match {
+                  case ReplaceIfUnmodifiedRequest => {
+                     index += 1
+                     getVersion(args(index))
+                  }
+                  case _ => -1
+               }
+               index += 1
+               val noReply = parseNoReply(index, args)
+               val data = new Array[Byte](length)
+               buffer.readBytes(data, 0, data.length)
+               readLine(buffer) // read the rest of line to clear CRLF after value Byte[]
+               new MemcachedParameters(data, lifespan, -1, streamVersion, noReply, flags, "", 0)
+            }
+         }
+      } else {
+         // For example when delete <key> is sent without any further parameters
+         new MemcachedParameters(null, -1, -1, -1, false, 0, "", 0)
+      }
+   }
+
+   override def createValue(params: MemcachedParameters, nextVersion: Long): MemcachedValue = {
+      new MemcachedValue(params.data, nextVersion, params.flags)
+   }
+
+   private def getFlags(flags: String): Int = {
+      if (flags == null) throw new EOFException("No flags passed")
+      flags.toInt
+   }
+
+   private def getLifespan(lifespan: String): Int = {
+      if (lifespan == null) throw new EOFException("No expiry passed")
+      lifespan.toInt
+   }
+
+   private def getLength(length: String): Int = {
+      if (length == null) throw new EOFException("No bytes passed")
+      length.toInt
+   }
+
+   private def getVersion(version: String): Long = {
+      if (version == null) throw new EOFException("No cas passed")
+      version.toLong
+   }
+
+   private def parseNoReply(expectedIndex: Int, args: Array[String]): Boolean = {
+      if (args.length > expectedIndex) {
+         if ("noreply" == args(expectedIndex))
+            true
+         else
+            throw new StreamCorruptedException("Unable to parse noreply optional argument")
+      }
+      else false      
+   }
+
+   private def parseDelayedDeleteTime(expectedIndex: Int, args: Array[String]): Int = {
+      if (args.length > expectedIndex) {
+         try {
+            args(expectedIndex).toInt
+         }
+         catch {
+            case e: NumberFormatException => return -1 // Either unformatted number, or noreply found
+         }
+      }
+      else 0
+   }
+
+   override def getCache(header: RequestHeader): Cache[String, MemcachedValue] = cache
+
+   protected def createCache: Cache[String, MemcachedValue] = cacheManager.getCache[String, MemcachedValue]
+
+   override def handleCustomRequest(header: RequestHeader, ctx: ChannelHandlerContext,
+                                    buffer: ChannelBuffer, cache: Cache[String, MemcachedValue]) {
+      header.op match {
+         case AppendRequest | PrependRequest => {
+            val k = readKey(buffer)
+            val params = readParameters(header.op, buffer)
+            val prev = cache.get(k)
+            if (prev != null) {
+               val concatenated = header.op match {
+                  case AppendRequest => concat(prev.v, params.data);
+                  case PrependRequest => concat(params.data, prev.v);
+               }
+               val next = createValue(concatenated, generateVersion(cache), params.flags)
+               val replaced = cache.replace(k, prev, next);
+               if (replaced)
+                  sendResponse(header, ctx, None, None, Some(params), Some(prev))
+               else // If there's a concurrent modification on this key, treat it as we couldn't replace it
+                  sendResponse(header, ctx, None, None, Some(params), Some(null)) //
+            } else {
+               sendResponse(header, ctx, None, None, Some(params), Some(null))
+            }
+         }
+         case IncrementRequest | DecrementRequest => {
+            val k = readKey(buffer)
+            val params = readParameters(header.op, buffer)
+            val prev = cache.get(k)
+            if (prev != null) {
+               val prevCounter = new String(prev.v)
+               val newCounter =
+                  header.op match {
+                     case IncrementRequest => prevCounter.toLong + params.delta.toLong
+                     case DecrementRequest => {
+                        val candidateCounter = prevCounter.toLong - params.delta.toLong
+                        if (candidateCounter < 0) 0 else candidateCounter
+                     }
+                  }
+               val next = createValue(newCounter.toString.getBytes, generateVersion(cache), params.flags)
+               var replaced = cache.replace(k, prev, next)
+               if (replaced)
+                  sendResponse(header, ctx, None, Some(next), Some(params), Some(prev))
+               else // If there's a concurrent modification on this key, the spec does not say what to do, so treat it as exceptional
+                  throw new CacheException("Value modified since we retrieved from the cache, old value was " + prevCounter)
+            } else {
+               sendResponse(header, ctx, None, None, Some(params), Some(null))
+            }
+         }
+         case FlushAllRequest => {
+            val params = readParameters(header.op, buffer)
+            val flushFunction = (cache: Cache[String, MemcachedValue]) => cache.getAdvancedCache.withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STORE).clear
+            if (params.flushDelay == 0)
+               // cache.getAdvancedCache.withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STORE).clear
+               flushFunction(cache)
+            else
+               scheduler.schedule(new DelayedFlushAll(cache, flushFunction), params.flushDelay, TimeUnit.SECONDS)
+            sendResponse(header, ctx, None, None, Some(params), None)
+         }
+         case VersionRequest => sendResponse(header, ctx, None, None, None, None)
+//         case StatsRequest => 
+      }
+   }
+
+   // todo: potentially move this up when implementing hotrod since only what's written might change, the rest of logic is likely to be the same
+   override def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext,
+                             k: Option[String], v: Option[MemcachedValue],
+                             params: Option[MemcachedParameters], prev: Option[MemcachedValue]) {
+      val buffers = ctx.getChannelBuffers
+      val ch = ctx.getChannel
+      if (params == None || !params.get.noReply) {
+         header.op match {
+            case PutRequest => ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
+            case GetRequest | GetWithVersionRequest => {
+               if (v.get != null)
+                  ch.write(buildGetResponse(header.op, ctx, k.get, v.get))
+               ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
+            }
+            case PutIfAbsentRequest => {
+               if (prev.get == null)
+                  ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
+               else
+                  ch.write(buffers.wrappedBuffer("NOT_STORED\r\n".getBytes))
+            }
+            case ReplaceRequest | AppendRequest | PrependRequest => {
+               if (prev.get == null)
+                  ch.write(buffers.wrappedBuffer("NOT_STORED\r\n".getBytes))
+               else
+                  ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
+            }
+            case ReplaceIfUnmodifiedRequest => {
+               if (v != None && prev != None) {
+                  if (isStatsEnabled) replaceIfUnmodifiedHits.incrementAndGet
+                  ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
+               } else if (v == None && prev != None) {
+                  if (isStatsEnabled) replaceIfUnmodifiedBadval.incrementAndGet
+                  ch.write(buffers.wrappedBuffer("EXISTS\r\n".getBytes))
+               } else {
+                  if (isStatsEnabled) replaceIfUnmodifiedMisses.incrementAndGet
+                  ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
+               }                  
+            }
+            case DeleteRequest => {
+               if (prev.get == null)
+                  ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
+               else
+                  ch.write(buffers.wrappedBuffer("DELETED\r\n".getBytes))
+            }
+            case IncrementRequest | DecrementRequest => {
+               if (prev.get == null) {
+                  if (isStatsEnabled) if (header.op == IncrementRequest) incrMisses.incrementAndGet() else decrMisses.incrementAndGet
+                  ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
+               } else {
+                  if (isStatsEnabled) if (header.op == IncrementRequest) incrHits.incrementAndGet() else decrHits.incrementAndGet
+                  ch.write(buffers.wrappedBuffer((new String(v.get.v) + CRLF).getBytes))
+               }
+            }
+            case FlushAllRequest => {
+               ch.write(buffers.wrappedBuffer("OK\r\n".getBytes))
+            }
+            case VersionRequest => {
+               val sb = new StringBuilder
+               sb.append("VERSION ").append(Version.version).append(CRLF)
+               ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+            }
+         }
+      }
+   }
+
+   override def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext, pairs: Map[String, MemcachedValue]) {
+      val buffers = ctx.getChannelBuffers
+      val ch = ctx.getChannel
+         header.op match {
+            case GetRequest | GetWithVersionRequest => {
+               for ((k, v) <- pairs)
+                  ch.write(buildGetResponse(header.op, ctx, k, v))
+               ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
+            }
+      }
+   }
+
+   override def sendResponse(ctx: ChannelHandlerContext, t: Throwable) {
+      val ch = ctx.getChannel
+      val buffers = ctx.getChannelBuffers
+      t match {
+         case uoe: UnknownOperationException => ch.write(buffers.wrappedBuffer("ERROR\r\n".getBytes))
+         case cce: ClosedChannelException => // no-op, only log
+         case _ => {
+            val sb = new StringBuilder
+            t match {
+               case ioe: IOException => sb.append("CLIENT_ERROR ")
+               case _ => sb.append("SERVER_ERROR ")
+            }
+            sb.append(t).append(CRLF)
+            ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+         }
+      }
+   }
+
+   def sendResponse(ctx: ChannelHandlerContext, stats: Stats) {
+      var buffers = ctx.getChannelBuffers
+      var ch = ctx.getChannel
+      var sb = new StringBuilder
+
+      writeStat("pid", 0, sb, buffers, ch) // Unsupported
+      writeStat("uptime", stats.getTimeSinceStart, sb, buffers, ch)
+      writeStat("time", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis), sb, buffers, ch)
+      writeStat("version", cache.getVersion, sb, buffers, ch)
+      writeStat("pointer_size", 0, sb, buffers, ch) // Unsupported
+      writeStat("rusage_user", 0, sb, buffers, ch) // Unsupported
+      writeStat("rusage_system", 0, sb, buffers, ch) // Unsupported
+      writeStat("curr_items", stats.getCurrentNumberOfEntries, sb, buffers, ch)
+      writeStat("total_items", stats.getTotalNumberOfEntries, sb, buffers, ch)
+      writeStat("bytes", 0, sb, buffers, ch) // Unsupported
+      writeStat("curr_connections", 0, sb, buffers, ch) // TODO: Through netty?
+      writeStat("total_connections", 0, sb, buffers, ch) // TODO: Through netty?
+      writeStat("connection_structures", 0, sb, buffers, ch) // Unsupported
+      writeStat("cmd_get", stats.getRetrievals, sb, buffers, ch)
+      writeStat("cmd_set", stats.getStores, sb, buffers, ch)
+      writeStat("get_hits", stats.getHits, sb, buffers, ch)
+      writeStat("get_misses", stats.getMisses, sb, buffers, ch)
+      writeStat("delete_misses", stats.getRemoveMisses, sb, buffers, ch)
+      writeStat("delete_hits", stats.getRemoveHits, sb, buffers, ch)
+      writeStat("incr_misses", incrMisses, sb, buffers, ch)
+      writeStat("incr_hits", incrHits, sb, buffers, ch)
+      writeStat("decr_misses", decrMisses, sb, buffers, ch)
+      writeStat("decr_hits", decrHits, sb, buffers, ch)
+      writeStat("cas_misses", replaceIfUnmodifiedMisses, sb, buffers, ch)
+      writeStat("cas_hits", replaceIfUnmodifiedHits, sb, buffers, ch)
+      writeStat("cas_badval", replaceIfUnmodifiedBadval, sb, buffers, ch)
+      writeStat("auth_cmds", 0, sb, buffers, ch) // Unsupported
+      writeStat("auth_errors", 0, sb, buffers, ch) // Unsupported
+      //TODO: Evictions are measure by evict calls, but not by nodes are that are expired after the entry's lifespan has expired.
+      writeStat("evictions", stats.getEvictions, sb, buffers, ch)
+      writeStat("bytes_read", 0, sb, buffers, ch) // TODO: Through netty?
+      writeStat("bytes_written", 0, sb, buffers, ch) // TODO: Through netty?
+      writeStat("limit_maxbytes", 0, sb, buffers, ch) // Unsupported
+      writeStat("threads", 0, sb, buffers, ch) // TODO: Through netty?
+      writeStat("conn_yields", 0, sb, buffers, ch) // Unsupported
+
+      ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
+   }
+
+   private def writeStat(stat: String, value: Any, sb: StringBuilder, buffers: ChannelBuffers, ch: Channel) {
+      sb.append("STAT").append(' ').append(stat).append(' ').append(value).append(CRLF)
+      ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+      sb.setLength(0)
+   }
+
+   override def start {
+      scheduler = Executors.newScheduledThreadPool(1)
+      cache = createCache
+   }
+
+   override def stop {
+      scheduler.shutdown
+   }
+
+   private def createValue(data: Array[Byte], nextVersion: Long, flags: Int): MemcachedValue = {
+      new MemcachedValue(data, nextVersion, flags)
+   }   
+
+   private def buildGetResponse(op: Enumeration#Value, ctx: ChannelHandlerContext,
+                                k: String, v: MemcachedValue): ChannelBuffer = {
+      val header = buildGetResponseHeader(k, v, op)
+      ctx.getChannelBuffers.wrappedBuffer(header.getBytes, v.v, CRLF.getBytes)
+   }
+
+   private def buildGetResponseHeader(k: String, v: MemcachedValue, op: Enumeration#Value): String = {
+      val sb = new StringBuilder
+      sb.append("VALUE ").append(k).append(" ").append(v.flags).append(" ").append(v.v.length).append(" ")
+      if (op == GetWithVersionRequest)
+         sb.append(v.version).append(" ")   
+      sb.append(CRLF)
+      sb.toString
+   }
+
+}
+
+object MemcachedDecoder extends Logging
+
+class MemcachedParameters(override val data: Array[Byte], override val lifespan: Int,
+                          override val maxIdle: Int, override val version: Long,
+                          val noReply: Boolean, val flags: Int, val delta: String,
+                          val flushDelay: Int) extends RequestParameters(data, lifespan, maxIdle, version)
+
+private class DelayedFlushAll(cache: Cache[String, MemcachedValue],
+                              flushFunction: Cache[String, MemcachedValue] => Unit) extends Runnable {
+   override def run() = flushFunction(cache)
+}
+
+

Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedOperation.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedOperation.scala	                        (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedOperation.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,16 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.Operation._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object MemcachedOperation extends Enumeration(10) {
+   type MemcachedOperation = Value
+   val AppendRequest, PrependRequest = Value
+   val IncrementRequest, DecrementRequest = Value
+   val FlushAllRequest, VersionRequest = Value
+}
\ No newline at end of file

Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala	                        (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,19 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.AbstractProtocolServer
+import org.infinispan.server.core.transport.{Decoder, Encoder}
+import org.infinispan.manager.CacheManager
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class MemcachedServer extends AbstractProtocolServer {
+
+   protected override def getEncoder: Encoder = null
+
+   protected override def getDecoder(cacheManager: CacheManager): Decoder = new MemcachedDecoder(cacheManager)
+
+}
\ No newline at end of file

Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala	                        (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,40 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.Value
+import org.infinispan.util.Util
+import java.io.{ObjectOutput, ObjectInput}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+// TODO: Make it a hardcoded Externalizer
+class MemcachedValue(override val v: Array[Byte], override val version: Long, val flags: Int)
+      extends Value(v, version) {
+
+   override def toString = {
+      new StringBuilder().append("MemcachedValue").append("{")
+         .append("v=").append(Util.printArray(v, false))
+         .append(", version=").append(version)
+         .append(", flags=").append(flags)
+         .append("}").toString
+   }
+
+//   override def readExternal(in: ObjectInput) {
+////      v = new Array[Byte](in.read())
+////      in.read(v)
+////      version = in.readLong
+//      super.readExternal(in)
+//      flags = in.readInt
+//   }
+//
+//   override def writeExternal(out: ObjectOutput) {
+//      super.writeExternal(out)
+////      out.write(v.length)
+////      out.write(v)
+////      out.writeLong(version)
+//      out.writeInt(flags)
+//   }
+
+}
\ No newline at end of file

Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala	                        (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,38 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.Operation._
+import org.infinispan.server.memcached.MemcachedOperation._
+import org.infinispan.server.core.Logging
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+// todo: maybe try to abstract this into something that can be shared betwen hr and memcached
+object OperationResolver extends Logging {
+   // TODO: Rather than holding a map, check if the String could be passed as part of the Enumeration and whether this could be retrieved in a O(1) op
+   private val operations = Map[String, Enumeration#Value](
+      "set" -> PutRequest, 
+      "add" -> PutIfAbsentRequest,
+      "replace" -> ReplaceRequest,
+      "cas" -> ReplaceIfUnmodifiedRequest,
+      "append" -> AppendRequest,
+      "prepend" -> PrependRequest,
+      "get" -> GetRequest,
+      "gets" -> GetWithVersionRequest,
+      "delete" -> DeleteRequest,
+      "incr" -> IncrementRequest,
+      "decr" -> DecrementRequest,
+      "flush_all" -> FlushAllRequest,
+      "version" -> VersionRequest,
+      "stats" -> StatsRequest
+   )
+
+   def resolve(commandName: String): Option[Enumeration#Value] = {
+      trace("Operation: {0}", commandName)
+      val op = operations.get(commandName)
+      op
+   }
+}
+

Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala	                        (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,77 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.transport.ChannelBuffer
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+// todo: refactor name once old code has been removed?
+trait TextProtocolUtil {
+
+   final val CRLF = "\r\n"
+   final val CR = 13
+   final val LF = 10
+
+   // TODO: maybe convert to recursive
+   def readElement(buffer: ChannelBuffer): String = {
+      if (buffer.readableBytes > 0)
+         readElement(buffer, new StringBuilder())
+      else
+         ""
+   }
+
+   private def readElement(buffer: ChannelBuffer, sb: StringBuilder): String = {
+      var next = buffer.readByte 
+      if (next == 32) { // Space
+         sb.toString.trim
+      }
+      else if (next == 13) { // CR
+         next = buffer.readByte
+         if (next == 10) { // LF
+            sb.toString.trim
+         } else {
+            sb.append(next.asInstanceOf[Char])
+            readElement(buffer, sb)
+         }
+      }
+      else {
+         sb.append(next.asInstanceOf[Char])
+         readElement(buffer, sb)
+      }
+   }
+
+   def readLine(buffer: ChannelBuffer): String = {
+      if (buffer.readableBytes > 0)
+         readLine(buffer, new StringBuilder())         
+      else
+         ""
+   }
+
+   private def readLine(buffer: ChannelBuffer, sb: StringBuilder): String = {
+      var next = buffer.readByte
+      if (next == 13) { // CR
+         next = buffer.readByte
+         if (next == 10) { // LF
+            sb.toString.trim
+         } else {
+            sb.append(next.asInstanceOf[Char])
+            readLine(buffer, sb)
+         }
+      } else if (next == 10) { //LF
+         sb.toString.trim
+      } else {
+         sb.append(next.asInstanceOf[Char])
+         readLine(buffer, sb)
+      }
+   }
+
+   def concat(a: Array[Byte], b: Array[Byte]): Array[Byte] = {
+       val data = new Array[Byte](a.length + b.length)
+       Array.copy(a, 0, data, 0, a.length)
+       Array.copy(b, 0, data, a.length, b.length)
+       return data
+   }
+
+}
\ No newline at end of file

Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala	                        (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,334 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import org.infinispan.manager.CacheManager
+import test.MemcachedTestingUtil
+import java.lang.reflect.Method
+import java.util.concurrent.TimeUnit
+import org.testng.Assert._
+import org.testng.annotations.{AfterClass, Test}
+import net.spy.memcached.{CASResponse, MemcachedClient}
+import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
+import java.net.SocketAddress
+import org.infinispan.Version
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+ at Test(groups = Array("functional"), testName = "server.memcached.FunctionalTest")
+class MemcachedFunctionalTest extends SingleCacheManagerTest with MemcachedTestingUtil {
+   private var client: MemcachedClient = _
+   private var server: MemcachedServer = _
+   private val timeout: Int = 60
+
+   override def createCacheManager: CacheManager = {
+      cacheManager = TestCacheManagerFactory.createLocalCacheManager
+      server = startMemcachedTextServer(cacheManager)
+      client = createMemcachedClient(60000, server.getPort)
+      return cacheManager
+   }
+
+   @AfterClass(alwaysRun = true)
+   override def destroyAfterClass {
+      super.destroyAfterClass
+      log.debug("Test finished, close memcached server", null)
+      client.shutdown
+      server.stop
+   }
+
+   def testSetBasic(m: Method) {
+      val f = client.set(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m))
+   }
+
+   def testSetWithExpirySeconds(m: Method) {
+      val f = client.set(k(m), 1, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      TestingUtil.sleepThread(1100)
+      assertNull(client.get(k(m)))
+   }
+
+   def testSetWithExpiryUnixTime(m: Method) {
+      val future = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis + 1000).asInstanceOf[Int]
+      val f = client.set(k(m), future, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      TestingUtil.sleepThread(1100)
+      assertNull(client.get(k(m)))
+   }
+
+   def testGetMultipleKeys(m: Method) {
+      val f1 = client.set(k(m, "k1-"), 0, v(m, "v1-"))
+      val f2 = client.set(k(m, "k2-"), 0, v(m, "v2-"))
+      val f3 = client.set(k(m, "k3-"), 0, v(m, "v3-"))
+      assertTrue(f1.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertTrue(f2.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertTrue(f3.get(timeout, TimeUnit.SECONDS).booleanValue)
+      val keys = List(k(m, "k1-"), k(m, "k2-"), k(m, "k3-"))
+      val ret = client.getBulk(keys: _*)
+      assertEquals(ret.get(k(m, "k1-")), v(m, "v1-"))
+      assertEquals(ret.get(k(m, "k2-")), v(m, "v2-"))
+      assertEquals(ret.get(k(m, "k3-")), v(m, "v3-"))
+   }
+
+   def testAddBasic(m: Method) {
+      addAndGet(m)
+   }
+
+   def testAddWithExpirySeconds(m: Method) {
+      var f = client.add(k(m), 1, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      TestingUtil.sleepThread(1100)
+      assertNull(client.get(k(m)))
+      f = client.add(k(m), 0, v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m, "v1-"))
+   }
+
+   def testAddWithExpiryUnixTime(m: Method) {
+      val future = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis + 1000).asInstanceOf[Int]
+      var f = client.add(k(m), future, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      TestingUtil.sleepThread(1100)
+      assertNull(client.get(k(m)))
+      f = client.add(k(m), 0, v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m, "v1-"))
+   }
+
+   def testNotAddIfPresent(m: Method) {
+      addAndGet(m)
+      val f = client.add(k(m), 0, v(m, "v1-"))
+      assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m))
+   }
+
+   def testReplaceBasic(m: Method) {
+      addAndGet(m)
+      val f = client.replace(k(m), 0, v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m, "v1-"))
+   }
+
+   def testNotReplaceIfNotPresent(m: Method) {
+      val f = client.replace(k(m), 0, v(m))
+      assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertNull(client.get(k(m)))
+   }
+
+   def testReplaceWithExpirySeconds(m: Method) {
+      addAndGet(m)
+      val f = client.replace(k(m), 1, v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m, "v1-"))
+      TestingUtil.sleepThread(1100)
+      assertNull(client.get(k(m)))
+   }
+
+   def testReplaceWithExpiryUnixTime(m: Method) {
+      addAndGet(m)
+      val future: Int = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis + 1000).asInstanceOf[Int]
+      val f = client.replace(k(m), future, v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m, "v1-"))
+      TestingUtil.sleepThread(1100)
+      assertNull(client.get(k(m)))
+   }
+
+   def testAppendBasic(m: Method) {
+      addAndGet(m)
+      val f = client.append(0, k(m), v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      val expected = v(m) + v(m, "v1-")
+      assertEquals(client.get(k(m)), expected)
+   }
+
+   def testAppendNotFound(m: Method) {
+      addAndGet(m)
+      val f = client.append(0, k(m, "k2-"), v(m, "v1-"))
+      assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m))
+      assertNull(client.get(k(m, "k2-")))
+   }
+
+   def testPrependBasic(m: Method) {
+      addAndGet(m)
+      val f = client.prepend(0, k(m), v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      val expected = v(m, "v1-") + v(m)
+      assertEquals(client.get(k(m)), expected)
+   }
+
+   def testPrependNotFound(m: Method) {
+      addAndGet(m)
+      val f = client.prepend(0, k(m, "k2-"), v(m, "v1-"))
+      assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m))
+      assertNull(client.get(k(m, "k2-")))
+   }
+
+   def testGetsBasic(m: Method) {
+      addAndGet(m)
+      var value = client.gets(k(m))
+      assertEquals(value.getValue(), v(m))
+      assertTrue(value.getCas() != 0)
+   }
+
+   def testCasBasic(m: Method) {
+      addAndGet(m)
+      var value = client.gets(k(m))
+      assertEquals(value.getValue(), v(m))
+      assertTrue(value.getCas() != 0)
+      var resp = client.cas(k(m), value.getCas, v(m, "v1-"))
+      assertEquals(resp, CASResponse.OK)
+   }
+
+   def testCasNotFound(m: Method) {
+      addAndGet(m)
+      var value = client.gets(k(m))
+      assertEquals(value.getValue(), v(m))
+      assertTrue(value.getCas() != 0)
+      var resp = client.cas(k(m, "k1-"), value.getCas, v(m, "v1-"))
+      assertEquals(resp, CASResponse.NOT_FOUND)
+   }
+
+   def testCasExists(m: Method) {
+      addAndGet(m)
+      var value = client.gets(k(m))
+      assertEquals(value.getValue(), v(m))
+      assertTrue(value.getCas() != 0)
+      val old = value.getCas
+      var resp = client.cas(k(m), value.getCas, v(m, "v1-"))
+      value = client.gets(k(m))
+      assertEquals(value.getValue(), v(m, "v1-"))
+      assertTrue(value.getCas() != 0)
+      assertTrue(value.getCas() != old)
+      resp = client.cas(k(m), old, v(m, "v2-"))
+      assertEquals(resp, CASResponse.EXISTS)
+      resp = client.cas(k(m), value.getCas, v(m, "v2-"))
+      assertEquals(resp, CASResponse.OK)
+   }
+
+   def testDeleteBasic(m: Method) {
+      addAndGet(m)
+      val f = client.delete(k(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertNull(client.get(k(m)))
+   }
+
+   def testDeleteDoesNotExist(m: Method) {
+      val f = client.delete(k(m))
+      assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+   }
+
+   def testIncrementBasic(m: Method) {
+      val f = client.set(k(m), 0, "1")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      val result = client.incr(k(m), 1)
+      assertEquals(result, 2)
+   }
+
+   def testIncrementTriple(m: Method) {
+      val f = client.set(k(m), 0, "1")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.incr(k(m), 1), 2)
+      assertEquals(client.incr(k(m), 2), 4)
+      assertEquals(client.incr(k(m), 4), 8)
+   }
+
+   def testIncrementNotExist(m: Method) {
+      assertEquals(client.incr(k(m), 1), -1)
+   }
+
+   def testIncrementIntegerMax(m: Method) {
+      val f = client.set(k(m), 0, "0")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.incr(k(m), Integer.MAX_VALUE), Integer.MAX_VALUE)
+   }
+
+   def testIncrementBeyondIntegerMax(m: Method) {
+      val f = client.set(k(m), 0, "1")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      val newValue = client.incr(k(m), Integer.MAX_VALUE)
+      assertEquals(newValue, Int.MaxValue.asInstanceOf[Long] + 1)
+   }
+
+   def testDecrementBasic(m: Method) {
+      var f = client.set(k(m), 0, "1")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.decr(k(m), 1), 0)
+   }
+
+   def testDecrementTriple(m: Method) {
+      var f = client.set(k(m), 0, "8")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.decr(k(m), 1), 7)
+      assertEquals(client.decr(k(m), 2), 5)
+      assertEquals(client.decr(k(m), 4), 1)
+   }
+
+   def testDecrementNotExist(m: Method): Unit = {
+      assertEquals(client.decr(k(m), 1), -1)
+   }
+
+   def testDecrementBelowZero(m: Method) {
+      var f = client.set(k(m), 0, "1")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      var newValue = client.decr(k(m), 2)
+      assertEquals(newValue, 0)
+   }
+
+   def testFlushAll(m: Method) {
+      for (i <- 1 to 5) {
+         val key = k(m, "k" + i + "-");
+         val value = v(m, "v" + i + "-");
+         val f = client.set(key, 0, value);
+         assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+         assertEquals(client.get(key), value)
+      }
+
+      val f = client.flush();
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+
+      for (i <- 1 to 5) {
+         val key = k(m, "k" + i + "-");
+         assertNull(client.get(key))
+      }
+   }
+
+   def testFlushAllDelayed(m: Method) {
+      for (i <- 1 to 5) {
+         val key = k(m, "k" + i + "-");
+         val value = v(m, "v" + i + "-");
+         val f = client.set(key, 0, value);
+         assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+         assertEquals(client.get(key), value)
+      }
+
+      val f = client.flush(2);
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+
+      TestingUtil.sleepThread(2200);
+
+      for (i <- 1 to 5) {
+         val key = k(m, "k" + i + "-");
+         assertNull(client.get(key))
+      }
+   }
+
+   def testVersion {
+      val versions = client.getVersions
+      assertEquals(versions.size(), 1)
+      val version = versions.values.iterator.next
+      assertEquals(version, Version.version)
+   }
+
+   private def addAndGet(m: Method) {
+      val f = client.add(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m))
+   }
+
+}
\ No newline at end of file

Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala	                        (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,142 @@
+package org.infinispan.server.memcached
+
+import org.testng.Assert._
+import org.infinispan.test.MultipleCacheManagersTest
+import test.MemcachedTestingUtil
+import org.infinispan.config.Configuration
+import org.testng.annotations.{AfterClass, Test}
+import java.util.concurrent.TimeUnit
+import java.lang.reflect.Method
+import net.spy.memcached.{CASResponse, MemcachedClient}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+ at Test(groups = Array("functional"), testName = "server.memcached.MemcachedClusterTest")
+class MemcachedReplicationTest extends MultipleCacheManagersTest with MemcachedTestingUtil {
+   private val cacheName = "MemcachedReplSync"
+   private[this] var servers: List[MemcachedServer] = List()
+   private[this] var clients: List[MemcachedClient] = List()
+   private val timeout: Int = 60
+
+   @Test(enabled = false) // Disable explicitly to avoid TestNG thinking this is a test!!
+   override def createCacheManagers {
+      var replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+      createClusteredCaches(2, cacheName, replSync)
+      servers = startMemcachedTextServer(cacheManagers.get(0), cacheName) :: servers
+      servers = startMemcachedTextServer(cacheManagers.get(1), servers.head.getPort + 50, cacheName) :: servers
+      servers.foreach(s => clients = createMemcachedClient(60000, s.getPort) :: clients)
+   }
+
+   @AfterClass(alwaysRun = true)
+   override def destroy {
+      super.destroy
+      log.debug("Test finished, close Hot Rod server", null)
+      clients.foreach(_.shutdown)
+      servers.foreach(_.stop)
+   }
+
+   def testReplicatedSet(m: Method) {
+      val f = clients.head.set(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(clients.tail.head.get(k(m)), v(m))
+   }
+
+   def testReplicatedGetMultipleKeys(m: Method) {
+      val f1 = clients.head.set(k(m, "k1-"), 0, v(m, "v1-"))
+      val f2 = clients.head.set(k(m, "k2-"), 0, v(m, "v2-"))
+      val f3 = clients.head.set(k(m, "k3-"), 0, v(m, "v3-"))
+      assertTrue(f1.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertTrue(f2.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertTrue(f3.get(timeout, TimeUnit.SECONDS).booleanValue)
+      val keys = List(k(m, "k1-"), k(m, "k2-"), k(m, "k3-"))
+      val ret = clients.tail.head.getBulk(keys: _*)
+      assertEquals(ret.get(k(m, "k1-")), v(m, "v1-"))
+      assertEquals(ret.get(k(m, "k2-")), v(m, "v2-"))
+      assertEquals(ret.get(k(m, "k3-")), v(m, "v3-"))
+   }
+
+   def testReplicatedAdd(m: Method) {
+      val f = clients.head.add(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(clients.tail.head.get(k(m)), v(m))
+   }
+
+   def testReplicatedReplace(m: Method) {
+      var f = clients.head.add(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(clients.tail.head.get(k(m)), v(m))
+      f = clients.tail.head.replace(k(m), 0, v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(clients.head.get(k(m)), v(m, "v1-"))
+   }
+
+   def testReplicatedAppend(m: Method) {
+      var f = clients.head.add(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(clients.tail.head.get(k(m)), v(m))
+      f = clients.tail.head.append(0, k(m), v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      val expected = v(m).toString + v(m, "v1-").toString
+      assertEquals(clients.head.get(k(m)), expected)
+   }
+
+   def testReplicatedPrepend(m: Method) {
+      var f = clients.head.add(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(clients.tail.head.get(k(m)), v(m))
+      f = clients.tail.head.prepend(0, k(m), v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      val expected = v(m, "v1-").toString + v(m).toString
+      assertEquals(clients.head.get(k(m)), expected)
+  }
+
+   def testReplicatedGets(m: Method) {
+      var f = clients.head.set(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      var value = clients.tail.head.gets(k(m))
+      assertEquals(value.getValue(), v(m))
+      assertTrue(value.getCas() != 0)
+   }
+
+   def testReplicatedCasExists(m: Method) {
+      var f = clients.head.set(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      var value = clients.tail.head.gets(k(m))
+      assertEquals(value.getValue(), v(m))
+      assertTrue(value.getCas() != 0)
+      val old = value.getCas
+      var resp = clients.tail.head.cas(k(m), value.getCas, v(m, "v1-"))
+      value = clients.head.gets(k(m))
+      assertEquals(value.getValue(), v(m, "v1-"))
+      assertTrue(value.getCas() != 0)
+      assertTrue(value.getCas() != old)
+      resp = clients.head.cas(k(m), old, v(m, "v2-"))
+      assertEquals(resp, CASResponse.EXISTS)
+      resp = clients.tail.head.cas(k(m), value.getCas, v(m, "v2-"))
+      assertEquals(resp, CASResponse.OK)
+   }
+
+   def testReplicatedDelete(m: Method) {
+      var f = clients.head.set(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      f = clients.tail.head.delete(k(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+   }
+
+   def testReplicatedIncrement(m: Method) {
+      var f = clients.head.set(k(m), 0, "1")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(clients.tail.head.incr(k(m), 1), 2)
+   }
+
+   def testReplicatedDecrement(m: Method): Unit = {
+      val f = clients.head.set(k(m), 0, "1")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(clients.tail.head.decr(k(m), 1), 0)
+   }
+
+}
\ No newline at end of file

Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala	                        (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,196 @@
+package org.infinispan.server.memcached
+
+import test.MemcachedTestingUtil
+import org.testng.annotations.{AfterClass, Test}
+import org.infinispan.manager.CacheManager
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import net.spy.memcached.MemcachedClient
+import java.lang.reflect.Method
+import org.testng.Assert._
+import java.util.concurrent.TimeUnit
+import org.infinispan.Version
+import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+ at Test(groups = Array("functional"), testName = "server.memcached.StatsTest")
+class MemcachedStatsTest extends SingleCacheManagerTest with MemcachedTestingUtil {
+   private var jmxDomain = classOf[MemcachedStatsTest].getSimpleName
+   private var client: MemcachedClient = _
+   private var server: MemcachedServer = _
+   private var timeout: Int = 60
+
+   override def createCacheManager: CacheManager = {
+      cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
+      server = startMemcachedTextServer(cacheManager)
+      client = createMemcachedClient(60000, server.getPort)
+      return cacheManager
+   }
+
+   @AfterClass(alwaysRun = true)
+   override def destroyAfterClass {
+      super.destroyAfterClass
+      log.debug("Test finished, close memcached server", null)
+      client.shutdown
+      server.stop
+   }
+
+   def testUnsupportedStats(m: Method) {
+      val stats = getStats
+      assertEquals(stats.get("pid"), "0")
+      assertEquals(stats.get("pointer_size"), "0")
+      assertEquals(stats.get("rusage_user"), "0")
+      assertEquals(stats.get("rusage_system"), "0")
+      assertEquals(stats.get("bytes"), "0")
+      assertEquals(stats.get("connection_structures"), "0")
+      assertEquals(stats.get("auth_cmds"), "0")
+      assertEquals(stats.get("auth_errors"), "0")
+      assertEquals(stats.get("limit_maxbytes"), "0")
+      assertEquals(stats.get("conn_yields"), "0")
+   }
+
+   def testUncomparableStats(m: Method) {
+      TestingUtil.sleepThread(TimeUnit.SECONDS.toMillis(1))
+      val stats = getStats
+      assertNotSame(stats.get("uptime"), "0")
+      assertNotSame(stats.get("time"), "0")
+      assertNotSame(stats.get("uptime"), stats.get("time"))
+   }
+
+   def testStaticStats(m: Method) {
+       val stats = getStats
+       assertEquals(stats.get("version"), Version.version)
+    }
+
+   def testTodoStats: Unit = {
+      val stats = getStats
+      assertEquals(stats.get("curr_connections"), "0")
+      assertEquals(stats.get("total_connections"), "0")
+      assertEquals(stats.get("bytes_read"), "0")
+      assertEquals(stats.get("bytes_written"), "0")
+      assertEquals(stats.get("threads"), "0")
+   }
+
+
+   def testStats(m: Method): Unit = {
+      var stats = getStats
+      assertEquals(stats.get("cmd_set"), "0")
+      assertEquals(stats.get("cmd_get"), "0")
+      assertEquals(stats.get("get_hits"), "0")
+      assertEquals(stats.get("get_misses"), "0")
+      assertEquals(stats.get("delete_hits"), "0")
+      assertEquals(stats.get("delete_misses"), "0")
+      assertEquals(stats.get("curr_items"), "0")
+      assertEquals(stats.get("total_items"), "0")
+      assertEquals(stats.get("incr_misses"), "0")
+      assertEquals(stats.get("incr_hits"), "0")
+      assertEquals(stats.get("decr_misses"), "0")
+      assertEquals(stats.get("decr_hits"), "0")
+      assertEquals(stats.get("cas_misses"), "0")
+      assertEquals(stats.get("cas_hits"), "0")
+      assertEquals(stats.get("cas_badval"), "0")
+
+      var f = client.set(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m))
+      f = client.set(k(m, "k1-"), 0, v(m, "v1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m, "k1-")), v(m, "v1-"))
+      stats = getStats
+      assertEquals(stats.get("cmd_set"), "2")
+      assertEquals(stats.get("cmd_get"), "2")
+      assertEquals(stats.get("get_hits"), "2")
+      assertEquals(stats.get("get_misses"), "0")
+      assertEquals(stats.get("delete_hits"), "0")
+      assertEquals(stats.get("delete_misses"), "0")
+      assertEquals(stats.get("curr_items"), "2")
+      assertEquals(stats.get("total_items"), "2")
+
+      f = client.delete(k(m, "k1-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      stats = getStats
+      assertEquals(stats.get("curr_items"), "1")
+      assertEquals(stats.get("total_items"), "2")
+      assertEquals(stats.get("delete_hits"), "1")
+      assertEquals(stats.get("delete_misses"), "0")
+
+      assertNull(client.get(k(m, "k99-")))
+      stats = getStats
+      assertEquals(stats.get("get_hits"), "2")
+      assertEquals(stats.get("get_misses"), "1")
+
+      f = client.delete(k(m, "k99-"))
+      assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      stats = getStats
+      assertEquals(stats.get("delete_hits"), "1")
+      assertEquals(stats.get("delete_misses"), "1")
+
+      val future = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis + 1000).asInstanceOf[Int]
+      f = client.set(k(m, "k3-"), future, v(m, "v3-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      TestingUtil.sleepThread(1100)
+      assertNull(client.get(k(m, "k3-")))
+      stats = getStats
+      assertEquals(stats.get("curr_items"), "1")
+      assertEquals(stats.get("total_items"), "3")
+
+      client.incr(k(m, "k4-"), 1)
+      stats = getStats
+      assertEquals(stats.get("incr_misses"), "1")
+      assertEquals(stats.get("incr_hits"), "0")
+
+      f = client.set(k(m, "k4-"), 0, "1")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      client.incr(k(m, "k4-"), 1)
+      client.incr(k(m, "k4-"), 2)
+      client.incr(k(m, "k4-"), 4)
+      stats = getStats
+      assertEquals(stats.get("incr_misses"), "1")
+      assertEquals(stats.get("incr_hits"), "3")
+
+      client.decr(k(m, "k5-"), 1)
+      stats = getStats
+      assertEquals(stats.get("decr_misses"), "1")
+      assertEquals(stats.get("decr_hits"), "0")
+
+      f = client.set(k(m, "k5-"), 0, "8")
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      client.decr(k(m, "k5-"), 1)
+      client.decr(k(m, "k5-"), 2)
+      client.decr(k(m, "k5-"), 4)
+      stats = getStats
+      assertEquals(stats.get("decr_misses"), "1")
+      assertEquals(stats.get("decr_hits"), "3")
+
+      client.cas(k(m, "k6-"), 1234, v(m, "v6-"))
+      stats = getStats
+      assertEquals(stats.get("cas_misses"), "1")
+      assertEquals(stats.get("cas_hits"), "0")
+      assertEquals(stats.get("cas_badval"), "0")
+
+      f = client.set(k(m, "k6-"), 0, v(m, "v6-"))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      var value = client.gets(k(m, "k6-"))
+      var old: Long = value.getCas
+      client.cas(k(m, "k6-"), value.getCas, v(m, "v66-"))
+      stats = getStats
+      assertEquals(stats.get("cas_misses"), "1")
+      assertEquals(stats.get("cas_hits"), "1")
+      assertEquals(stats.get("cas_badval"), "0")
+      client.cas(k(m, "k6-"), old, v(m, "v66-"))
+      stats = getStats
+      assertEquals(stats.get("cas_misses"), "1")
+      assertEquals(stats.get("cas_hits"), "1")
+      assertEquals(stats.get("cas_badval"), "1")
+   }
+   
+   private def getStats() = {
+      val stats = client.getStats()
+      assertEquals(stats.size(), 1)
+      stats.values.iterator.next
+   }
+
+}
\ No newline at end of file

Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	                        (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,90 @@
+package org.infinispan.server.memcached.test
+
+import java.lang.reflect.Method
+import net.spy.memcached.{DefaultConnectionFactory, MemcachedClient}
+import java.util.Arrays
+import java.net.InetSocketAddress
+import org.infinispan.Cache
+import java.util.concurrent.atomic.AtomicInteger
+import org.infinispan.manager.CacheManager
+import org.infinispan.server.core.transport.Decoder
+import org.infinispan.server.memcached.{MemcachedDecoder, MemcachedValue, MemcachedServer}
+import org.infinispan.server.core.RequestHeader
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+trait MemcachedTestingUtil {
+   def host = "127.0.0.1"
+
+//   def k(m: Method, prefix: String): Array[Byte] = {
+//      val bytes: Array[Byte] = (prefix + m.getName).getBytes
+//      trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)
+//      bytes
+//   }
+//
+//   def v(m: Method, prefix: String): Array[Byte] = k(m, prefix)
+//
+//   def k(m: Method): Array[Byte] = k(m, "k-")
+//
+//   def v(m: Method): Array[Byte] = v(m, "v-")
+
+   def k(m: Method, prefix: String): String = prefix + m.getName
+
+   def v(m: Method, prefix: String): String = prefix + m.getName
+
+   def k(m: Method): String = k(m, "k-")
+
+   def v(m: Method): String = v(m, "v-")
+
+   def createMemcachedClient(timeout: Long, port: Int): MemcachedClient = {
+      var d: DefaultConnectionFactory = new DefaultConnectionFactory {
+         override def getOperationTimeout: Long = timeout
+      }
+      return new MemcachedClient(d, Arrays.asList(new InetSocketAddress(host, port)))
+   }
+
+   def startMemcachedTextServer(cacheManager: CacheManager): MemcachedServer = {
+      startMemcachedTextServer(cacheManager, UniquePortThreadLocal.get.intValue)
+   }
+
+   def startMemcachedTextServer(cacheManager: CacheManager, port: Int): MemcachedServer = {
+      val server = new MemcachedServer
+      server.start(host, port, cacheManager, 0, 0)
+      server
+   }
+
+   def startMemcachedTextServer(cacheManager: CacheManager, cacheName: String): MemcachedServer = {
+      startMemcachedTextServer(cacheManager, UniquePortThreadLocal.get.intValue, cacheName)
+//      val server = new MemcachedServer {
+//         protected override def getDecoder(cacheManager: CacheManager): Decoder = {
+//            new MemcachedDecoder(cacheManager) {
+//               override def getCache(header: RequestHeader) = cacheManager.getCache[String, MemcachedValue](cacheName)
+//            }
+//         }
+//      }
+//      server.start(host, UniquePortThreadLocal.get.intValue, cacheManager, 0, 0)
+//      server
+   }
+
+   def startMemcachedTextServer(cacheManager: CacheManager, port: Int, cacheName: String): MemcachedServer = {
+      val server = new MemcachedServer {
+         protected override def getDecoder(cacheManager: CacheManager): Decoder = {
+            new MemcachedDecoder(cacheManager) {
+               override def createCache = cacheManager.getCache[String, MemcachedValue](cacheName)
+            }
+         }
+      }
+      server.start(host, port, cacheManager, 0, 0)
+      server
+   }
+   
+}
+
+object UniquePortThreadLocal extends ThreadLocal[Int] {
+   private val uniqueAddr = new AtomicInteger(11211)
+   override def initialValue: Int = uniqueAddr.getAndAdd(100)
+}
\ No newline at end of file



More information about the infinispan-commits mailing list