[infinispan-commits] Infinispan SVN: r1613 - in trunk/server: core/src/main and 27 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Mar 23 13:59:59 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-03-23 13:59:55 -0400 (Tue, 23 Mar 2010)
New Revision: 1613
Added:
trunk/server/core/src/main/scala/
trunk/server/core/src/main/scala/org/
trunk/server/core/src/main/scala/org/infinispan/
trunk/server/core/src/main/scala/org/infinispan/server/
trunk/server/core/src/main/scala/org/infinispan/server/core/
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/Operation.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/Value.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Channel.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelFuture.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Encoder.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ExceptionEvent.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/NoState.java
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Transport.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelFutureAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/EncoderAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ExceptionEventAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
trunk/server/core/src/test/scala/
trunk/server/core/src/test/scala/org/
trunk/server/core/src/test/scala/org/infinispan/
trunk/server/core/src/test/scala/org/infinispan/server/
trunk/server/core/src/test/scala/org/infinispan/server/core/
trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala
trunk/server/memcached/src/main/scala/
trunk/server/memcached/src/main/scala/org/
trunk/server/memcached/src/main/scala/org/infinispan/
trunk/server/memcached/src/main/scala/org/infinispan/server/
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedOperation.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala
trunk/server/memcached/src/test/scala/
trunk/server/memcached/src/test/scala/org/
trunk/server/memcached/src/test/scala/org/infinispan/
trunk/server/memcached/src/test/scala/org/infinispan/server/
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Removed:
trunk/server/core/src/main/java/
trunk/server/core/src/test/java/
trunk/server/memcached/src/main/java/
trunk/server/memcached/src/test/java/
Modified:
trunk/server/core/pom.xml
trunk/server/memcached/pom.xml
Log:
Refactored memcached and core module so that core does more work and memcached only does protocol specific work. Also converted both modules to scala.
Modified: trunk/server/core/pom.xml
===================================================================
--- trunk/server/core/pom.xml 2010-03-22 20:12:52 UTC (rev 1612)
+++ trunk/server/core/pom.xml 2010-03-23 17:59:55 UTC (rev 1613)
@@ -18,6 +18,7 @@
<version.netty>3.2.0.BETA1</version.netty>
<version.gnu.getopt>1.0.13</version.gnu.getopt>
<version.apache.log4j>1.2.15</version.apache.log4j>
+ <version.scala>2.8.0.Beta1</version.scala>
</properties>
<dependencies>
@@ -39,6 +40,49 @@
<version>${version.apache.log4j}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${version.scala}</version>
+ </dependency>
+
</dependencies>
+ <build>
+ <finalName>infinispan</finalName>
+ <sourceDirectory>src/main/scala</sourceDirectory>
+ <testSourceDirectory>src/test/scala</testSourceDirectory>
+
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>test-compile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ <execution>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ </build>
+
</project>
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,179 @@
+package org.infinispan.server.core
+
+import org.infinispan.Cache
+import Operation._
+import transport.{ExceptionEvent, Decoder, ChannelHandlerContext, ChannelBuffer}
+import scala.collection.mutable.HashMap
+import scala.collection.immutable
+import org.infinispan.remoting.transport.Address
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.JavaConversions._
+import java.util.concurrent.TimeUnit
+import org.infinispan.stats.Stats
+import org.infinispan.server.core.VersionGenerator._
+import java.io.StreamCorruptedException
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class AbstractProtocolDecoder[K, V <: Value] extends Decoder {
+ import AbstractProtocolDecoder._
+
+ type SuitableParameters <: RequestParameters
+
+ private val versionCounter = new AtomicInteger
+
+ override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): AnyRef = {
+ if (buffer.readableBytes < 1) return null
+ val header = readHeader(buffer)
+ val cache = getCache(header)
+ header.op match {
+ case PutRequest | PutIfAbsentRequest | ReplaceRequest | ReplaceIfUnmodifiedRequest | DeleteRequest => {
+ val k = readKey(buffer)
+ val params = readParameters(header.op, buffer)
+ header.op match {
+ case PutRequest => {
+ putInCache(k, params, cache)
+ sendResponse(header, ctx, None, None, Some(params), None)
+ }
+ case PutIfAbsentRequest => {
+ val prev = cache.get(k)
+ if (prev == null) putIfAbsentInCache(k, params, cache) // Generate new version only if key not present
+ sendResponse(header, ctx, None, None, Some(params), Some(prev))
+ }
+ case ReplaceRequest => {
+ val prev = cache.get(k)
+ if (prev != null) replaceInCache(k, params, cache) // Generate new version only if key present
+ sendResponse(header, ctx, None, None, Some(params), Some(prev))
+ }
+ case ReplaceIfUnmodifiedRequest => {
+ val prev = cache.get(k)
+ if (prev != null) {
+ if (prev.version == params.version) {
+ // Generate new version only if key present and version has not changed, otherwise it's wasteful
+ val v = createValue(params, generateVersion(cache))
+ val replaced = cache.replace(k, prev, v);
+ if (replaced)
+ sendResponse(header, ctx, None, Some(v), Some(params), Some(prev))
+ else
+ sendResponse(header, ctx, None, None, Some(params), Some(prev))
+ } else {
+ sendResponse(header, ctx, None, None, Some(params), Some(prev))
+ }
+ } else {
+ sendResponse(header, ctx, None, None, Some(params), None)
+ }
+ }
+ case DeleteRequest => {
+ val prev = cache.remove(k)
+ sendResponse(header, ctx, None, None, Some(params), Some(prev))
+ }
+ }
+ }
+ case GetRequest | GetWithVersionRequest => {
+ val keys = readKeys(buffer)
+ if (keys.length > 1) {
+ val map = new HashMap[K,V]()
+ for (k <- keys) {
+ val v = cache.get(k)
+ if (v != null)
+ map += (k -> v)
+ }
+ sendResponse(header, ctx, new immutable.HashMap ++ map)
+ } else {
+ sendResponse(header, ctx, Some(keys.head), Some(cache.get(keys.head)), None, None)
+ }
+ }
+ case StatsRequest => sendResponse(ctx, cache.getAdvancedCache.getStats)
+ case _ => handleCustomRequest(header, ctx, buffer, cache)
+ }
+ null
+ }
+
+ private def putInCache(k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
+ val v = createValue(params, generateVersion(cache))
+ cache.put(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+ }
+
+ private def putIfAbsentInCache(k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
+ val v = createValue(params, generateVersion(cache))
+ cache.putIfAbsent(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+ }
+
+ private def replaceInCache(k: K, params: SuitableParameters, cache: Cache[K, V]): V = {
+ val v = createValue(params, generateVersion(cache))
+ cache.replace(k, v, toMillis(params.lifespan), DefaultTimeUnit, toMillis(params.maxIdle), DefaultTimeUnit)
+ }
+
+ override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
+ error("Exception reported", e.getCause)
+ sendResponse(ctx, e.getCause)
+ }
+
+ def readHeader(buffer: ChannelBuffer): RequestHeader
+
+ def getCache(header: RequestHeader): Cache[K, V]
+
+ // todo: probably remove in favour of readKeys
+ def readKey(buffer: ChannelBuffer): K
+
+ def readKeys(buffer: ChannelBuffer): Array[K]
+
+ def readParameters(op: Enumeration#Value, buffer: ChannelBuffer): SuitableParameters
+
+ def createValue(params: SuitableParameters, nextVersion: Long): V
+
+ def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext, k: Option[K], v: Option[V], params: Option[SuitableParameters], prev: Option[V])
+
+ def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext, pairs: Map[K, V])
+
+ def sendResponse(ctx: ChannelHandlerContext, t: Throwable)
+
+ def sendResponse(ctx: ChannelHandlerContext, stats: Stats)
+
+ def handleCustomRequest(header: RequestHeader, ctx: ChannelHandlerContext, buffer: ChannelBuffer, cache: Cache[K, V])
+
+ /**
+ * Transforms a lifespan passed in as seconds into milliseconds,
+ * following this rule:
+ *
+ * If lifespan is bigger than the number of seconds in 30 days,
+ * then it is considered unix time. After converting it to
+ * milliseconds, we subtract the current time in milliseconds and
+ * the result is returned.
+ *
+ * Otherwise it's just considered a number of seconds from
+ * now and it's returned in milliseconds.
+ */
+ private def toMillis(lifespan: Int): Long = {
+ if (lifespan > SecondsInAMonth) TimeUnit.SECONDS.toMillis(lifespan) - System.currentTimeMillis
+ else TimeUnit.SECONDS.toMillis(lifespan)
+ }
+
+ protected def generateVersion(cache: Cache[K, V]): Long = {
+ val rpcManager = cache.getAdvancedCache.getRpcManager
+ if (rpcManager != null) {
+ val transport = rpcManager.getTransport
+ newVersion(Some(transport.getAddress), Some(transport.getMembers), transport.getViewId)
+ } else {
+ newVersion(None, None, 0)
+ }
+ }
+
+}
+
+object AbstractProtocolDecoder extends Logging {
+ private val SecondsInAMonth = 60 * 60 * 24 * 30
+ private val DefaultTimeUnit = TimeUnit.MILLISECONDS
+}
+
+// todo: once I implement the hotrod see, revisit to see whether this class is still necessary,
+// todo: ...I suspect so cos I'd need a place to hold stuff that appears before the operation
+class RequestHeader(val op: Enumeration#Value)
+
+class RequestParameters(val data: Array[Byte], val lifespan: Int, val maxIdle: Int, val version: Long)
+
+class UnknownOperationException(reason: String) extends StreamCorruptedException(reason)
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,55 @@
+package org.infinispan.server.core
+
+import java.net.InetSocketAddress
+import transport.netty.{EncoderAdapter, DecoderAdapter, NettyTransport}
+import transport.{Decoder, Encoder, Transport}
+import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class AbstractProtocolServer extends ProtocolServer {
+ private var host: String = _
+ private var port: Int = _
+ private var masterThreads: Int = _
+ private var workerThreads: Int = _
+ private var transport: Transport = _
+ private var cacheManager: CacheManager = _
+ private var decoder: Decoder = _
+ private var encoder: Encoder = _
+
+ override def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int) {
+ this.host = host
+ this.port = port
+ this.masterThreads = masterThreads
+ this.workerThreads = workerThreads
+
+ decoder = getDecoder(cacheManager)
+ if (decoder != null) decoder.start // a protocol may supply no decoder; same guard as below and in stop
+ encoder = getEncoder
+ val nettyDecoder = if (decoder != null) new DecoderAdapter(decoder) else null
+ val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
+ val address = new InetSocketAddress(host, port)
+ // TODO change cache name 'default' to something more meaningful and dependent of protocol
+ transport = new NettyTransport(nettyDecoder, nettyEncoder, address, masterThreads, workerThreads, "default")
+ transport.start
+ }
+
+ override def stop {
+ if (transport != null)
+ transport.stop
+ if (decoder != null)
+ decoder.stop
+// cacheManager.stop
+ }
+
+ def getPort = port
+
+ protected def getEncoder: Encoder
+
+ protected def getDecoder(cacheManager: CacheManager): Decoder
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Logging.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,43 @@
+package org.infinispan.server.core
+
+import org.infinispan.util.logging.{LogFactory, Log}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since 4.1
+ */
+trait Logging {
+ private lazy val log: Log = LogFactory.getLog(getClass)
+
+// def info(msg: => String) = if (log.isInfoEnabled) log.info(msg, null)
+
+ // params.map(_.asInstanceOf[AnyRef]) => returns a Seq[AnyRef]
+ // the ': _*' part tells the compiler to pass it as varargs
+ def info(msg: => String, params: Any*) = if (log.isInfoEnabled) log.info(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+// def debug(msg: => String) = log.debug(msg, null)
+
+ def debug(msg: => String, params: Any*) = if (log.isDebugEnabled) log.debug(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+// def trace(msg: => String) = log.trace(msg, null)
+
+ def trace(msg: => String, params: Any*) = if (log.isTraceEnabled) log.trace(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+ def warn(msg: => String, params: Any*) = log.warn(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+ def warn(msg: => String, t: Throwable) = log.warn(msg, t, null)
+
+ def error(msg: => String) = log.error(msg, null)
+
+ def error(msg: => String, t: Throwable) = log.error(msg, t, null)
+
+ // TODO: Sort out the other error methods that take both Throwable and varargs
+
+// def error(msg: => String, params: Any*) =
+// if (log.isErrorEnabled) log.error(msg, params.map(_.asInstanceOf[AnyRef]) : _*)
+//
+// def error(msg: => String, t: Throwable, params: Any*) =
+// if (log.isErrorEnabled) log.error(msg, t, params.map(_.asInstanceOf[AnyRef]) : _*)
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Main.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,214 @@
+package org.infinispan.server.core
+
+import scala.collection.JavaConversions._
+import collection.mutable.HashMap
+import scala.collection.mutable
+import org.infinispan.util.Util
+import java.io.IOException
+import java.security.{PrivilegedAction, AccessController}
+import java.util.concurrent.{ThreadFactory, ExecutionException, Callable, Executors}
+import gnu.getopt.{Getopt, LongOpt}
+import org.infinispan.Version
+import org.infinispan.manager.{CacheManager, DefaultCacheManager}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object Main extends Logging {
+
+ val PROP_KEY_PORT = "infinispan.server.port"
+ val PROP_KEY_HOST = "infinispan.server.host"
+ val PROP_KEY_MASTER_THREADS = "infinispan.server.master.threads"
+ val PROP_KEY_WORKER_THREADS = "infinispan.server.worker.threads"
+ val PROP_KEY_CACHE_CONFIG = "infinispan.server.cache.config"
+ val PROP_KEY_PROTOCOL = "infinispan.server.protocol"
+ val PORT_DEFAULT = 11211
+ val HOST_DEFAULT = "127.0.0.1"
+ val MASTER_THREADS_DEFAULT = 0
+ val WORKER_THREADS_DEFAULT = 0
+
+ /**
+ * Server properties. This object holds all of the required
+ * information to get the server up and running. Use System
+ * properties for defaults.
+ */
+ private val props: mutable.Map[String, String] = {
+ // Set default properties
+ val properties = new HashMap[String, String]()
+ val sysProps = System.getProperties
+ for (property <- asIterator(sysProps.iterator))
+ properties.put(property._1, property._2)
+ properties
+ }
+
+ private var programName: String = _
+ private var server: ProtocolServer = _
+
+ def main(args: Array[String]) {
+ info("Start main with args: {0}", args.mkString(", "))
+ val worker = new Callable[Void] {
+ override def call = {
+ try {
+ boot(args)
+ }
+ catch {
+ case e: Exception => {
+ System.err.println("Failed to boot Infinispan server:")
+ e.printStackTrace
+ throw e
+ }
+ }
+ null
+ }
+ }
+ val f = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
+ override def newThread(r: Runnable): Thread = {
+ // TODO Maybe create thread names based on the protocol run
+ new Thread(r, "InfinispanServer-Main")
+ }
+ }).submit(worker)
+ f.get // block until boot has finished so that boot failures surface in the calling thread
+ }
+
+ def boot(args: Array[String]) {
+ // First process the command line to pickup custom props/settings
+ processCommandLine(args)
+
+ val host = if (props.get(PROP_KEY_HOST) == None) HOST_DEFAULT else props.get(PROP_KEY_HOST).get
+ val masterThreads = if (props.get(PROP_KEY_MASTER_THREADS) == None) MASTER_THREADS_DEFAULT else props.get(PROP_KEY_MASTER_THREADS).get.toInt
+ if (masterThreads < 0) {
+ throw new IllegalArgumentException("Master threads can't be lower than 0: " + masterThreads)
+ }
+ val workerThreads = if (props.get(PROP_KEY_WORKER_THREADS) == None) WORKER_THREADS_DEFAULT else props.get(PROP_KEY_WORKER_THREADS).get.toInt
+ if (workerThreads < 0) {
+ throw new IllegalArgumentException("Worker threads can't be lower than 0: " + workerThreads)
+ }
+ val configFile = props.get(PROP_KEY_CACHE_CONFIG)
+ var protocol = props.get(PROP_KEY_PROTOCOL)
+ if (protocol == None) {
+ System.err.println("ERROR: Please indicate protocol to run with -r parameter")
+ showAndExit
+ }
+
+ // TODO: move class name and protocol number to a resource file under the corresponding project
+ val clazz = protocol.get match {
+ case "memcached" => "org.infinispan.server.memcached.MemcachedServer"
+ case "hotrod" => "org.infinispan.server.hotrod.HotRodServer"
+ }
+ val port = {
+ if (props.get(PROP_KEY_PORT) == None) {
+ protocol.get match {
+ case "memcached" => 11211
+ case "hotrod" => 11311
+ }
+ } else {
+ props.get(PROP_KEY_PORT).get.toInt
+ }
+ }
+
+ var server = Util.getInstance(clazz).asInstanceOf[ProtocolServer]
+ val cacheManager = if (configFile == None) new DefaultCacheManager else new DefaultCacheManager(configFile.get)
+ addShutdownHook(new ShutdownHook(server, cacheManager))
+ server.start(host, port, cacheManager, masterThreads, workerThreads)
+ }
+
+ private def processCommandLine(args: Array[String]) {
+ programName = System.getProperty("program.name", "startServer")
+ var sopts = "-:hD:Vp:l:m:t:c:r:"
+ var lopts = Array(
+ new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h'),
+ new LongOpt("version", LongOpt.NO_ARGUMENT, null, 'V'),
+ new LongOpt("port", LongOpt.REQUIRED_ARGUMENT, null, 'p'),
+ new LongOpt("host", LongOpt.REQUIRED_ARGUMENT, null, 'l'),
+ new LongOpt("master_threads", LongOpt.REQUIRED_ARGUMENT, null, 'm'),
+ new LongOpt("worker_threads", LongOpt.REQUIRED_ARGUMENT, null, 't'),
+ new LongOpt("cache_config", LongOpt.REQUIRED_ARGUMENT, null, 'c'),
+ new LongOpt("protocol", LongOpt.REQUIRED_ARGUMENT, null, 'r'))
+ var getopt = new Getopt(programName, args, sopts, lopts)
+ var code: Int = 0
+ while ((({code = getopt.getopt; code})) != -1) {
+ code match {
+ case ':' | '?' => System.exit(1)
+ case 1 => System.err.println(programName + ": unused non-option argument: " + getopt.getOptarg)
+ case 'h' => showAndExit
+ case 'V' => {
+ Version.printFullVersionInformation
+ System.exit(0)
+ }
+ case 'p' => props.put(PROP_KEY_PORT, getopt.getOptarg)
+ case 'l' => props.put(PROP_KEY_HOST, getopt.getOptarg)
+ case 'm' => props.put(PROP_KEY_MASTER_THREADS, getopt.getOptarg)
+ case 't' => props.put(PROP_KEY_WORKER_THREADS, getopt.getOptarg)
+ case 'c' => props.put(PROP_KEY_CACHE_CONFIG, getopt.getOptarg)
+ case 'r' => props.put(PROP_KEY_PROTOCOL, getopt.getOptarg)
+ case 'D' => {
+ val arg = getopt.getOptarg
+ var name = ""
+ var value = ""
+ var i = arg.indexOf("=")
+ if (i == -1) {
+ name = arg
+ value = "true"
+ } else {
+ name = arg.substring(0, i)
+ value = arg.substring(i + 1, arg.length)
+ }
+ System.setProperty(name, value)
+ }
+ case _ => throw new Exception("unhandled option code: " + code)
+ }
+ }
+ }
+
+ private def addShutdownHook(shutdownHook: Thread): Unit = {
+ AccessController.doPrivileged(new PrivilegedAction[Void] {
+ override def run = {
+ Runtime.getRuntime.addShutdownHook(shutdownHook)
+ null
+ }
+ })
+ }
+
+ private def showAndExit {
+ println("usage: " + programName + " [options]")
+ println
+ println("options:")
+ println(" -h, --help Show this help message")
+ println(" -V, --version Show version information")
+ println(" -- Stop processing options")
+ println(" -p, --port=<num> TCP port number to listen on (default: 11211)")
+ println(" -l, --host=<host or ip> Interface to listen on (default: 127.0.0.1, localhost)")
+ println(" -m, --master_threads=<num> Number of threads accepting incoming connections (default: unlimited while resources are available)")
+ println(" -t, --worker_threads=<num> Number of threads processing incoming requests and sending responses (default: unlimited while resources are available)")
+ println(" -c, --cache_config=<filename> Cache configuration file (default: creates cache with default values)")
+ println(" -r, --protocol=[memcached|hotrod] Protocol to understand by the server. This is a mandatory option and you should choose one of the two options")
+ println(" -D<name>[=<value>] Set a system property")
+ println
+ System.exit(0)
+ }
+}
+
+private class ShutdownHook(server: ProtocolServer, cacheManager: CacheManager) extends Thread {
+ override def run {
+ if (server != null) {
+ System.out.println("Posting Shutdown Request to the server...")
+ var f = Executors.newSingleThreadExecutor.submit(new Callable[Void] {
+ override def call = {
+ server.stop
+ cacheManager.stop
+ null
+ }
+ })
+ try {
+ f.get
+ }
+ catch {
+ case ie: InterruptedException => Thread.currentThread.interrupt // Future.get was interrupted: restore the flag instead of clearing it
+ case e: ExecutionException => throw new RuntimeException("Exception encountered in shutting down the server", e)
+ }
+ }
+ }
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/Operation.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Operation.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Operation.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,14 @@
+package org.infinispan.server.core
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object Operation extends Enumeration {
+ type Operation = Value
+ val PutRequest, PutIfAbsentRequest, ReplaceRequest, ReplaceIfUnmodifiedRequest = Value
+ val GetRequest, GetWithVersionRequest = Value
+ val DeleteRequest, StatsRequest = Value
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,14 @@
+package org.infinispan.server.core
+
+import org.infinispan.manager.CacheManager
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+trait ProtocolServer {
+ def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int)
+ def stop
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/Value.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/Value.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/Value.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,32 @@
+package org.infinispan.server.core
+
+import org.infinispan.util.Util
+import java.io.{Serializable, ObjectOutput, ObjectInput, Externalizable}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+// TODO: Make it a hardcoded Externalizer
+class Value(val v: Array[Byte], val version: Long) extends Serializable {
+
+ override def toString = {
+ new StringBuilder().append("Value").append("{")
+ .append("v=").append(Util.printArray(v, false))
+ .append(", version=").append(version)
+ .append("}").toString
+ }
+
+// override def readExternal(in: ObjectInput) {
+// v = new Array[Byte](in.read())
+// in.read(v)
+// version = in.readLong
+// }
+//
+// override def writeExternal(out: ObjectOutput) {
+// out.write(v.length)
+// out.write(v)
+// out.writeLong(version)
+// }
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/VersionGenerator.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,34 @@
+package org.infinispan.server.core
+
+import org.infinispan.remoting.transport.Address
+import org.infinispan.Cache
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object VersionGenerator {
+
+ private val versionCounter = new AtomicInteger
+
+ def newVersion(address: Option[Address], members: Option[Iterable[Address]], viewId: Long): Long = {
+ val counter = versionCounter.incrementAndGet
+ var version: Long = counter
+ if (address != None && members != None) {
+ // todo: perf tip: cache rank and viewId as a volatile and recalculate with each view change
+ val rank: Long = findAddressRank(address.get, members.get, 1)
+ version = (rank << 32) | version
+ version = (viewId << 48) | version
+ }
+ version
+ }
+
+ private def findAddressRank(address: Address, members: Iterable[Address], rank: Int): Int = {
+ if (address.equals(members.head)) rank
+ else findAddressRank(address, members.tail, rank + 1)
+ }
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Channel.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Channel.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Channel.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,12 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class Channel {
+ def write(message: Any): ChannelFuture
+ def disconnect: ChannelFuture
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffer.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,42 @@
+package org.infinispan.server.core.transport
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ChannelBuffer {
+ def readByte: Byte
+ def readBytes(dst: Array[Byte], dstIndex: Int, length: Int)
+ def readUnsignedByte: Float
+ def readUnsignedInt: Int
+ def readUnsignedLong: Long
+ def readBytes(length: Int): ChannelBuffer
+ def readerIndex: Int
+ def readBytes(dst: Array[Byte]): Unit
+ def readRangedBytes: Array[Byte]
+ def readableBytes: Int
+
+ /**
+ * Reads length of String and then returns an UTF-8 formatted String of such length.
+ */
+ def readString: String
+ def writeByte(value: Byte)
+ def writeBytes(src: Array[Byte])
+
+ /**
+ * Writes the length of the byte array and transfers the specified source array's data to this buffer
+ */
+ def writeRangedBytes(src: Array[Byte])
+ def writeUnsignedInt(i: Int)
+ def writeUnsignedLong(l: Long)
+ def writerIndex: Int
+
+ /**
+ * Writes the length of the String followed by the String itself. This methods expects String not to be null.
+ */
+ def writeString(msg: String)
+
+ def getUnderlyingChannelBuffer: AnyRef
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelBuffers.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,14 @@
+package org.infinispan.server.core.transport
+
+/**
+ * Factory abstraction returning ChannelBuffer instances: wrappedBuffer takes byte arrays, dynamicBuffer takes no arguments.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ChannelBuffers {
+// def wrappedBuffer(buffers: ChannelBuffer*): ChannelBuffer
+// def wrappedBuffer(buffer: ChannelBuffer): ChannelBuffer
+ def wrappedBuffer(array: Array[Byte]*): ChannelBuffer // buffer built from the given byte array(s)
+ def dynamicBuffer(): ChannelBuffer // NOTE(review): presumably a buffer that grows on demand - confirm
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelFuture.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelFuture.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelFuture.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,23 @@
+package org.infinispan.server.core.transport
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * Abstraction over the asynchronous result of a channel operation: completion state, outcome setters and await variants.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ChannelFuture {
+ def getChannel: Channel
+ def isDone: Boolean
+ def isCancelled: Boolean
+ def setSuccess: Boolean
+ def setFailure(cause: Throwable): Boolean
+ def await: ChannelFuture
+ def awaitUninterruptibly: ChannelFuture
+ def await(timeout: Long, unit: TimeUnit): Boolean // NOTE(review): Boolean presumably reports completion within the timeout - confirm
+ def await(timeoutMillis: Long): Boolean
+ def awaitUninterruptibly(timeout: Long, unit: TimeUnit): Boolean
+ def awaitUninterruptibly(timeoutMillis: Long): Boolean
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ChannelHandlerContext.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,12 @@
+package org.infinispan.server.core.transport
+
+/**
+ * Context object passed to Decoder and Encoder calls, giving access to a Channel and to the ChannelBuffers factory.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ChannelHandlerContext {
+ def getChannel: Channel
+ def getChannelBuffers: ChannelBuffers
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,19 @@
+package org.infinispan.server.core.transport
+
+/**
+ * Decoder contract: decode receives a context and a buffer and returns an object; exceptionCaught receives error events.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class Decoder {
+
+ def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): AnyRef
+
+ def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)
+
+ def start // NOTE(review): presumably a lifecycle hook invoked before decoding begins - confirm against callers
+
+ def stop // NOTE(review): presumably the matching shutdown hook - confirm against callers
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Encoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Encoder.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Encoder.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,11 @@
+package org.infinispan.server.core.transport
+
+/**
+ * Encoder contract: encode receives the handler context, the channel and a message, and returns the encoded object.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class Encoder {
+ def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ExceptionEvent.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ExceptionEvent.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/ExceptionEvent.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,11 @@
+package org.infinispan.server.core.transport
+
+/**
+ * Event type received by Decoder.exceptionCaught; exposes the Throwable that caused it.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class ExceptionEvent {
+ def getCause: Throwable
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/NoState.java
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/NoState.java (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/NoState.java 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,32 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.server.core.transport;
+
+/**
+ * Enum that declares no constants; DecoderAdapter uses it as the state type parameter of its ReplayingDecoder.
+ *
+ * @author Galder Zamarreño
+ */
+public enum NoState {
+}
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Transport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Transport.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Transport.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,17 @@
+package org.infinispan.server.core.transport
+
+import java.net.SocketAddress
+
+/**
+ * Abstract transport declaring only the start and stop operations; NettyTransport is the implementation in this change.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+abstract class Transport {
+
+ def start
+
+ def stop
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelAdapter.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelAdapter.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,24 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel.{Channel => NettyChannel}
+import org.infinispan.server.core.transport.{ChannelBuffer, ChannelFuture, Channel}
+
+/**
+ * Channel implementation that forwards disconnect and write calls to a wrapped Netty channel.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ChannelAdapter(val ch: NettyChannel) extends Channel {
+  // Disconnects the wrapped channel and wraps the Netty future it returns
+  override def disconnect: ChannelFuture = new ChannelFutureAdapter(ch.disconnect())
+
+  // ChannelBuffer messages are unwrapped to their underlying buffer; anything else is written as is
+  override def write(message: Any): ChannelFuture = {
+    val payload = message match {
+      case wrapped: ChannelBuffer => wrapped.getUnderlyingChannelBuffer
+      case other => other
+    }
+    new ChannelFutureAdapter(ch.write(payload))
+  }
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,60 @@
+package org.infinispan.server.core.transport.netty
+
+import org.infinispan.server.core.transport.ChannelBuffer
+import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
+
+/**
+ * ChannelBuffer implementation backed by a Netty ChannelBuffer; several operations are still unimplemented TODO stubs.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ChannelBufferAdapter(val buffer: NettyChannelBuffer) extends ChannelBuffer {
+
+  override def readByte: Byte = buffer.readByte
+  override def readBytes(dst: Array[Byte], dstIndex: Int, length: Int) = buffer.readBytes(dst, dstIndex, length)
+  override def readUnsignedByte: Float = buffer.readUnsignedByte
+  override def readUnsignedInt: Int = { // TODO: not implemented yet, always returns 0
+    0
+  }
+  override def readUnsignedLong: Long = { // TODO: not implemented yet, always returns 0
+    0
+  }
+  override def readBytes(length: Int): ChannelBuffer = new ChannelBufferAdapter(buffer.readBytes(length))
+  override def readerIndex: Int = buffer.readerIndex // must delegate to the wrapped buffer: a bare readerIndex calls itself forever
+  override def readBytes(dst: Array[Byte]) = buffer.readBytes(dst)
+  override def readRangedBytes: Array[Byte] = { // TODO: not implemented yet, always returns null
+    null
+  }
+  override def readableBytes = buffer.writerIndex - buffer.readerIndex
+
+  /**
+   * Reads length of String and then returns an UTF-8 formatted String of such length. Not implemented yet.
+   */
+  override def readString: String = { // TODO: not implemented yet, always returns null
+    null
+  }
+  override def writeByte(value: Byte) = buffer.writeByte(value)
+  override def writeBytes(src: Array[Byte]) = buffer.writeBytes(src)
+
+  /**
+   * Writes the length of the byte array and transfers the specified source array's data to this buffer. Not implemented yet.
+   */
+  override def writeRangedBytes(src: Array[Byte]) { // TODO: not implemented yet, writes nothing
+    // intentionally empty until implemented
+  }
+  override def writeUnsignedInt(i: Int) { // TODO: not implemented yet, writes nothing
+  }
+  override def writeUnsignedLong(l: Long) { // TODO: not implemented yet, writes nothing
+  }
+  override def writerIndex: Int = buffer.writerIndex
+
+  /**
+   * Writes the length of the String followed by the String itself. This method expects the String not to be null. Not implemented yet.
+   */
+  override def writeString(msg: String) { // TODO: not implemented yet, writes nothing
+  }
+
+  override def getUnderlyingChannelBuffer: AnyRef = buffer
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBuffersAdapter.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,37 @@
+package org.infinispan.server.core.transport.netty
+
+import org.infinispan.server.core.transport.{ChannelBuffer, ChannelBuffers}
+import org.jboss.netty.buffer.{ChannelBuffers => NettyChannelBuffers}
+import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
+
+/**
+ * ChannelBuffers singleton that creates ChannelBufferAdapter instances through Netty's ChannelBuffers factory methods.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object ChannelBuffersAdapter extends ChannelBuffers {
+
+// override def wrappedBuffer(buffer: ChannelBuffer): ChannelBuffer = {
+//// val nettyBuffers = new Array[NettyChannelBuffer](buffers.length)
+//// val nettyBuffers = buffers.map(_.getUnderlyingChannelBuffer) : _*
+//// for (buffer <- buffers) {
+//// nettyBuffers + buffer.getUnderlyingChannelBuffer
+//// }
+// val nettyBuffer = buffer.getUnderlyingChannelBuffer.asInstanceOf[NettyChannelBuffer]
+// new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(nettyBuffer))
+// }
+
+// override def wrappedBuffer(array: Array[Byte]): ChannelBuffer = {
+// new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(array));
+// }
+
+  // Passes the byte arrays to Netty's wrappedBuffer and adapts the buffer it returns
+  override def wrappedBuffer(array: Array[Byte]*): ChannelBuffer =
+    new ChannelBufferAdapter(NettyChannelBuffers.wrappedBuffer(array: _*))
+
+  // Adapts the buffer returned by Netty's dynamicBuffer
+  override def dynamicBuffer(): ChannelBuffer =
+    new ChannelBufferAdapter(NettyChannelBuffers.dynamicBuffer())
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelFutureAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelFutureAdapter.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelFutureAdapter.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,43 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel.{ChannelFuture => NettyChannelFuture}
+import org.infinispan.server.core.transport.{Channel, ChannelFuture}
+import java.util.concurrent.TimeUnit
+
+/**
+ * ChannelFuture implementation that forwards every call to a wrapped Netty ChannelFuture.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ChannelFutureAdapter(val future: NettyChannelFuture) extends ChannelFuture {
+
+  // A new ChannelAdapter is created around the Netty channel on every call
+  override def getChannel: Channel = new ChannelAdapter(future.getChannel)
+
+  // State queries and outcome setters are passed straight through
+  override def isDone: Boolean = future.isDone()
+  override def isCancelled: Boolean = future.isCancelled()
+  override def setSuccess: Boolean = future.setSuccess()
+  override def setFailure(cause: Throwable): Boolean = future.setFailure(cause)
+
+  // The untimed waits return this adapter rather than the Netty future
+  override def await: ChannelFuture = {
+    future.await()
+    this
+  }
+
+  override def awaitUninterruptibly: ChannelFuture = {
+    future.awaitUninterruptibly()
+    this
+  }
+
+  // The timed waits return the Boolean reported by the Netty future
+  override def await(timeout: Long, unit: TimeUnit): Boolean = future.await(timeout, unit)
+  override def await(timeoutMillis: Long): Boolean = future.await(timeoutMillis)
+  override def awaitUninterruptibly(timeout: Long, unit: TimeUnit): Boolean =
+    future.awaitUninterruptibly(timeout, unit)
+  override def awaitUninterruptibly(timeoutMillis: Long): Boolean =
+    future.awaitUninterruptibly(timeoutMillis)
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelHandlerContextAdapter.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,18 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel.{ChannelHandlerContext => NettyChannelHandlerContext}
+import org.infinispan.server.core.transport.{ChannelBuffers, Channel, ChannelHandlerContext}
+
+/**
+ * ChannelHandlerContext implementation backed by a Netty ChannelHandlerContext.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ChannelHandlerContextAdapter(val ctx: NettyChannelHandlerContext) extends ChannelHandlerContext {
+  // Wraps the Netty context's channel in a fresh ChannelAdapter on each call
+  override def getChannel: Channel = new ChannelAdapter(ctx.getChannel())
+
+  // The buffer factory is always the ChannelBuffersAdapter singleton
+  override def getChannelBuffers: ChannelBuffers = ChannelBuffersAdapter
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,27 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder
+import org.jboss.netty.channel.{ExceptionEvent => NettyExceptionEvent, ChannelHandlerContext => NettyChannelHandlerContext, Channel => NettyChannel}
+import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
+import org.infinispan.server.core.transport._;
+
+/**
+ * Netty ReplayingDecoder that hands decoding and exception events to a wrapped Decoder through adapter objects.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class DecoderAdapter(decoder: Decoder) extends ReplayingDecoder[NoState](true) {
+  // Wraps the Netty context and buffer, then returns whatever the wrapped decoder produces
+  override def decode(nCtx: NettyChannelHandlerContext, channel: NettyChannel,
+                      nBuffer: NettyChannelBuffer, passedState: NoState): AnyRef =
+    decoder.decode(new ChannelHandlerContextAdapter(nCtx), new ChannelBufferAdapter(nBuffer))
+
+  // Exception events are adapted and forwarded to the wrapped decoder
+  override def exceptionCaught(ctx: NettyChannelHandlerContext, e: NettyExceptionEvent) {
+    val adaptedCtx = new ChannelHandlerContextAdapter(ctx)
+    val adaptedEvent = new ExceptionEventAdapter(e)
+    decoder.exceptionCaught(adaptedCtx, adaptedEvent)
+  }
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/EncoderAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/EncoderAdapter.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/EncoderAdapter.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,27 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel.ChannelHandler
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder
+import org.jboss.netty.channel.{ChannelHandlerContext => NettyChannelHandlerContext}
+import org.jboss.netty.channel.{Channel => NettyChannel}
+import org.infinispan.server.core.transport.{ChannelBuffer, Encoder}
+
+/**
+ * Netty OneToOneEncoder that delegates encoding to a wrapped Encoder.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+@ChannelHandler.Sharable
+class EncoderAdapter(encoder: Encoder) extends OneToOneEncoder {
+
+  // Runs the wrapped encoder with adapted context and channel. A ChannelBuffer result is
+  // unwrapped to its underlying buffer object; any other result is returned unchanged.
+  protected override def encode(nCtx: NettyChannelHandlerContext, ch: NettyChannel, msg: AnyRef): AnyRef = {
+    val encoded = encoder.encode(new ChannelHandlerContextAdapter(nCtx), new ChannelAdapter(ch), msg)
+    encoded match {
+      case cb: ChannelBuffer => cb.getUnderlyingChannelBuffer
+      case other => other
+    }
+  }
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ExceptionEventAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ExceptionEventAdapter.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ExceptionEventAdapter.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,16 @@
+package org.infinispan.server.core.transport.netty
+
+import org.infinispan.server.core.transport.ExceptionEvent
+import org.jboss.netty.channel.{ExceptionEvent => NettyExceptionEvent}
+
+/**
+ * ExceptionEvent implementation that exposes the cause of a wrapped Netty ExceptionEvent.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class ExceptionEventAdapter(event: NettyExceptionEvent) extends ExceptionEvent {
+  // Forwarded as is from the Netty event
+  override def getCause: Throwable = event.getCause()
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,25 @@
+package org.infinispan.server.core.transport.netty
+
+import org.jboss.netty.channel._
+
+/**
+ * ChannelPipelineFactory that builds pipelines containing the given decoder and encoder handlers.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class NettyChannelPipelineFactory(decoder: ChannelUpstreamHandler, encoder: ChannelDownstreamHandler)
+      extends ChannelPipelineFactory {
+
+  // The decoder (if any) is registered before the encoder (if any); a null handler is left out.
+  // NOTE(review): the same handler instances go into every pipeline built here - confirm they are safe to share.
+  override def getPipeline: ChannelPipeline = {
+    val handlers = Channels.pipeline
+    if (decoder != null)
+      handlers.addLast("decoder", decoder)
+    if (encoder != null)
+      handlers.addLast("encoder", encoder)
+    handlers
+  }
+
+}
\ No newline at end of file
Added: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala (rev 0)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,127 @@
+package org.infinispan.server.core.transport.netty
+
+import java.net.SocketAddress
+import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup}
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import java.util.concurrent.atomic.AtomicInteger
+import org.jboss.netty.channel.{ChannelUpstreamHandler, ChannelDownstreamHandler, ChannelFactory, ChannelPipelineFactory}
+import org.jboss.netty.bootstrap.ServerBootstrap
+import java.util.concurrent.{TimeUnit, Executors, ThreadFactory, ExecutorService}
+import org.infinispan.server.core.transport.Transport
+import org.infinispan.server.core.Logging
+import scala.collection.JavaConversions._
+
+/**
+ * Transport that serves a Netty NIO server socket bound to the configured address.
+ *
+ * The decoder and encoder handlers are installed in every channel pipeline through
+ * NettyChannelPipelineFactory. A masterThreads or workerThreads value of 0 selects an
+ * unbounded cached thread pool instead of a fixed-size one.
+ *
+ * NOTE(review): nothing in this class adds channels to acceptedChannels - confirm who does.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class NettyTransport(decoder: ChannelUpstreamHandler, encoder: ChannelDownstreamHandler,
+      address: SocketAddress, masterThreads: Int, workerThreads: Int, cacheName: String) extends Transport {
+  import NettyTransport._
+
+  val serverChannels = new DefaultChannelGroup(cacheName + "-channels")
+  val acceptedChannels = new DefaultChannelGroup(cacheName + "-accepted")
+  val pipeline = new NettyChannelPipelineFactory(decoder, encoder)
+  // masterExecutor and workerExecutor are lazy vals, so referencing them before their definitions is safe
+  val factory = {
+    if (workerThreads == 0)
+      new NioServerSocketChannelFactory(masterExecutor, workerExecutor)
+    else
+      new NioServerSocketChannelFactory(masterExecutor, workerExecutor, workerThreads)
+  }
+
+  // Executors are created on first use; their threads are named after the cache
+  lazy val masterExecutor = {
+    val tf = new NamedThreadFactory(cacheName + '-' + "Master")
+    if (masterThreads == 0) {
+      debug("Configured unlimited threads for master thread pool")
+      Executors.newCachedThreadPool(tf)
+    } else {
+      debug("Configured {0} threads for master thread pool", masterThreads)
+      Executors.newFixedThreadPool(masterThreads, tf)
+    }
+  }
+  lazy val workerExecutor = {
+    val tf = new NamedThreadFactory(cacheName + '-' + "Worker")
+    if (workerThreads == 0) {
+      debug("Configured unlimited threads for worker thread pool")
+      Executors.newCachedThreadPool(tf)
+    } else {
+      debug("Configured {0} threads for worker thread pool", workerThreads)
+      Executors.newFixedThreadPool(workerThreads, tf)
+    }
+  }
+
+  // Binds the server socket and tracks the bound channel so that stop can unbind and close it
+  override def start {
+    val bootstrap = new ServerBootstrap(factory)
+    bootstrap.setPipelineFactory(pipeline)
+    val ch = bootstrap.bind(address)
+    serverChannels.add(ch)
+  }
+
+  /**
+   * Unbinds the server channels, shuts the executors down, closes the server and accepted
+   * channel groups and finally releases the resources held by the channel factory.
+   */
+  override def stop {
+    // We *pause* the acceptor so no new connections are made
+    val unbindFuture = serverChannels.unbind().awaitUninterruptibly()
+    if (!unbindFuture.isCompleteSuccess()) {
+      warn("Server channel group did not completely unbind")
+      for (ch <- asIterator(unbindFuture.getGroup().iterator)) {
+        if (ch.isBound()) {
+          warn("{0} is still bound to {1}", ch, ch.getRemoteAddress())
+        }
+      }
+    }
+
+    // TODO remove workaround when integrating Netty 3.2.x - https://jira.jboss.org/jira/browse/NETTY-256
+    masterExecutor.shutdown()
+    try {
+      masterExecutor.awaitTermination(30, TimeUnit.SECONDS)
+    } catch {
+      case ie: InterruptedException =>
+        ie.printStackTrace()
+        // Restore the interrupt status so that callers of stop can still observe the interruption
+        Thread.currentThread().interrupt()
+    }
+
+    workerExecutor.shutdown()
+    serverChannels.close().awaitUninterruptibly()
+    val closeFuture = acceptedChannels.close().awaitUninterruptibly()
+    if (!closeFuture.isCompleteSuccess()) {
+      warn("Channel group did not completely close")
+      for (ch <- asIterator(closeFuture.getGroup().iterator)) {
+        // NOTE(review): the message says "connected" but the check is isBound - confirm which is intended
+        if (ch.isBound()) {
+          warn(ch + " is still connected to " + ch.getRemoteAddress())
+        }
+      }
+    }
+    debug("Channel group completely closed, release external resources")
+    factory.releaseExternalResources()
+  }
+
+}
+
+// Companion object: mixes in Logging; the class imports its members
+object NettyTransport extends Logging
+// Thread factory producing daemon threads named "<program.name system property>-<name>-<counter>"
+private class NamedThreadFactory(val name: String) extends ThreadFactory {
+  val threadCounter = new AtomicInteger
+  override def newThread(r: Runnable): Thread = {
+    val threadName = System.getProperty("program.name") + "-" + name + '-' + threadCounter.incrementAndGet
+    val thread = new Thread(r, threadName)
+    thread.setDaemon(true)
+    thread
+  }
+}
Added: trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala
===================================================================
--- trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala (rev 0)
+++ trunk/server/core/src/test/scala/org/infinispan/server/core/VersionGeneratorTest.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,35 @@
+package org.infinispan.server.core
+
+import org.testng.annotations.Test
+import org.infinispan.remoting.transport.Address
+import org.testng.Assert._
+import org.infinispan.server.core.VersionGenerator._
+
+/**
+ * Checks the version produced by VersionGenerator.newVersion for a fixed member list.
+ * @author Galder Zamarreño
+ * @since
+ */
+
+@Test(groups = Array("functional"), testName = "server.core.VersionGeneratorTest")
+class VersionGeneratorTest {
+  // addr3 carries the same number as addr1, so TestAddress.equals treats the two as equal
+  def testGenerateVersion {
+    val addr1 = new TestAddress(1)
+    val addr2 = new TestAddress(2)
+    val addr3 = new TestAddress(1)
+    val members = List(addr1, addr2, addr3)
+    assertEquals(newVersion(Some(addr2), Some(members), 1), 0x1000200000001L)
+  }
+}
+
+class TestAddress(val addressNum: Int) extends Address {
+  // Two test addresses are equal exactly when their numbers match
+  override def equals(o: Any): Boolean = o match {
+    case other: TestAddress => other.addressNum == addressNum
+    case _ => false
+  }
+  // Hash code and string form are both derived from the address number
+  override def hashCode = addressNum
+  override def toString = "TestAddress#" + addressNum
+}
\ No newline at end of file
Modified: trunk/server/memcached/pom.xml
===================================================================
--- trunk/server/memcached/pom.xml 2010-03-22 20:12:52 UTC (rev 1612)
+++ trunk/server/memcached/pom.xml 2010-03-23 17:59:55 UTC (rev 1613)
@@ -33,6 +33,42 @@
</dependency>
</dependencies>
+ <build>
+ <finalName>infinispan</finalName>
+ <sourceDirectory>src/main/scala</sourceDirectory> <!-- main sources are read from the Scala directory -->
+ <testSourceDirectory>src/test/scala</testSourceDirectory> <!-- test sources are read from the Scala directory -->
+
+ <plugins>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <executions>
+ <execution> <!-- compile goal bound to the compile phase -->
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution> <!-- testCompile goal bound to the test-compile phase -->
+ <id>test-compile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ <execution> <!-- NOTE(review): no id; binds the compile goal to process-resources as well - confirm both bindings are needed -->
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ </build>
+
<repositories>
<repository>
<id>spy</id>
Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,407 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.manager.{CacheManager}
+import org.infinispan.server.core.Operation._
+import org.infinispan.server.memcached.MemcachedOperation._
+import org.infinispan.context.Flag
+import java.util.concurrent.{TimeUnit, Executors, ScheduledExecutorService}
+import org.infinispan.{Version, CacheException, Cache}
+import java.io.{IOException, EOFException, StreamCorruptedException}
+import java.nio.channels.ClosedChannelException
+import java.util.concurrent.atomic.AtomicLong
+import org.infinispan.stats.Stats
+import org.infinispan.server.core.transport.{Channel, ChannelBuffers, ChannelHandlerContext, ChannelBuffer}
+import org.infinispan.server.core._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class MemcachedDecoder(cacheManager: CacheManager) extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
+ import MemcachedDecoder._
+
+ type SuitableParameters = MemcachedParameters
+
+ private var scheduler: ScheduledExecutorService = _
+ private var cache: Cache[String, MemcachedValue] = _
+ private lazy val isStatsEnabled = cache.getConfiguration.isExposeJmxStatistics
+ private final val incrMisses = new AtomicLong(0)
+ private final val incrHits = new AtomicLong(0)
+ private final val decrMisses = new AtomicLong(0)
+ private final val decrHits = new AtomicLong(0)
+ private final val replaceIfUnmodifiedMisses = new AtomicLong(0)
+ private final val replaceIfUnmodifiedHits = new AtomicLong(0)
+ private final val replaceIfUnmodifiedBadval = new AtomicLong(0)
+
+ /**
+  * Reads the operation name at the start of the request and resolves it
+  * through OperationResolver.
+  *
+  * When the name cannot be resolved, the remainder of the line is consumed
+  * before an UnknownOperationException is thrown.
+  */
+ override def readHeader(buffer: ChannelBuffer): RequestHeader = {
+    val streamOp = readElement(buffer)
+    OperationResolver.resolve(streamOp) match {
+       case Some(resolved) => new RequestHeader(resolved)
+       case None => {
+          readLine(buffer) // Read rest of line to clear the operation
+          throw new UnknownOperationException("Unknown operation: " + streamOp)
+       }
+    }
+ }
+
+ override def readKey(buffer: ChannelBuffer): String = {
+ readElement(buffer)
+ }
+
+ override def readKeys(buffer: ChannelBuffer): Array[String] = {
+ val line = readLine(buffer)
+ line.trim.split(" +")
+ }
+
+ /**
+  * Parses the rest of the current request line into a MemcachedParameters.
+  *
+  * Arguments expected per operation, as parsed below:
+  *  - delete:      [time] [noreply]
+  *  - incr / decr: delta [noreply]
+  *  - flush_all:   [delay] [noreply]
+  *  - anything else is parsed as a storage command:
+  *    flags exptime bytes [cas, only for ReplaceIfUnmodifiedRequest] [noreply],
+  *    followed by a data block of 'bytes' bytes and a line terminator.
+  *
+  * An empty line (e.g. "delete <key>" without further arguments) yields
+  * parameters with no data, noReply = false and a zero flush delay.
+  *
+  * @param op operation the parameters belong to
+  * @param buffer buffer positioned right after the key (or the operation name)
+  * @return the parsed parameters
+  */
+ override def readParameters(op: Enumeration#Value, buffer: ChannelBuffer): MemcachedParameters = {
+    val line = readLine(buffer)
+    if (!line.isEmpty) {
+       trace("Operation parameters: {0}", line)
+       val args = line.trim.split(" +")
+       var index = 0
+       op match {
+          case DeleteRequest => {
+             val delayedDeleteTime = parseDelayedDeleteTime(index, args)
+             // -1 means the first argument is not a number, so it has to be noreply itself.
+             // Otherwise noreply, when present, comes right after the time argument.
+             val noReply =
+                if (delayedDeleteTime == -1) parseNoReply(index, args)
+                else parseNoReply(index + 1, args)
+             new MemcachedParameters(null, -1, -1, -1, noReply, 0, "", 0)
+          }
+          case IncrementRequest | DecrementRequest => {
+             val delta = args(index)
+             index += 1
+             new MemcachedParameters(null, -1, -1, -1, parseNoReply(index, args), 0, delta, 0)
+          }
+          case FlushAllRequest => {
+             // The delay is optional: "flush_all noreply" must not be parsed as a number.
+             val flushDelay =
+                try { Some(args(index).toInt) }
+                catch { case e: NumberFormatException => None }
+             flushDelay match {
+                case Some(delay) =>
+                   new MemcachedParameters(null, -1, -1, -1, parseNoReply(index + 1, args), 0, "", delay)
+                case None =>
+                   new MemcachedParameters(null, -1, -1, -1, parseNoReply(index, args), 0, "", 0)
+             }
+          }
+          case _ => {
+             val flags = getFlags(args(index))
+             index += 1
+             val lifespan = {
+                val streamLifespan = getLifespan(args(index))
+                // Zero or negative expiry is passed on as -1
+                if (streamLifespan <= 0) -1 else streamLifespan
+             }
+             index += 1
+             val length = getLength(args(index))
+             val streamVersion = op match {
+                case ReplaceIfUnmodifiedRequest => {
+                   index += 1
+                   getVersion(args(index))
+                }
+                case _ => -1
+             }
+             index += 1
+             val noReply = parseNoReply(index, args)
+             val data = new Array[Byte](length)
+             buffer.readBytes(data, 0, data.length)
+             readLine(buffer) // read the rest of line to clear CRLF after value Byte[]
+             new MemcachedParameters(data, lifespan, -1, streamVersion, noReply, flags, "", 0)
+          }
+       }
+    } else {
+       // For example when delete <key> is sent without any further parameters
+       new MemcachedParameters(null, -1, -1, -1, false, 0, "", 0)
+    }
+ }
+
+ override def createValue(params: MemcachedParameters, nextVersion: Long): MemcachedValue = {
+ new MemcachedValue(params.data, nextVersion, params.flags)
+ }
+
+ private def getFlags(flags: String): Int = {
+ if (flags == null) throw new EOFException("No flags passed")
+ flags.toInt
+ }
+
+ private def getLifespan(lifespan: String): Int = {
+ if (lifespan == null) throw new EOFException("No expiry passed")
+ lifespan.toInt
+ }
+
+ private def getLength(length: String): Int = {
+ if (length == null) throw new EOFException("No bytes passed")
+ length.toInt
+ }
+
+ private def getVersion(version: String): Long = {
+ if (version == null) throw new EOFException("No cas passed")
+ version.toLong
+ }
+
+ /**
+  * Checks for the optional "noreply" argument at the given position.
+  *
+  * @return false when there is no argument at expectedIndex, true when it is "noreply"
+  * @throws StreamCorruptedException when an argument is present but is not "noreply"
+  */
+ private def parseNoReply(expectedIndex: Int, args: Array[String]): Boolean = {
+    if (args.length <= expectedIndex) false
+    else if ("noreply" == args(expectedIndex)) true
+    else throw new StreamCorruptedException("Unable to parse noreply optional argument")
+ }
+
+ /**
+  * Parses the optional numeric time argument at the given position.
+  *
+  * @return 0 when there is no argument at expectedIndex, the parsed number when
+  *         the argument is an integer, and -1 when it is not a number
+  */
+ private def parseDelayedDeleteTime(expectedIndex: Int, args: Array[String]): Int = {
+    if (args.length <= expectedIndex) 0
+    else {
+       try {
+          args(expectedIndex).toInt
+       } catch {
+          case e: NumberFormatException => -1 // Either unformatted number, or noreply found
+       }
+    }
+ }
+
+ override def getCache(header: RequestHeader): Cache[String, MemcachedValue] = cache
+
+ protected def createCache: Cache[String, MemcachedValue] = cacheManager.getCache[String, MemcachedValue]
+
+ /**
+  * Handles the operations declared in MemcachedOperation: append/prepend,
+  * incr/decr, flush_all and version.
+  *
+  * For the keyed operations the key and parameters are read from the buffer and the
+  * existing entry is swapped with cache.replace(k, prev, next), so a concurrent
+  * change of the entry is detected. A missing or unreplaceable entry is reported
+  * to sendResponse as Some(null) in the 'prev' position.
+  *
+  * @param header request header carrying the operation
+  * @param ctx channel context used to write the response
+  * @param buffer buffer positioned right after the operation name
+  * @param cache cache the operation is applied to
+  */
+ override def handleCustomRequest(header: RequestHeader, ctx: ChannelHandlerContext,
+       buffer: ChannelBuffer, cache: Cache[String, MemcachedValue]) {
+    header.op match {
+       case AppendRequest | PrependRequest => {
+          val k = readKey(buffer)
+          val params = readParameters(header.op, buffer)
+          val prev = cache.get(k)
+          if (prev != null) {
+             val concatenated = header.op match {
+                case AppendRequest => concat(prev.v, params.data)
+                case PrependRequest => concat(params.data, prev.v)
+             }
+             // append/prepend only touch the data portion: keep the flags of the stored
+             // item rather than the ones sent along with the request
+             val next = createValue(concatenated, generateVersion(cache), prev.flags)
+             val replaced = cache.replace(k, prev, next)
+             if (replaced)
+                sendResponse(header, ctx, None, None, Some(params), Some(prev))
+             else // If there's a concurrent modification on this key, treat it as we couldn't replace it
+                sendResponse(header, ctx, None, None, Some(params), Some(null))
+          } else {
+             sendResponse(header, ctx, None, None, Some(params), Some(null))
+          }
+       }
+       case IncrementRequest | DecrementRequest => {
+          val k = readKey(buffer)
+          val params = readParameters(header.op, buffer)
+          val prev = cache.get(k)
+          if (prev != null) {
+             // The counter is stored as the textual representation of a number
+             val prevCounter = new String(prev.v)
+             val newCounter =
+                header.op match {
+                   case IncrementRequest => prevCounter.toLong + params.delta.toLong
+                   case DecrementRequest => {
+                      // Decrementing never goes below zero
+                      val candidateCounter = prevCounter.toLong - params.delta.toLong
+                      if (candidateCounter < 0) 0 else candidateCounter
+                   }
+                }
+             // incr/decr carry no flags in the request, so keep the flags of the stored item
+             val next = createValue(newCounter.toString.getBytes, generateVersion(cache), prev.flags)
+             val replaced = cache.replace(k, prev, next)
+             if (replaced)
+                sendResponse(header, ctx, None, Some(next), Some(params), Some(prev))
+             else // If there's a concurrent modification on this key, the spec does not say what to do, so treat it as exceptional
+                throw new CacheException("Value modified since we retrieved from the cache, old value was " + prevCounter)
+          } else {
+             sendResponse(header, ctx, None, None, Some(params), Some(null))
+          }
+       }
+       case FlushAllRequest => {
+          val params = readParameters(header.op, buffer)
+          val flushFunction = (cache: Cache[String, MemcachedValue]) => cache.getAdvancedCache.withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_STORE).clear
+          // No delay: clear right away, otherwise hand the clear over to the scheduler
+          if (params.flushDelay == 0)
+             flushFunction(cache)
+          else
+             scheduler.schedule(new DelayedFlushAll(cache, flushFunction), params.flushDelay, TimeUnit.SECONDS)
+          sendResponse(header, ctx, None, None, Some(params), None)
+       }
+       case VersionRequest => sendResponse(header, ctx, None, None, None, None)
+//       case StatsRequest =>
+    }
+ }
+
+ // todo: potentially move this up when implementing hotrod since only what's written might change, the rest of logic is likely to be the same
+ override def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext,
+ k: Option[String], v: Option[MemcachedValue],
+ params: Option[MemcachedParameters], prev: Option[MemcachedValue]) {
+ val buffers = ctx.getChannelBuffers
+ val ch = ctx.getChannel
+ if (params == None || !params.get.noReply) {
+ header.op match {
+ case PutRequest => ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
+ case GetRequest | GetWithVersionRequest => {
+ if (v.get != null)
+ ch.write(buildGetResponse(header.op, ctx, k.get, v.get))
+ ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
+ }
+ case PutIfAbsentRequest => {
+ if (prev.get == null)
+ ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
+ else
+ ch.write(buffers.wrappedBuffer("NOT_STORED\r\n".getBytes))
+ }
+ case ReplaceRequest | AppendRequest | PrependRequest => {
+ if (prev.get == null)
+ ch.write(buffers.wrappedBuffer("NOT_STORED\r\n".getBytes))
+ else
+ ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
+ }
+ case ReplaceIfUnmodifiedRequest => {
+ if (v != None && prev != None) {
+ if (isStatsEnabled) replaceIfUnmodifiedHits.incrementAndGet
+ ch.write(buffers.wrappedBuffer("STORED\r\n".getBytes))
+ } else if (v == None && prev != None) {
+ if (isStatsEnabled) replaceIfUnmodifiedBadval.incrementAndGet
+ ch.write(buffers.wrappedBuffer("EXISTS\r\n".getBytes))
+ } else {
+ if (isStatsEnabled) replaceIfUnmodifiedMisses.incrementAndGet
+ ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
+ }
+ }
+ case DeleteRequest => {
+ if (prev.get == null)
+ ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
+ else
+ ch.write(buffers.wrappedBuffer("DELETED\r\n".getBytes))
+ }
+ case IncrementRequest | DecrementRequest => {
+ if (prev.get == null) {
+ if (isStatsEnabled) if (header.op == IncrementRequest) incrMisses.incrementAndGet() else decrMisses.incrementAndGet
+ ch.write(buffers.wrappedBuffer("NOT_FOUND\r\n".getBytes))
+ } else {
+ if (isStatsEnabled) if (header.op == IncrementRequest) incrHits.incrementAndGet() else decrHits.incrementAndGet
+ ch.write(buffers.wrappedBuffer((new String(v.get.v) + CRLF).getBytes))
+ }
+ }
+ case FlushAllRequest => {
+ ch.write(buffers.wrappedBuffer("OK\r\n".getBytes))
+ }
+ case VersionRequest => {
+ val sb = new StringBuilder
+ sb.append("VERSION ").append(Version.version).append(CRLF)
+ ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+ }
+ }
+ }
+ }
+
+ override def sendResponse(header: RequestHeader, ctx: ChannelHandlerContext, pairs: Map[String, MemcachedValue]) {
+ val buffers = ctx.getChannelBuffers
+ val ch = ctx.getChannel
+ header.op match {
+ case GetRequest | GetWithVersionRequest => {
+ for ((k, v) <- pairs)
+ ch.write(buildGetResponse(header.op, ctx, k, v))
+ ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
+ }
+ }
+ }
+
+ /**
+  * Writes the error reply for a failed request:
+  *  - UnknownOperationException: "ERROR"
+  *  - ClosedChannelException: nothing is written
+  *  - any other IOException: "CLIENT_ERROR " followed by the throwable's text
+  *  - everything else: "SERVER_ERROR " followed by the throwable's text
+  */
+ override def sendResponse(ctx: ChannelHandlerContext, t: Throwable) {
+    val ch = ctx.getChannel
+    val buffers = ctx.getChannelBuffers
+    t match {
+       case uoe: UnknownOperationException => ch.write(buffers.wrappedBuffer("ERROR\r\n".getBytes))
+       case cce: ClosedChannelException => // no-op, only log
+       case other => {
+          val prefix = other match {
+             case ioe: IOException => "CLIENT_ERROR "
+             case _ => "SERVER_ERROR "
+          }
+          val reply = new StringBuilder
+          reply.append(prefix).append(other).append(CRLF)
+          ch.write(buffers.wrappedBuffer(reply.toString.getBytes))
+       }
+    }
+ }
+
+ def sendResponse(ctx: ChannelHandlerContext, stats: Stats) {
+ var buffers = ctx.getChannelBuffers
+ var ch = ctx.getChannel
+ var sb = new StringBuilder
+
+ writeStat("pid", 0, sb, buffers, ch) // Unsupported
+ writeStat("uptime", stats.getTimeSinceStart, sb, buffers, ch)
+ writeStat("time", TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis), sb, buffers, ch)
+ writeStat("version", cache.getVersion, sb, buffers, ch)
+ writeStat("pointer_size", 0, sb, buffers, ch) // Unsupported
+ writeStat("rusage_user", 0, sb, buffers, ch) // Unsupported
+ writeStat("rusage_system", 0, sb, buffers, ch) // Unsupported
+ writeStat("curr_items", stats.getCurrentNumberOfEntries, sb, buffers, ch)
+ writeStat("total_items", stats.getTotalNumberOfEntries, sb, buffers, ch)
+ writeStat("bytes", 0, sb, buffers, ch) // Unsupported
+ writeStat("curr_connections", 0, sb, buffers, ch) // TODO: Through netty?
+ writeStat("total_connections", 0, sb, buffers, ch) // TODO: Through netty?
+ writeStat("connection_structures", 0, sb, buffers, ch) // Unsupported
+ writeStat("cmd_get", stats.getRetrievals, sb, buffers, ch)
+ writeStat("cmd_set", stats.getStores, sb, buffers, ch)
+ writeStat("get_hits", stats.getHits, sb, buffers, ch)
+ writeStat("get_misses", stats.getMisses, sb, buffers, ch)
+ writeStat("delete_misses", stats.getRemoveMisses, sb, buffers, ch)
+ writeStat("delete_hits", stats.getRemoveHits, sb, buffers, ch)
+ writeStat("incr_misses", incrMisses, sb, buffers, ch)
+ writeStat("incr_hits", incrHits, sb, buffers, ch)
+ writeStat("decr_misses", decrMisses, sb, buffers, ch)
+ writeStat("decr_hits", decrHits, sb, buffers, ch)
+ writeStat("cas_misses", replaceIfUnmodifiedMisses, sb, buffers, ch)
+ writeStat("cas_hits", replaceIfUnmodifiedHits, sb, buffers, ch)
+ writeStat("cas_badval", replaceIfUnmodifiedBadval, sb, buffers, ch)
+ writeStat("auth_cmds", 0, sb, buffers, ch) // Unsupported
+ writeStat("auth_errors", 0, sb, buffers, ch) // Unsupported
+ //TODO: Evictions are measure by evict calls, but not by nodes are that are expired after the entry's lifespan has expired.
+ writeStat("evictions", stats.getEvictions, sb, buffers, ch)
+ writeStat("bytes_read", 0, sb, buffers, ch) // TODO: Through netty?
+ writeStat("bytes_written", 0, sb, buffers, ch) // TODO: Through netty?
+ writeStat("limit_maxbytes", 0, sb, buffers, ch) // Unsupported
+ writeStat("threads", 0, sb, buffers, ch) // TODO: Through netty?
+ writeStat("conn_yields", 0, sb, buffers, ch) // Unsupported
+
+ ch.write(buffers.wrappedBuffer("END\r\n".getBytes))
+ }
+
+ private def writeStat(stat: String, value: Any, sb: StringBuilder, buffers: ChannelBuffers, ch: Channel) {
+ sb.append("STAT").append(' ').append(stat).append(' ').append(value).append(CRLF)
+ ch.write(buffers.wrappedBuffer(sb.toString.getBytes))
+ sb.setLength(0)
+ }
+
+ override def start {
+ scheduler = Executors.newScheduledThreadPool(1)
+ cache = createCache
+ }
+
+ override def stop {
+ scheduler.shutdown
+ }
+
+ private def createValue(data: Array[Byte], nextVersion: Long, flags: Int): MemcachedValue = {
+ new MemcachedValue(data, nextVersion, flags)
+ }
+
+ private def buildGetResponse(op: Enumeration#Value, ctx: ChannelHandlerContext,
+ k: String, v: MemcachedValue): ChannelBuffer = {
+ val header = buildGetResponseHeader(k, v, op)
+ ctx.getChannelBuffers.wrappedBuffer(header.getBytes, v.v, CRLF.getBytes)
+ }
+
+ /**
+  * Builds the header line of a get/gets reply:
+  * "VALUE <key> <flags> <bytes>" plus " <version>" for GetWithVersionRequest,
+  * terminated by CRLF.
+  *
+  * Fields are separated by a single space and no space is written before the
+  * line terminator.
+  */
+ private def buildGetResponseHeader(k: String, v: MemcachedValue, op: Enumeration#Value): String = {
+    val sb = new StringBuilder
+    sb.append("VALUE ").append(k).append(" ").append(v.flags).append(" ").append(v.v.length)
+    if (op == GetWithVersionRequest)
+       sb.append(" ").append(v.version)
+    sb.append(CRLF)
+    sb.toString
+ }
+
+}
+
+object MemcachedDecoder extends Logging
+
+class MemcachedParameters(override val data: Array[Byte], override val lifespan: Int,
+ override val maxIdle: Int, override val version: Long,
+ val noReply: Boolean, val flags: Int, val delta: String,
+ val flushDelay: Int) extends RequestParameters(data, lifespan, maxIdle, version)
+
+private class DelayedFlushAll(cache: Cache[String, MemcachedValue],
+ flushFunction: Cache[String, MemcachedValue] => Unit) extends Runnable {
+ override def run() = flushFunction(cache)
+}
+
+
Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedOperation.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedOperation.scala (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedOperation.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,16 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.Operation._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+object MemcachedOperation extends Enumeration(10) {
+ type MemcachedOperation = Value
+ val AppendRequest, PrependRequest = Value
+ val IncrementRequest, DecrementRequest = Value
+ val FlushAllRequest, VersionRequest = Value
+}
\ No newline at end of file
Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,19 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.AbstractProtocolServer
+import org.infinispan.server.core.transport.{Decoder, Encoder}
+import org.infinispan.manager.CacheManager
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+class MemcachedServer extends AbstractProtocolServer {
+
+ protected override def getEncoder: Encoder = null
+
+ protected override def getDecoder(cacheManager: CacheManager): Decoder = new MemcachedDecoder(cacheManager)
+
+}
\ No newline at end of file
Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedValue.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,40 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.Value
+import org.infinispan.util.Util
+import java.io.{ObjectOutput, ObjectInput}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+// TODO: Make it a hardcoded Externalizer
+class MemcachedValue(override val v: Array[Byte], override val version: Long, val flags: Int)
+ extends Value(v, version) {
+
+ override def toString = {
+ new StringBuilder().append("MemcachedValue").append("{")
+ .append("v=").append(Util.printArray(v, false))
+ .append(", version=").append(version)
+ .append(", flags=").append(flags)
+ .append("}").toString
+ }
+
+// override def readExternal(in: ObjectInput) {
+//// v = new Array[Byte](in.read())
+//// in.read(v)
+//// version = in.readLong
+// super.readExternal(in)
+// flags = in.readInt
+// }
+//
+// override def writeExternal(out: ObjectOutput) {
+// super.writeExternal(out)
+//// out.write(v.length)
+//// out.write(v)
+//// out.writeLong(version)
+// out.writeInt(flags)
+// }
+
+}
\ No newline at end of file
Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/OperationResolver.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,38 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.Operation._
+import org.infinispan.server.memcached.MemcachedOperation._
+import org.infinispan.server.core.Logging
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+// todo: maybe try to abstract this into something that can be shared betwen hr and memcached
+/**
+ * Maps the command names of the text protocol onto operation enumeration values.
+ */
+object OperationResolver extends Logging {
+   // TODO: Rather than holding a map, check if the String could be passed as part of the Enumeration and whether this could be retrieved in a O(1) op
+   private val operations = Map[String, Enumeration#Value](
+      "set" -> PutRequest,
+      "add" -> PutIfAbsentRequest,
+      "replace" -> ReplaceRequest,
+      "cas" -> ReplaceIfUnmodifiedRequest,
+      "append" -> AppendRequest,
+      "prepend" -> PrependRequest,
+      "get" -> GetRequest,
+      "gets" -> GetWithVersionRequest,
+      "delete" -> DeleteRequest,
+      "incr" -> IncrementRequest,
+      "decr" -> DecrementRequest,
+      "flush_all" -> FlushAllRequest,
+      "version" -> VersionRequest,
+      "stats" -> StatsRequest
+   )
+
+   /**
+    * Looks up the operation registered for the given command name.
+    *
+    * @return Some(operation), or None when the name is not in the table
+    */
+   def resolve(commandName: String): Option[Enumeration#Value] = {
+      trace("Operation: {0}", commandName)
+      operations.get(commandName)
+   }
+}
+
+
Added: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala (rev 0)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/TextProtocolUtil.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,77 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.server.core.transport.ChannelBuffer
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+// todo: refactor name once old code has been removed?
+trait TextProtocolUtil {
+
+ final val CRLF = "\r\n"
+ final val CR = 13
+ final val LF = 10
+
+ // TODO: maybe convert to recursive
+ /**
+  * Reads the next element, i.e. the bytes up to the next space or CRLF, and
+  * returns it trimmed. Returns an empty string when the buffer has no readable bytes.
+  */
+ def readElement(buffer: ChannelBuffer): String = {
+    if (buffer.readableBytes <= 0) ""
+    else readElement(buffer, new StringBuilder())
+ }
+
+ /**
+  * Accumulates bytes into sb until a space or a CRLF pair is consumed.
+  * A CR that is not followed by LF is dropped, while the byte after it is kept.
+  */
+ private def readElement(buffer: ChannelBuffer, sb: StringBuilder): String = {
+    var finished = false
+    while (!finished) {
+       val current = buffer.readByte
+       if (current == 32) { // Space
+          finished = true
+       } else if (current == CR) {
+          val following = buffer.readByte
+          if (following == LF)
+             finished = true
+          else
+             sb.append(following.asInstanceOf[Char])
+       } else {
+          sb.append(current.asInstanceOf[Char])
+       }
+    }
+    sb.toString.trim
+ }
+
+ /**
+  * Reads the bytes up to the next CRLF (or bare LF) and returns them trimmed.
+  * Returns an empty string when the buffer has no readable bytes.
+  */
+ def readLine(buffer: ChannelBuffer): String = {
+    if (buffer.readableBytes <= 0) ""
+    else readLine(buffer, new StringBuilder())
+ }
+
+ /**
+  * Accumulates bytes into sb until a CRLF pair or a single LF is consumed.
+  * A CR that is not followed by LF is dropped, while the byte after it is kept.
+  */
+ private def readLine(buffer: ChannelBuffer, sb: StringBuilder): String = {
+    var finished = false
+    while (!finished) {
+       val current = buffer.readByte
+       if (current == CR) {
+          val following = buffer.readByte
+          if (following == LF)
+             finished = true
+          else
+             sb.append(following.asInstanceOf[Char])
+       } else if (current == LF) {
+          finished = true
+       } else {
+          sb.append(current.asInstanceOf[Char])
+       }
+    }
+    sb.toString.trim
+ }
+
+ /**
+  * Returns a new array holding the bytes of a followed by the bytes of b.
+  * Neither input array is modified.
+  */
+ def concat(a: Array[Byte], b: Array[Byte]): Array[Byte] = {
+    val joined = new Array[Byte](a.length + b.length)
+    System.arraycopy(a, 0, joined, 0, a.length)
+    System.arraycopy(b, 0, joined, a.length, b.length)
+    joined
+ }
+
+}
\ No newline at end of file
Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,334 @@
+package org.infinispan.server.memcached
+
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import org.infinispan.manager.CacheManager
+import test.MemcachedTestingUtil
+import java.lang.reflect.Method
+import java.util.concurrent.TimeUnit
+import org.testng.Assert._
+import org.testng.annotations.{AfterClass, Test}
+import net.spy.memcached.{CASResponse, MemcachedClient}
+import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
+import java.net.SocketAddress
+import org.infinispan.Version
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+@Test(groups = Array("functional"), testName = "server.memcached.FunctionalTest")
+class MemcachedFunctionalTest extends SingleCacheManagerTest with MemcachedTestingUtil {
+ private var client: MemcachedClient = _
+ private var server: MemcachedServer = _
+ private val timeout: Int = 60
+
+ override def createCacheManager: CacheManager = {
+ cacheManager = TestCacheManagerFactory.createLocalCacheManager
+ server = startMemcachedTextServer(cacheManager)
+ client = createMemcachedClient(60000, server.getPort)
+ return cacheManager
+ }
+
+ @AfterClass(alwaysRun = true)
+ override def destroyAfterClass {
+ super.destroyAfterClass
+ log.debug("Test finished, close memcached server", null)
+ client.shutdown
+ server.stop
+ }
+
+ def testSetBasic(m: Method) {
+ val f = client.set(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m))
+ }
+
+ def testSetWithExpirySeconds(m: Method) {
+ val f = client.set(k(m), 1, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ TestingUtil.sleepThread(1100)
+ assertNull(client.get(k(m)))
+ }
+
+ def testSetWithExpiryUnixTime(m: Method) {
+ val future = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis + 1000).asInstanceOf[Int]
+ val f = client.set(k(m), future, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ TestingUtil.sleepThread(1100)
+ assertNull(client.get(k(m)))
+ }
+
+ def testGetMultipleKeys(m: Method) {
+ val f1 = client.set(k(m, "k1-"), 0, v(m, "v1-"))
+ val f2 = client.set(k(m, "k2-"), 0, v(m, "v2-"))
+ val f3 = client.set(k(m, "k3-"), 0, v(m, "v3-"))
+ assertTrue(f1.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertTrue(f2.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertTrue(f3.get(timeout, TimeUnit.SECONDS).booleanValue)
+ val keys = List(k(m, "k1-"), k(m, "k2-"), k(m, "k3-"))
+ val ret = client.getBulk(keys: _*)
+ assertEquals(ret.get(k(m, "k1-")), v(m, "v1-"))
+ assertEquals(ret.get(k(m, "k2-")), v(m, "v2-"))
+ assertEquals(ret.get(k(m, "k3-")), v(m, "v3-"))
+ }
+
+ def testAddBasic(m: Method) {
+ addAndGet(m)
+ }
+
+ def testAddWithExpirySeconds(m: Method) {
+ var f = client.add(k(m), 1, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ TestingUtil.sleepThread(1100)
+ assertNull(client.get(k(m)))
+ f = client.add(k(m), 0, v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m, "v1-"))
+ }
+
+ def testAddWithExpiryUnixTime(m: Method) {
+ val future = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis + 1000).asInstanceOf[Int]
+ var f = client.add(k(m), future, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ TestingUtil.sleepThread(1100)
+ assertNull(client.get(k(m)))
+ f = client.add(k(m), 0, v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m, "v1-"))
+ }
+
+ def testNotAddIfPresent(m: Method) {
+ addAndGet(m)
+ val f = client.add(k(m), 0, v(m, "v1-"))
+ assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m))
+ }
+
+ def testReplaceBasic(m: Method) {
+ addAndGet(m)
+ val f = client.replace(k(m), 0, v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m, "v1-"))
+ }
+
+ def testNotReplaceIfNotPresent(m: Method) {
+ val f = client.replace(k(m), 0, v(m))
+ assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertNull(client.get(k(m)))
+ }
+
+ def testReplaceWithExpirySeconds(m: Method) {
+ addAndGet(m)
+ val f = client.replace(k(m), 1, v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m, "v1-"))
+ TestingUtil.sleepThread(1100)
+ assertNull(client.get(k(m)))
+ }
+
+ def testReplaceWithExpiryUnixTime(m: Method) {
+ addAndGet(m)
+ val future: Int = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis + 1000).asInstanceOf[Int]
+ val f = client.replace(k(m), future, v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m, "v1-"))
+ TestingUtil.sleepThread(1100)
+ assertNull(client.get(k(m)))
+ }
+
+ def testAppendBasic(m: Method) {
+ addAndGet(m)
+ val f = client.append(0, k(m), v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ val expected = v(m) + v(m, "v1-")
+ assertEquals(client.get(k(m)), expected)
+ }
+
+ def testAppendNotFound(m: Method) {
+ addAndGet(m)
+ val f = client.append(0, k(m, "k2-"), v(m, "v1-"))
+ assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m))
+ assertNull(client.get(k(m, "k2-")))
+ }
+
+ def testPrependBasic(m: Method) {
+ addAndGet(m)
+ val f = client.prepend(0, k(m), v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ val expected = v(m, "v1-") + v(m)
+ assertEquals(client.get(k(m)), expected)
+ }
+
+ def testPrependNotFound(m: Method) {
+ addAndGet(m)
+ val f = client.prepend(0, k(m, "k2-"), v(m, "v1-"))
+ assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m))
+ assertNull(client.get(k(m, "k2-")))
+ }
+
+ def testGetsBasic(m: Method) {
+ addAndGet(m)
+ var value = client.gets(k(m))
+ assertEquals(value.getValue(), v(m))
+ assertTrue(value.getCas() != 0)
+ }
+
+ def testCasBasic(m: Method) {
+ addAndGet(m)
+ var value = client.gets(k(m))
+ assertEquals(value.getValue(), v(m))
+ assertTrue(value.getCas() != 0)
+ var resp = client.cas(k(m), value.getCas, v(m, "v1-"))
+ assertEquals(resp, CASResponse.OK)
+ }
+
+ def testCasNotFound(m: Method) {
+ addAndGet(m)
+ var value = client.gets(k(m))
+ assertEquals(value.getValue(), v(m))
+ assertTrue(value.getCas() != 0)
+ var resp = client.cas(k(m, "k1-"), value.getCas, v(m, "v1-"))
+ assertEquals(resp, CASResponse.NOT_FOUND)
+ }
+
+ def testCasExists(m: Method) {
+ addAndGet(m)
+ var value = client.gets(k(m))
+ assertEquals(value.getValue(), v(m))
+ assertTrue(value.getCas() != 0)
+ val old = value.getCas
+ var resp = client.cas(k(m), value.getCas, v(m, "v1-"))
+ value = client.gets(k(m))
+ assertEquals(value.getValue(), v(m, "v1-"))
+ assertTrue(value.getCas() != 0)
+ assertTrue(value.getCas() != old)
+ resp = client.cas(k(m), old, v(m, "v2-"))
+ assertEquals(resp, CASResponse.EXISTS)
+ resp = client.cas(k(m), value.getCas, v(m, "v2-"))
+ assertEquals(resp, CASResponse.OK)
+ }
+
+ def testDeleteBasic(m: Method) {
+ addAndGet(m)
+ val f = client.delete(k(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertNull(client.get(k(m)))
+ }
+
+ def testDeleteDoesNotExist(m: Method) {
+ val f = client.delete(k(m))
+ assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ }
+
+ def testIncrementBasic(m: Method) {
+ val f = client.set(k(m), 0, "1")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ val result = client.incr(k(m), 1)
+ assertEquals(result, 2)
+ }
+
+ def testIncrementTriple(m: Method) {
+ val f = client.set(k(m), 0, "1")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.incr(k(m), 1), 2)
+ assertEquals(client.incr(k(m), 2), 4)
+ assertEquals(client.incr(k(m), 4), 8)
+ }
+
+ def testIncrementNotExist(m: Method) {
+ assertEquals(client.incr(k(m), 1), -1)
+ }
+
+ def testIncrementIntegerMax(m: Method) {
+ val f = client.set(k(m), 0, "0")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.incr(k(m), Integer.MAX_VALUE), Integer.MAX_VALUE)
+ }
+
+ def testIncrementBeyondIntegerMax(m: Method) {
+ val f = client.set(k(m), 0, "1")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ val newValue = client.incr(k(m), Integer.MAX_VALUE)
+ assertEquals(newValue, Int.MaxValue.asInstanceOf[Long] + 1)
+ }
+
+ def testDecrementBasic(m: Method) {
+ var f = client.set(k(m), 0, "1")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.decr(k(m), 1), 0)
+ }
+
+ def testDecrementTriple(m: Method) {
+ var f = client.set(k(m), 0, "8")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.decr(k(m), 1), 7)
+ assertEquals(client.decr(k(m), 2), 5)
+ assertEquals(client.decr(k(m), 4), 1)
+ }
+
+ def testDecrementNotExist(m: Method): Unit = {
+ assertEquals(client.decr(k(m), 1), -1)
+ }
+
+ def testDecrementBelowZero(m: Method) {
+ var f = client.set(k(m), 0, "1")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ var newValue = client.decr(k(m), 2)
+ assertEquals(newValue, 0)
+ }
+
+ def testFlushAll(m: Method) {
+ for (i <- 1 to 5) {
+ val key = k(m, "k" + i + "-");
+ val value = v(m, "v" + i + "-");
+ val f = client.set(key, 0, value);
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(key), value)
+ }
+
+ val f = client.flush();
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+
+ for (i <- 1 to 5) {
+ val key = k(m, "k" + i + "-");
+ assertNull(client.get(key))
+ }
+ }
+
+ def testFlushAllDelayed(m: Method) {
+ for (i <- 1 to 5) {
+ val key = k(m, "k" + i + "-");
+ val value = v(m, "v" + i + "-");
+ val f = client.set(key, 0, value);
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(key), value)
+ }
+
+ val f = client.flush(2);
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+
+ TestingUtil.sleepThread(2200);
+
+ for (i <- 1 to 5) {
+ val key = k(m, "k" + i + "-");
+ assertNull(client.get(key))
+ }
+ }
+
+ def testVersion {
+ val versions = client.getVersions
+ assertEquals(versions.size(), 1)
+ val version = versions.values.iterator.next
+ assertEquals(version, Version.version)
+ }
+
+ private def addAndGet(m: Method) {
+ val f = client.add(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m))
+ }
+
+}
\ No newline at end of file
Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,142 @@
+package org.infinispan.server.memcached
+
+import org.testng.Assert._
+import org.infinispan.test.MultipleCacheManagersTest
+import test.MemcachedTestingUtil
+import org.infinispan.config.Configuration
+import org.testng.annotations.{AfterClass, Test}
+import java.util.concurrent.TimeUnit
+import java.lang.reflect.Method
+import net.spy.memcached.{CASResponse, MemcachedClient}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+ at Test(groups = Array("functional"), testName = "server.memcached.MemcachedClusterTest")
+class MemcachedReplicationTest extends MultipleCacheManagersTest with MemcachedTestingUtil {
+ private val cacheName = "MemcachedReplSync"
+ private[this] var servers: List[MemcachedServer] = List()
+ private[this] var clients: List[MemcachedClient] = List()
+ private val timeout: Int = 60
+
+ @Test(enabled = false) // Disable explicitly to avoid TestNG thinking this is a test!!
+ override def createCacheManagers {
+ var replSync = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC)
+ createClusteredCaches(2, cacheName, replSync)
+ servers = startMemcachedTextServer(cacheManagers.get(0), cacheName) :: servers
+ servers = startMemcachedTextServer(cacheManagers.get(1), servers.head.getPort + 50, cacheName) :: servers
+ servers.foreach(s => clients = createMemcachedClient(60000, s.getPort) :: clients)
+ }
+
+ @AfterClass(alwaysRun = true)
+ override def destroy {
+ super.destroy
+ log.debug("Test finished, close Hot Rod server", null)
+ clients.foreach(_.shutdown)
+ servers.foreach(_.stop)
+ }
+
+ def testReplicatedSet(m: Method) {
+ val f = clients.head.set(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(clients.tail.head.get(k(m)), v(m))
+ }
+
+ def testReplicatedGetMultipleKeys(m: Method) {
+ val f1 = clients.head.set(k(m, "k1-"), 0, v(m, "v1-"))
+ val f2 = clients.head.set(k(m, "k2-"), 0, v(m, "v2-"))
+ val f3 = clients.head.set(k(m, "k3-"), 0, v(m, "v3-"))
+ assertTrue(f1.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertTrue(f2.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertTrue(f3.get(timeout, TimeUnit.SECONDS).booleanValue)
+ val keys = List(k(m, "k1-"), k(m, "k2-"), k(m, "k3-"))
+ val ret = clients.tail.head.getBulk(keys: _*)
+ assertEquals(ret.get(k(m, "k1-")), v(m, "v1-"))
+ assertEquals(ret.get(k(m, "k2-")), v(m, "v2-"))
+ assertEquals(ret.get(k(m, "k3-")), v(m, "v3-"))
+ }
+
+ def testReplicatedAdd(m: Method) {
+ val f = clients.head.add(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(clients.tail.head.get(k(m)), v(m))
+ }
+
+ def testReplicatedReplace(m: Method) {
+ var f = clients.head.add(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(clients.tail.head.get(k(m)), v(m))
+ f = clients.tail.head.replace(k(m), 0, v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(clients.head.get(k(m)), v(m, "v1-"))
+ }
+
+ def testReplicatedAppend(m: Method) {
+ var f = clients.head.add(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(clients.tail.head.get(k(m)), v(m))
+ f = clients.tail.head.append(0, k(m), v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ val expected = v(m).toString + v(m, "v1-").toString
+ assertEquals(clients.head.get(k(m)), expected)
+ }
+
+ def testReplicatedPrepend(m: Method) {
+ var f = clients.head.add(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(clients.tail.head.get(k(m)), v(m))
+ f = clients.tail.head.prepend(0, k(m), v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ val expected = v(m, "v1-").toString + v(m).toString
+ assertEquals(clients.head.get(k(m)), expected)
+ }
+
+ def testReplicatedGets(m: Method) {
+ var f = clients.head.set(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ var value = clients.tail.head.gets(k(m))
+ assertEquals(value.getValue(), v(m))
+ assertTrue(value.getCas() != 0)
+ }
+
+ def testReplicatedCasExists(m: Method) {
+ var f = clients.head.set(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ var value = clients.tail.head.gets(k(m))
+ assertEquals(value.getValue(), v(m))
+ assertTrue(value.getCas() != 0)
+ val old = value.getCas
+ var resp = clients.tail.head.cas(k(m), value.getCas, v(m, "v1-"))
+ value = clients.head.gets(k(m))
+ assertEquals(value.getValue(), v(m, "v1-"))
+ assertTrue(value.getCas() != 0)
+ assertTrue(value.getCas() != old)
+ resp = clients.head.cas(k(m), old, v(m, "v2-"))
+ assertEquals(resp, CASResponse.EXISTS)
+ resp = clients.tail.head.cas(k(m), value.getCas, v(m, "v2-"))
+ assertEquals(resp, CASResponse.OK)
+ }
+
+ def testReplicatedDelete(m: Method) {
+ var f = clients.head.set(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ f = clients.tail.head.delete(k(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ }
+
+ def testReplicatedIncrement(m: Method) {
+ var f = clients.head.set(k(m), 0, "1")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(clients.tail.head.incr(k(m), 1), 2)
+ }
+
+ def testReplicatedDecrement(m: Method): Unit = {
+ val f = clients.head.set(k(m), 0, "1")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(clients.tail.head.decr(k(m), 1), 0)
+ }
+
+}
\ No newline at end of file
Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,196 @@
+package org.infinispan.server.memcached
+
+import test.MemcachedTestingUtil
+import org.testng.annotations.{AfterClass, Test}
+import org.infinispan.manager.CacheManager
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import net.spy.memcached.MemcachedClient
+import java.lang.reflect.Method
+import org.testng.Assert._
+import java.util.concurrent.TimeUnit
+import org.infinispan.Version
+import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+ at Test(groups = Array("functional"), testName = "server.memcached.StatsTest")
+class MemcachedStatsTest extends SingleCacheManagerTest with MemcachedTestingUtil {
+ private var jmxDomain = classOf[MemcachedStatsTest].getSimpleName
+ private var client: MemcachedClient = _
+ private var server: MemcachedServer = _
+ private var timeout: Int = 60
+
+ override def createCacheManager: CacheManager = {
+ cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
+ server = startMemcachedTextServer(cacheManager)
+ client = createMemcachedClient(60000, server.getPort)
+ return cacheManager
+ }
+
+ @AfterClass(alwaysRun = true)
+ override def destroyAfterClass {
+ super.destroyAfterClass
+ log.debug("Test finished, close memcached server", null)
+ client.shutdown
+ server.stop
+ }
+
+ def testUnsupportedStats(m: Method) {
+ val stats = getStats
+ assertEquals(stats.get("pid"), "0")
+ assertEquals(stats.get("pointer_size"), "0")
+ assertEquals(stats.get("rusage_user"), "0")
+ assertEquals(stats.get("rusage_system"), "0")
+ assertEquals(stats.get("bytes"), "0")
+ assertEquals(stats.get("connection_structures"), "0")
+ assertEquals(stats.get("auth_cmds"), "0")
+ assertEquals(stats.get("auth_errors"), "0")
+ assertEquals(stats.get("limit_maxbytes"), "0")
+ assertEquals(stats.get("conn_yields"), "0")
+ }
+
+ def testUncomparableStats(m: Method) {
+ TestingUtil.sleepThread(TimeUnit.SECONDS.toMillis(1))
+ val stats = getStats
+ assertNotSame(stats.get("uptime"), "0")
+ assertNotSame(stats.get("time"), "0")
+ assertNotSame(stats.get("uptime"), stats.get("time"))
+ }
+
+ def testStaticStats(m: Method) {
+ val stats = getStats
+ assertEquals(stats.get("version"), Version.version)
+ }
+
+ def testTodoStats: Unit = {
+ val stats = getStats
+ assertEquals(stats.get("curr_connections"), "0")
+ assertEquals(stats.get("total_connections"), "0")
+ assertEquals(stats.get("bytes_read"), "0")
+ assertEquals(stats.get("bytes_written"), "0")
+ assertEquals(stats.get("threads"), "0")
+ }
+
+
+ def testStats(m: Method): Unit = {
+ var stats = getStats
+ assertEquals(stats.get("cmd_set"), "0")
+ assertEquals(stats.get("cmd_get"), "0")
+ assertEquals(stats.get("get_hits"), "0")
+ assertEquals(stats.get("get_misses"), "0")
+ assertEquals(stats.get("delete_hits"), "0")
+ assertEquals(stats.get("delete_misses"), "0")
+ assertEquals(stats.get("curr_items"), "0")
+ assertEquals(stats.get("total_items"), "0")
+ assertEquals(stats.get("incr_misses"), "0")
+ assertEquals(stats.get("incr_hits"), "0")
+ assertEquals(stats.get("decr_misses"), "0")
+ assertEquals(stats.get("decr_hits"), "0")
+ assertEquals(stats.get("cas_misses"), "0")
+ assertEquals(stats.get("cas_hits"), "0")
+ assertEquals(stats.get("cas_badval"), "0")
+
+ var f = client.set(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m))
+ f = client.set(k(m, "k1-"), 0, v(m, "v1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m, "k1-")), v(m, "v1-"))
+ stats = getStats
+ assertEquals(stats.get("cmd_set"), "2")
+ assertEquals(stats.get("cmd_get"), "2")
+ assertEquals(stats.get("get_hits"), "2")
+ assertEquals(stats.get("get_misses"), "0")
+ assertEquals(stats.get("delete_hits"), "0")
+ assertEquals(stats.get("delete_misses"), "0")
+ assertEquals(stats.get("curr_items"), "2")
+ assertEquals(stats.get("total_items"), "2")
+
+ f = client.delete(k(m, "k1-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ stats = getStats
+ assertEquals(stats.get("curr_items"), "1")
+ assertEquals(stats.get("total_items"), "2")
+ assertEquals(stats.get("delete_hits"), "1")
+ assertEquals(stats.get("delete_misses"), "0")
+
+ assertNull(client.get(k(m, "k99-")))
+ stats = getStats
+ assertEquals(stats.get("get_hits"), "2")
+ assertEquals(stats.get("get_misses"), "1")
+
+ f = client.delete(k(m, "k99-"))
+ assertFalse(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ stats = getStats
+ assertEquals(stats.get("delete_hits"), "1")
+ assertEquals(stats.get("delete_misses"), "1")
+
+ val future = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis + 1000).asInstanceOf[Int]
+ f = client.set(k(m, "k3-"), future, v(m, "v3-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ TestingUtil.sleepThread(1100)
+ assertNull(client.get(k(m, "k3-")))
+ stats = getStats
+ assertEquals(stats.get("curr_items"), "1")
+ assertEquals(stats.get("total_items"), "3")
+
+ client.incr(k(m, "k4-"), 1)
+ stats = getStats
+ assertEquals(stats.get("incr_misses"), "1")
+ assertEquals(stats.get("incr_hits"), "0")
+
+ f = client.set(k(m, "k4-"), 0, "1")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ client.incr(k(m, "k4-"), 1)
+ client.incr(k(m, "k4-"), 2)
+ client.incr(k(m, "k4-"), 4)
+ stats = getStats
+ assertEquals(stats.get("incr_misses"), "1")
+ assertEquals(stats.get("incr_hits"), "3")
+
+ client.decr(k(m, "k5-"), 1)
+ stats = getStats
+ assertEquals(stats.get("decr_misses"), "1")
+ assertEquals(stats.get("decr_hits"), "0")
+
+ f = client.set(k(m, "k5-"), 0, "8")
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ client.decr(k(m, "k5-"), 1)
+ client.decr(k(m, "k5-"), 2)
+ client.decr(k(m, "k5-"), 4)
+ stats = getStats
+ assertEquals(stats.get("decr_misses"), "1")
+ assertEquals(stats.get("decr_hits"), "3")
+
+ client.cas(k(m, "k6-"), 1234, v(m, "v6-"))
+ stats = getStats
+ assertEquals(stats.get("cas_misses"), "1")
+ assertEquals(stats.get("cas_hits"), "0")
+ assertEquals(stats.get("cas_badval"), "0")
+
+ f = client.set(k(m, "k6-"), 0, v(m, "v6-"))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ var value = client.gets(k(m, "k6-"))
+ var old: Long = value.getCas
+ client.cas(k(m, "k6-"), value.getCas, v(m, "v66-"))
+ stats = getStats
+ assertEquals(stats.get("cas_misses"), "1")
+ assertEquals(stats.get("cas_hits"), "1")
+ assertEquals(stats.get("cas_badval"), "0")
+ client.cas(k(m, "k6-"), old, v(m, "v66-"))
+ stats = getStats
+ assertEquals(stats.get("cas_misses"), "1")
+ assertEquals(stats.get("cas_hits"), "1")
+ assertEquals(stats.get("cas_badval"), "1")
+ }
+
+ private def getStats() = {
+ val stats = client.getStats()
+ assertEquals(stats.size(), 1)
+ stats.values.iterator.next
+ }
+
+}
\ No newline at end of file
Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala 2010-03-23 17:59:55 UTC (rev 1613)
@@ -0,0 +1,90 @@
+package org.infinispan.server.memcached.test
+
+import java.lang.reflect.Method
+import net.spy.memcached.{DefaultConnectionFactory, MemcachedClient}
+import java.util.Arrays
+import java.net.InetSocketAddress
+import org.infinispan.Cache
+import java.util.concurrent.atomic.AtomicInteger
+import org.infinispan.manager.CacheManager
+import org.infinispan.server.core.transport.Decoder
+import org.infinispan.server.memcached.{MemcachedDecoder, MemcachedValue, MemcachedServer}
+import org.infinispan.server.core.RequestHeader
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since
+ */
+
+trait MemcachedTestingUtil {
+ def host = "127.0.0.1"
+
+// def k(m: Method, prefix: String): Array[Byte] = {
+// val bytes: Array[Byte] = (prefix + m.getName).getBytes
+// trace("String {0} is converted to {1} bytes", prefix + m.getName, bytes)
+// bytes
+// }
+//
+// def v(m: Method, prefix: String): Array[Byte] = k(m, prefix)
+//
+// def k(m: Method): Array[Byte] = k(m, "k-")
+//
+// def v(m: Method): Array[Byte] = v(m, "v-")
+
+ def k(m: Method, prefix: String): String = prefix + m.getName
+
+ def v(m: Method, prefix: String): String = prefix + m.getName
+
+ def k(m: Method): String = k(m, "k-")
+
+ def v(m: Method): String = v(m, "v-")
+
+ def createMemcachedClient(timeout: Long, port: Int): MemcachedClient = {
+ var d: DefaultConnectionFactory = new DefaultConnectionFactory {
+ override def getOperationTimeout: Long = timeout
+ }
+ return new MemcachedClient(d, Arrays.asList(new InetSocketAddress(host, port)))
+ }
+
+ def startMemcachedTextServer(cacheManager: CacheManager): MemcachedServer = {
+ startMemcachedTextServer(cacheManager, UniquePortThreadLocal.get.intValue)
+ }
+
+ def startMemcachedTextServer(cacheManager: CacheManager, port: Int): MemcachedServer = {
+ val server = new MemcachedServer
+ server.start(host, port, cacheManager, 0, 0)
+ server
+ }
+
+ def startMemcachedTextServer(cacheManager: CacheManager, cacheName: String): MemcachedServer = {
+ startMemcachedTextServer(cacheManager, UniquePortThreadLocal.get.intValue, cacheName)
+// val server = new MemcachedServer {
+// protected override def getDecoder(cacheManager: CacheManager): Decoder = {
+// new MemcachedDecoder(cacheManager) {
+// override def getCache(header: RequestHeader) = cacheManager.getCache[String, MemcachedValue](cacheName)
+// }
+// }
+// }
+// server.start(host, UniquePortThreadLocal.get.intValue, cacheManager, 0, 0)
+// server
+ }
+
+ def startMemcachedTextServer(cacheManager: CacheManager, port: Int, cacheName: String): MemcachedServer = {
+ val server = new MemcachedServer {
+ protected override def getDecoder(cacheManager: CacheManager): Decoder = {
+ new MemcachedDecoder(cacheManager) {
+ override def createCache = cacheManager.getCache[String, MemcachedValue](cacheName)
+ }
+ }
+ }
+ server.start(host, port, cacheManager, 0, 0)
+ server
+ }
+
+}
+
+ /**
+  * Hands out one port number per thread: the first thread to ask receives 11211 and every
+  * further thread receives a value 100 higher than the previous one. A thread keeps seeing
+  * the same value on repeated calls to `get`.
+  */
+ object UniquePortThreadLocal extends ThreadLocal[Int] {
+   // Counter shared by all threads; each thread's initial value advances it by 100.
+   private val nextPort = new AtomicInteger(11211)
+
+   override def initialValue: Int = {
+     nextPort.getAndAdd(100)
+   }
+ }
\ No newline at end of file
More information about the infinispan-commits
mailing list