[infinispan-commits] Infinispan SVN: r1666 - in trunk/server: core/src/main/scala/org/infinispan/server/core/transport and 6 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Apr 7 06:37:04 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-04-07 06:37:02 -0400 (Wed, 07 Apr 2010)
New Revision: 1666
Added:
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
Modified:
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Log:
[ISPN-391] (Hot Rod/Memcached decoders fail to decode properly with multiple worker threads) Fixed by making sure each pipeline gets a new decoder.
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -28,7 +28,10 @@
private val versionCounter = new AtomicInteger
override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): AnyRef = {
- if (buffer.readableBytes < 1) return null
+ if (buffer.readableBytes < 1) {
+ trace("No bytes to decode")
+ return null
+ }
val header = readHeader(buffer)
if (header == null) return null // Something went wrong reading the header, so get more bytes
try {
@@ -153,6 +156,7 @@
errorResponse match {
case a: Array[Byte] => ch.write(wrappedBuffer(a))
case sb: StringBuilder => ch.write(wrappedBuffer(sb.toString.getBytes))
+ case null => // ignore
case _ => ch.write(errorResponse)
}
}
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -1,9 +1,9 @@
package org.infinispan.server.core
import java.net.InetSocketAddress
-import transport.netty.{EncoderAdapter, DecoderAdapter, NettyTransport}
+import transport.netty.{EncoderAdapter, NettyTransport}
import transport.{Decoder, Encoder, Transport}
-import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+import org.infinispan.manager.CacheManager
/**
* // TODO: Document this
@@ -25,32 +25,25 @@
this.port = port
this.masterThreads = masterThreads
this.workerThreads = workerThreads
+ this.cacheManager = cacheManager
- decoder = getDecoder(cacheManager)
- decoder.start
encoder = getEncoder
// TODO: add an IdleStateHandler so that idle connections are detected, this could help on malformed data
// TODO: ... requests such as when the lenght of data is bigger than the expected data itself.
- val nettyDecoder = if (decoder != null) new DecoderAdapter(decoder) else null
val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
val address = new InetSocketAddress(host, port)
// TODO change cache name 'default' to something more meaningful and dependent of protocol
- transport = new NettyTransport(nettyDecoder, nettyEncoder, address, masterThreads, workerThreads, "default")
+ transport = new NettyTransport(this, nettyEncoder, address, masterThreads, workerThreads, "default")
transport.start
}
override def stop {
if (transport != null)
transport.stop
- if (decoder != null)
- decoder.stop
-// cacheManager.stop
}
+ def getCacheManager = cacheManager
+
def getPort = port
- protected def getEncoder: Encoder
-
- protected def getDecoder(cacheManager: CacheManager): Decoder
-
}
\ No newline at end of file
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -1,6 +1,7 @@
package org.infinispan.server.core
import org.infinispan.manager.CacheManager
+import transport.{Decoder, Encoder}
/**
* // TODO: Document this
@@ -9,5 +10,7 @@
*/
trait ProtocolServer {
def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int)
- def stop
+ def stop
+ def getEncoder: Encoder
+ def getDecoder: Decoder
}
\ No newline at end of file
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -11,8 +11,4 @@
def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)
- def start
-
- def stop
-
}
\ No newline at end of file
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -2,13 +2,14 @@
import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
import org.infinispan.server.core.transport.{VLong, VInt, ChannelBuffer}
+import org.infinispan.server.core.Logging
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since 4.1
*/
-class ChannelBufferAdapter(val buffer: NettyChannelBuffer) extends ChannelBuffer {
+class ChannelBufferAdapter(buffer: NettyChannelBuffer) extends ChannelBuffer {
override def readByte: Byte = buffer.readByte
override def readBytes(dst: Array[Byte], dstIndex: Int, length: Int) = buffer.readBytes(dst, dstIndex, length)
@@ -52,4 +53,6 @@
override def getUnderlyingChannelBuffer: AnyRef = buffer
-}
\ No newline at end of file
+}
+
+object ChannelBufferAdapter extends Logging
\ No newline at end of file
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -3,7 +3,8 @@
import org.jboss.netty.handler.codec.replay.ReplayingDecoder
import org.jboss.netty.channel.{ExceptionEvent => NettyExceptionEvent, ChannelHandlerContext => NettyChannelHandlerContext, Channel => NettyChannel}
import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
-import org.infinispan.server.core.transport._;
+import org.infinispan.server.core.transport._
+import org.infinispan.server.core.Logging;
/**
* // TODO: Document this
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -1,23 +1,21 @@
package org.infinispan.server.core.transport.netty
import org.jboss.netty.channel._
+import org.infinispan.server.core.ProtocolServer
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since 4.1
*/
-class NettyChannelPipelineFactory(decoder: ChannelUpstreamHandler, encoder: ChannelDownstreamHandler)
+class NettyChannelPipelineFactory(server: ProtocolServer, encoder: ChannelDownstreamHandler)
extends ChannelPipelineFactory {
override def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline
- if (decoder != null) {
- pipeline.addLast("decoder", decoder);
- }
- if (encoder != null) {
- pipeline.addLast("encoder", encoder);
- }
+ pipeline.addLast("decoder", new DecoderAdapter(server.getDecoder))
+ if (encoder != null)
+ pipeline.addLast("encoder", encoder)
return pipeline;
}
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -8,21 +8,23 @@
import org.jboss.netty.bootstrap.ServerBootstrap
import java.util.concurrent.{TimeUnit, Executors, ThreadFactory, ExecutorService}
import org.infinispan.server.core.transport.Transport
-import org.infinispan.server.core.Logging
import scala.collection.JavaConversions._
+import org.infinispan.manager.CacheManager
+import org.infinispan.server.core.{ProtocolServer, Logging}
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since 4.1
*/
-class NettyTransport(decoder: ChannelUpstreamHandler, encoder: ChannelDownstreamHandler,
- address: SocketAddress, masterThreads: Int, workerThreads: Int, cacheName: String) extends Transport {
+class NettyTransport(server: ProtocolServer, encoder: ChannelDownstreamHandler,
+ address: SocketAddress, masterThreads: Int, workerThreads: Int,
+ cacheName: String) extends Transport {
import NettyTransport._
val serverChannels = new DefaultChannelGroup(cacheName + "-channels")
val acceptedChannels = new DefaultChannelGroup(cacheName + "-accepted")
- val pipeline = new NettyChannelPipelineFactory(decoder, encoder)
+ val pipeline = new NettyChannelPipelineFactory(server, encoder)
val factory = {
if (workerThreads == 0)
new NioServerSocketChannelFactory(masterExecutor, workerExecutor)
@@ -40,22 +42,18 @@
Executors.newFixedThreadPool(masterThreads, tf)
}
}
- //todo investigate the actual reason why multiple threads do not work
- lazy val workerExecutor = {
+
+ lazy val workerExecutor = {
val tf = new NamedThreadFactory(cacheName + '-' + "Worker")
- Executors.newSingleThreadExecutor(tf)
+ if (workerThreads == 0) {
+ debug("Configured unlimited threads for worker thread pool")
+ Executors.newCachedThreadPool(tf)
+ }
+ else {
+ debug("Configured {0} threads for worker thread pool", workerThreads)
+ Executors.newFixedThreadPool(workerThreads, tf)
+ }
}
-// lazy val workerExecutor = {
-// val tf = new NamedThreadFactory(cacheName + '-' + "Worker")
-// if (workerThreads == 0) {
-// debug("Configured unlimited threads for worker thread pool")
-// Executors.newCachedThreadPool(tf)
-// }
-// else {
-// debug("Configured {0} threads for worker thread pool", workerThreads)
-// Executors.newFixedThreadPool(workerThreads, tf)
-// }
-// }
override def start {
val bootstrap = new ServerBootstrap(factory);
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -10,6 +10,7 @@
import org.infinispan.util.concurrent.TimeoutException
import org.infinispan.server.hotrod.ProtocolFlag._
import org.infinispan.server.hotrod.OperationResponse._
+import java.nio.channels.ClosedChannelException
/**
* // TODO: Document this
@@ -114,12 +115,11 @@
case e: Exception => new ErrorResponse(messageId, ServerError, e.toString)
}
}
+ case c: ClosedChannelException => null
+ case t: Throwable => new ErrorResponse(0, ServerError, t.toString)
}
}
- override def start {}
-
- override def stop {}
}
object HotRodDecoder extends Logging {
Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -12,36 +12,8 @@
class HotRodServer extends AbstractProtocolServer {
- protected override def getEncoder: Encoder = new HotRodEncoder
+ override def getEncoder: Encoder = new HotRodEncoder
- protected override def getDecoder(cacheManager: CacheManager): Decoder = new HotRodDecoder(cacheManager)
+ override def getDecoder: Decoder = new HotRodDecoder(getCacheManager)
-}
-
-//
-////class HotRodServer(val host: String,
-//// val port: Int,
-//// val manager: CacheManager,
-//// val masterThreads: Int,
-//// val workerThreads: Int) {
-////
-//// import HotRodServer._
-////
-//// private var server: Server = _
-////
-//// def start {
-//// val decoder = new GlobalDecoder
-//// val nettyDecoder = new NettyNoStateDecoder(decoder)
-//// val encoder = new Encoder410
-//// val nettyEncoder = new NettyEncoder(encoder)
-//// val commandHandler = new Handler(new CallerCache(manager))
-//// server = new NettyServer(commandHandler, nettyDecoder, nettyEncoder, new InetSocketAddress(host, port),
-//// masterThreads, workerThreads, "HotRod")
-//// server.start
-//// info("Started Hot Rod bound to {0}:{1}", host, port)
-//// }
-////
-//// def stop {
-//// if (server != null) server.stop
-//// }
-////}
\ No newline at end of file
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -0,0 +1,61 @@
+package org.infinispan.server.hotrod
+
+import java.lang.reflect.Method
+import java.util.concurrent.{Callable, Executors, Future, CyclicBarrier}
+import test.HotRodClient
+import org.testng.annotations.Test
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+@Test(groups = Array("functional"), testName = "server.hotrod.HotRodConcurrentTest")
+class HotRodConcurrentTest extends HotRodSingleNodeTest {
+
+ def testConcurrentPutRequests(m: Method) {
+ val numClients = 10
+ val numOpsPerClient = 100
+ val barrier = new CyclicBarrier(numClients + 1)
+ val executorService = Executors.newCachedThreadPool
+ var futures: List[Future[Unit]] = List()
+ var operators: List[Operator] = List()
+ try {
+ for (i <- 0 until numClients) {
+ // todo: add test with different client instances
+ val operator = new Operator(barrier, m, i, numOpsPerClient)
+ operators = operator :: operators
+ val future = executorService.submit(operator)
+ futures = future :: futures
+ }
+ barrier.await // wait for all threads to be ready
+ barrier.await // wait for all threads to finish
+
+ log.debug("All threads finished, let's shutdown the executor and check whether any exceptions were reported", null)
+ for (future <- futures) future.get
+ }
+ finally {
+ for (operator <- operators) operator.stop
+ }
+ }
+
+ class Operator(barrier: CyclicBarrier, m: Method, clientId: Int, numOpsPerClient: Int) extends Callable[Unit] {
+
+ private lazy val client = new HotRodClient("127.0.0.1", getServer.getPort, cacheName)
+
+ def call {
+ log.debug("Wait for all executions paths to be ready to perform calls", null)
+ barrier.await
+ try {
+ for (i <- 0 until numOpsPerClient) {
+ client.assertPut(m, "k" + clientId + "-" + i + "-", "v" + clientId + "-" + i + "-")
+ }
+ } finally {
+ log.debug("Wait for all execution paths to finish", null)
+ barrier.await
+ }
+ }
+
+ def stop = client.stop
+ }
+}
\ No newline at end of file
Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -0,0 +1,44 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.test.SingleCacheManagerTest
+import org.infinispan.server.core.CacheValue
+import test.HotRodClient
+import org.infinispan.AdvancedCache
+import org.infinispan.manager.CacheManager
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import test.HotRodTestingUtil._
+import org.testng.annotations.AfterClass
+import org.jboss.netty.logging.{InternalLoggerFactory, Log4JLoggerFactory}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+abstract class HotRodSingleNodeTest extends SingleCacheManagerTest {
+ val cacheName = "hotrod-cache"
+ private var server: HotRodServer = _
+ private var client: HotRodClient = _
+ private var advancedCache: AdvancedCache[CacheKey, CacheValue] = _
+ private var jmxDomain = getClass.getSimpleName
+
+ override def createCacheManager: CacheManager = {
+ val cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
+ advancedCache = cacheManager.getCache[CacheKey, CacheValue](cacheName).getAdvancedCache
+ server = startHotRodServer(cacheManager)
+ client = new HotRodClient("127.0.0.1", server.getPort, cacheName)
+ cacheManager
+ }
+
+ @AfterClass(alwaysRun = true)
+ override def destroyAfterClass {
+ log.debug("Test finished, close cache, client and Hot Rod server", null)
+ super.destroyAfterClass
+ client.stop
+ server.stop
+ }
+
+ def getServer = server
+
+ def getClient = client
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -13,15 +13,14 @@
import org.infinispan.server.hotrod.OperationStatus._
import org.infinispan.server.hotrod.OperationResponse._
import org.infinispan.server.core.transport.NoState
-import org.jboss.netty.channel.ChannelHandler.Sharable
import org.infinispan.server.core.transport.netty.{ChannelBufferAdapter}
import org.infinispan.server.core.Logging
import collection.mutable
import collection.immutable
import java.lang.reflect.Method
import test.HotRodTestingUtil._
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit, LinkedBlockingQueue, Executors}
-import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.atomic.{AtomicLong}
import org.infinispan.test.TestingUtil
/**
@@ -34,12 +33,13 @@
* @author Galder Zamarreño
* @since 4.1
*/
-class HotRodClient(host: String, port: Int, defaultCacheName: String) {
+class HotRodClient(host: String, port: Int, defaultCacheName: String) {
+ val idToOp = new ConcurrentHashMap[Long, Op]
private lazy val ch: Channel = {
val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
val bootstrap: ClientBootstrap = new ClientBootstrap(factory)
- bootstrap.setPipelineFactory(ClientPipelineFactory)
+ bootstrap.setPipelineFactory(new ClientPipelineFactory(this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
// Make a new connection.
@@ -60,6 +60,11 @@
assertStatus(status, Success)
}
+ def assertPut(m: Method, kPrefix: String, vPrefix: String) {
+ val status = put(k(m, kPrefix) , 0, 0, v(m, vPrefix))
+ assertStatus(status, Success)
+ }
+
def assertPut(m: Method, lifespan: Int, maxIdle: Int) {
val status = put(k(m) , lifespan, maxIdle, v(m))
assertStatus(status, Success)
@@ -117,9 +122,7 @@
}
private def execute(op: Op, expectedResponseMessageId: Long): (OperationStatus, Array[Byte]) = {
- val writeFuture = ch.write(op)
- writeFuture.awaitUninterruptibly
- assertTrue(writeFuture.isSuccess)
+ writeOp(op)
var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
if (op.flags == 1) {
val respWithPrevious = handler.getResponse(expectedResponseMessageId).asInstanceOf[ResponseWithPrevious]
@@ -129,9 +132,16 @@
(respWithPrevious.status, respWithPrevious.previous.get)
} else {
(handler.getResponse(expectedResponseMessageId).status, null)
- }
+ }
}
+ private def writeOp(op: Op) {
+ idToOp.put(op.id, op)
+ val future = ch.write(op)
+ future.awaitUninterruptibly
+ assertTrue(future.isSuccess)
+ }
+
def get(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
val (getSt, actual, version) = get(0x03, k, 0)
(getSt, actual)
@@ -151,9 +161,7 @@
private def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0)
- val writeFuture = ch.write(op)
- writeFuture.awaitUninterruptibly
- assertTrue(writeFuture.isSuccess)
+ val writeFuture = writeOp(op)
// Get the handler instance to retrieve the answer.
var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
if (code == 0x03) {
@@ -173,9 +181,7 @@
def stats: Map[String, String] = {
val op = new StatsOp(0xA0, 0x15, defaultCacheName, null)
- val writeFuture = ch.write(op)
- writeFuture.awaitUninterruptibly
- assertTrue(writeFuture.isSuccess)
+ val writeFuture = writeOp(op)
// Get the handler instance to retrieve the answer.
var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
val resp = handler.getResponse(op.id).asInstanceOf[StatsResponse]
@@ -186,56 +192,51 @@
}
-@Sharable
-private object ClientPipelineFactory extends ChannelPipelineFactory {
+private class ClientPipelineFactory(client: HotRodClient) extends ChannelPipelineFactory {
- override def getPipeline() = {
+ override def getPipeline = {
val pipeline = Channels.pipeline
- pipeline.addLast("decoder", Decoder)
- pipeline.addLast("encoder", Encoder)
+ pipeline.addLast("decoder", new Decoder(client))
+ pipeline.addLast("encoder", new Encoder)
pipeline.addLast("handler", new ClientHandler)
pipeline
}
}
-@Sharable
-private object Encoder extends OneToOneEncoder {
- val idToOp = new ConcurrentHashMap[Long, Op]
+private class Encoder extends OneToOneEncoder {
override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
- val ret =
- msg match {
- case op: Op => {
- idToOp.put(op.id, op)
- val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
- buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
- buffer.writeUnsignedLong(op.id) // message id
- buffer.writeByte(10) // version
- buffer.writeByte(op.code) // opcode
- buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
- buffer.writeUnsignedInt(op.flags) // flags
- buffer.writeByte(0) // client intelligence
- buffer.writeUnsignedInt(0) // topology id
- if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
- buffer.writeRangedBytes(op.key) // key length + key
- if (op.value != null) {
- if (op.code != 0x0D) { // If it's not removeIfUnmodified...
- buffer.writeUnsignedInt(op.lifespan) // lifespan
- buffer.writeUnsignedInt(op.maxIdle) // maxIdle
- }
- if (op.code == 0x09 || op.code == 0x0D) {
- buffer.writeLong(op.version)
- }
- if (op.code != 0x0D) { // If it's not removeIfUnmodified...
- buffer.writeRangedBytes(op.value) // value length + value
- }
+ trace("Encode {0} so that it's sent to the server", msg)
+ msg match {
+ case op: Op => {
+ val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
+ buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
+ buffer.writeUnsignedLong(op.id) // message id
+ buffer.writeByte(10) // version
+ buffer.writeByte(op.code) // opcode
+ buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
+ buffer.writeUnsignedInt(op.flags) // flags
+ buffer.writeByte(0) // client intelligence
+ buffer.writeUnsignedInt(0) // topology id
+ if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
+ buffer.writeRangedBytes(op.key) // key length + key
+ if (op.value != null) {
+ if (op.code != 0x0D) { // If it's not removeIfUnmodified...
+ buffer.writeUnsignedInt(op.lifespan) // lifespan
+ buffer.writeUnsignedInt(op.maxIdle) // maxIdle
}
+ if (op.code == 0x09 || op.code == 0x0D) {
+ buffer.writeLong(op.version)
+ }
+ if (op.code != 0x0D) { // If it's not removeIfUnmodified...
+ buffer.writeRangedBytes(op.value) // value length + value
+ }
}
- buffer.getUnderlyingChannelBuffer
}
+ buffer.getUnderlyingChannelBuffer
+ }
}
- ret
}
}
@@ -244,60 +245,64 @@
val idCounter = new AtomicLong
}
-private object Decoder extends ReplayingDecoder[NoState] with Logging {
- import Encoder._
+private class Decoder(client: HotRodClient) extends ReplayingDecoder[NoState] with Logging {
override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
+ trace("Decode response from server")
val buf = new ChannelBufferAdapter(buffer)
+ if (buf.readableBytes < 1) {
+ trace("No bytes to decode")
+ return null
+ }
val magic = buf.readUnsignedByte
val id = buf.readUnsignedLong
val opCode = OperationResponse.apply(buf.readUnsignedByte)
val status = OperationStatus.apply(buf.readUnsignedByte)
val topologyChangeMarker = buf.readUnsignedByte
- val resp: Response =
- opCode match {
- case StatsResponse => {
- val size = buf.readUnsignedInt
- val stats = mutable.Map.empty[String, String]
- for (i <- 1 to size) {
- stats += (buf.readString -> buf.readString)
- }
- new StatsResponse(id, immutable.Map[String, String]() ++ stats)
+ val resp: Response = opCode match {
+ case StatsResponse => {
+ val size = buf.readUnsignedInt
+ val stats = mutable.Map.empty[String, String]
+ for (i <- 1 to size) {
+ stats += (buf.readString -> buf.readString)
}
- case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
- | RemoveResponse | RemoveIfUnmodifiedResponse => {
- val op = idToOp.get(id)
- if (op.flags == 1) {
- val length = buf.readUnsignedInt
- if (length == 0) {
- new ResponseWithPrevious(id, opCode, status, None)
- } else {
- val previous = new Array[Byte](length)
- buf.readBytes(previous)
- new ResponseWithPrevious(id, opCode, status, Some(previous))
- }
- } else new Response(id, opCode, status)
- }
- case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status)
- case GetWithVersionResponse => {
- if (status == Success) {
- val version = buf.readLong
- val data = Some(buf.readRangedBytes)
- new GetWithVersionResponse(id, opCode, status, data, version)
- } else{
- new GetWithVersionResponse(id, opCode, status, None, 0)
+ new StatsResponse(id, immutable.Map[String, String]() ++ stats)
+ }
+ case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
+ | RemoveResponse | RemoveIfUnmodifiedResponse => {
+ val op = client.idToOp.get(id)
+ if (op.flags == 1) {
+ val length = buf.readUnsignedInt
+ if (length == 0) {
+ new ResponseWithPrevious(id, opCode, status, None)
+ } else {
+ val previous = new Array[Byte](length)
+ buf.readBytes(previous)
+ new ResponseWithPrevious(id, opCode, status, Some(previous))
}
+ } else new Response(id, opCode, status)
+ }
+ case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status)
+ case GetWithVersionResponse => {
+ if (status == Success) {
+ val version = buf.readLong
+ val data = Some(buf.readRangedBytes)
+ new GetWithVersionResponse(id, opCode, status, data, version)
+ } else{
+ new GetWithVersionResponse(id, opCode, status, None, 0)
}
- case GetResponse => {
- if (status == Success) {
- val data = Some(buf.readRangedBytes)
- new GetResponse(id, opCode, status, data)
- } else{
- new GetResponse(id, opCode, status, None)
- }
+ }
+ case GetResponse => {
+ if (status == Success) {
+ val data = Some(buf.readRangedBytes)
+ new GetResponse(id, opCode, status, data)
+ } else{
+ new GetResponse(id, opCode, status, None)
}
- case ErrorResponse => new ErrorResponse(id, status, buf.readString)
}
+ case ErrorResponse => new ErrorResponse(id, status, buf.readString)
+ }
+ trace("Got response from server: {0}", resp)
resp
}
@@ -312,6 +317,7 @@
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
val resp = e.getMessage.asInstanceOf[Response]
+ trace("Put {0} in responses", resp)
responses.put(resp.messageId, resp)
}
@@ -326,13 +332,13 @@
i += 1
}
}
- while (v == null && i < 20)
+ while (v == null && i < 100)
v
}
}
-private class Op(val magic: Int,
+case class Op(val magic: Int,
val code: Byte,
val cacheName: String,
val key: Array[Byte],
@@ -344,7 +350,7 @@
lazy val id = HotRodClient.idCounter.incrementAndGet
}
-private class StatsOp(override val magic: Int,
+class StatsOp(override val magic: Int,
override val code: Byte,
override val cacheName: String,
val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0)
\ No newline at end of file
Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -4,12 +4,12 @@
import org.infinispan.server.core.Operation._
import org.infinispan.server.memcached.MemcachedOperation._
import org.infinispan.context.Flag
-import java.util.concurrent.{TimeUnit, Executors, ScheduledExecutorService}
+import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
import java.io.{IOException, EOFException, StreamCorruptedException}
import java.nio.channels.ClosedChannelException
import java.util.concurrent.atomic.AtomicLong
import org.infinispan.stats.Stats
-import org.infinispan.server.core.transport.{Channel, ChannelBuffers, ChannelHandlerContext, ChannelBuffer}
+import org.infinispan.server.core.transport.ChannelBuffer
import org.infinispan.server.core._
import org.infinispan.{AdvancedCache, Version, CacheException, Cache}
import collection.mutable.ListBuffer
@@ -22,14 +22,14 @@
* @since
*/
-class MemcachedDecoder(cacheManager: CacheManager) extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
+class MemcachedDecoder(cacheManager: CacheManager, scheduler: ScheduledExecutorService)
+ extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
import RequestResolver._
type SuitableParameters = MemcachedParameters
type SuitableHeader = RequestHeader
- private var scheduler: ScheduledExecutorService = _
- private var cache: Cache[String, MemcachedValue] = _
+ private lazy val cache = createCache
private lazy val isStatsEnabled = cache.getConfiguration.isExposeJmxStatistics
private final val incrMisses = new AtomicLong(0)
private final val incrHits = new AtomicLong(0)
@@ -352,15 +352,15 @@
buffer
}
- override def start {
- scheduler = Executors.newScheduledThreadPool(1)
- cache = createCache
- }
+// override def start {
+// scheduler = Executors.newScheduledThreadPool(1)
+// cache = createCache
+// }
+//
+// override def stop {
+// scheduler.shutdown
+// }
- override def stop {
- scheduler.shutdown
- }
-
private def createValue(data: Array[Byte], nextVersion: Long, flags: Int): MemcachedValue = {
new MemcachedValue(data, nextVersion, flags)
}
Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -3,6 +3,7 @@
import org.infinispan.server.core.AbstractProtocolServer
import org.infinispan.server.core.transport.{Decoder, Encoder}
import org.infinispan.manager.CacheManager
+import java.util.concurrent.{Executors, ScheduledExecutorService}
/**
* // TODO: Document this
@@ -12,8 +13,14 @@
class MemcachedServer extends AbstractProtocolServer {
- protected override def getEncoder: Encoder = null
+ protected lazy val scheduler = Executors.newScheduledThreadPool(1)
- protected override def getDecoder(cacheManager: CacheManager): Decoder = new MemcachedDecoder(cacheManager)
+ override def getEncoder: Encoder = null
+ override def getDecoder: Decoder = new MemcachedDecoder(getCacheManager, scheduler)
+
+ override def stop {
+ super.stop
+ scheduler.shutdown
+ }
}
\ No newline at end of file
Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala 2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala 2010-04-07 10:37:02 UTC (rev 1666)
@@ -48,9 +48,9 @@
def startMemcachedTextServer(cacheManager: CacheManager, port: Int, cacheName: String): MemcachedServer = {
val server = new MemcachedServer {
- protected override def getDecoder(cacheManager: CacheManager): Decoder = {
- new MemcachedDecoder(cacheManager) {
- override def createCache = cacheManager.getCache[String, MemcachedValue](cacheName)
+ override def getDecoder: Decoder = {
+ new MemcachedDecoder(getCacheManager, scheduler) {
+ override def createCache = getCacheManager.getCache[String, MemcachedValue](cacheName)
}
}
}
More information about the infinispan-commits
mailing list