[infinispan-commits] Infinispan SVN: r1666 - in trunk/server: core/src/main/scala/org/infinispan/server/core/transport and 6 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Apr 7 06:37:04 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-04-07 06:37:02 -0400 (Wed, 07 Apr 2010)
New Revision: 1666

Added:
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
Modified:
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
   trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
   trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
Log:
[ISPN-391] (Hot Rod/Memcached decoders fail to decode properly with multiple worker threads) Fixed by making sure each pipeline gets a new decoder.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolDecoder.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -28,7 +28,10 @@
    private val versionCounter = new AtomicInteger
 
    override def decode(ctx: ChannelHandlerContext, buffer: ChannelBuffer): AnyRef = {
-      if (buffer.readableBytes < 1) return null
+      if (buffer.readableBytes < 1) {
+         trace("No bytes to decode")
+         return null
+      }
       val header = readHeader(buffer)
       if (header == null) return null // Something went wrong reading the header, so get more bytes 
       try {
@@ -153,6 +156,7 @@
          errorResponse match {
             case a: Array[Byte] => ch.write(wrappedBuffer(a))
             case sb: StringBuilder => ch.write(wrappedBuffer(sb.toString.getBytes))
+            case null => // ignore
             case _ => ch.write(errorResponse)
          }
       }

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/AbstractProtocolServer.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -1,9 +1,9 @@
 package org.infinispan.server.core
 
 import java.net.InetSocketAddress
-import transport.netty.{EncoderAdapter, DecoderAdapter, NettyTransport}
+import transport.netty.{EncoderAdapter, NettyTransport}
 import transport.{Decoder, Encoder, Transport}
-import org.infinispan.manager.{DefaultCacheManager, CacheManager}
+import org.infinispan.manager.CacheManager
 
 /**
  * // TODO: Document this
@@ -25,32 +25,25 @@
       this.port = port
       this.masterThreads = masterThreads
       this.workerThreads = workerThreads
+      this.cacheManager = cacheManager
 
-      decoder = getDecoder(cacheManager)
-      decoder.start
       encoder = getEncoder
       // TODO: add an IdleStateHandler so that idle connections are detected, this could help on malformed data
       // TODO: ... requests such as when the lenght of data is bigger than the expected data itself.
-      val nettyDecoder = if (decoder != null) new DecoderAdapter(decoder) else null
       val nettyEncoder = if (encoder != null) new EncoderAdapter(encoder) else null
       val address =  new InetSocketAddress(host, port)
       // TODO change cache name 'default' to something more meaningful and dependent of protocol
-      transport = new NettyTransport(nettyDecoder, nettyEncoder, address, masterThreads, workerThreads, "default")
+      transport = new NettyTransport(this, nettyEncoder, address, masterThreads, workerThreads, "default")
       transport.start
    }
 
    override def stop {
       if (transport != null)
          transport.stop
-      if (decoder != null)
-         decoder.stop
-//      cacheManager.stop
    }
 
+   def getCacheManager = cacheManager
+
    def getPort = port
 
-   protected def getEncoder: Encoder
-
-   protected def getDecoder(cacheManager: CacheManager): Decoder
-
 }
\ No newline at end of file

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/ProtocolServer.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -1,6 +1,7 @@
 package org.infinispan.server.core
 
 import org.infinispan.manager.CacheManager
+import transport.{Decoder, Encoder}
 
 /**
  * // TODO: Document this
@@ -9,5 +10,7 @@
  */
 trait ProtocolServer {
    def start(host: String, port: Int, cacheManager: CacheManager, masterThreads: Int, workerThreads: Int)
-   def stop 
+   def stop
+   def getEncoder: Encoder
+   def getDecoder: Decoder
 }
\ No newline at end of file

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/Decoder.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -11,8 +11,4 @@
    
    def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)
 
-   def start
-
-   def stop
-   
 }
\ No newline at end of file

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/ChannelBufferAdapter.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -2,13 +2,14 @@
 
 import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
 import org.infinispan.server.core.transport.{VLong, VInt, ChannelBuffer}
+import org.infinispan.server.core.Logging
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since 4.1
  */
-class ChannelBufferAdapter(val buffer: NettyChannelBuffer) extends ChannelBuffer {
+class ChannelBufferAdapter(buffer: NettyChannelBuffer) extends ChannelBuffer {
    
    override def readByte: Byte = buffer.readByte
    override def readBytes(dst: Array[Byte], dstIndex: Int, length: Int) = buffer.readBytes(dst, dstIndex, length)
@@ -52,4 +53,6 @@
 
    override def getUnderlyingChannelBuffer: AnyRef = buffer
 
-}
\ No newline at end of file
+}
+
+object ChannelBufferAdapter extends Logging
\ No newline at end of file

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -3,7 +3,8 @@
 import org.jboss.netty.handler.codec.replay.ReplayingDecoder
 import org.jboss.netty.channel.{ExceptionEvent => NettyExceptionEvent, ChannelHandlerContext => NettyChannelHandlerContext, Channel => NettyChannel}
 import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
-import org.infinispan.server.core.transport._;
+import org.infinispan.server.core.transport._
+import org.infinispan.server.core.Logging;
 
 /**
  * // TODO: Document this

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -1,23 +1,21 @@
 package org.infinispan.server.core.transport.netty
 
 import org.jboss.netty.channel._
+import org.infinispan.server.core.ProtocolServer
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since 4.1
  */
-class NettyChannelPipelineFactory(decoder: ChannelUpstreamHandler, encoder: ChannelDownstreamHandler)
+class NettyChannelPipelineFactory(server: ProtocolServer, encoder: ChannelDownstreamHandler)
       extends ChannelPipelineFactory {
 
    override def getPipeline: ChannelPipeline = {
       val pipeline = Channels.pipeline
-      if (decoder != null) {
-         pipeline.addLast("decoder", decoder);
-      }
-      if (encoder != null) {
-         pipeline.addLast("encoder", encoder);
-      }
+      pipeline.addLast("decoder", new DecoderAdapter(server.getDecoder))
+      if (encoder != null)
+         pipeline.addLast("encoder", encoder)
       return pipeline;
    }
 

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -8,21 +8,23 @@
 import org.jboss.netty.bootstrap.ServerBootstrap
 import java.util.concurrent.{TimeUnit, Executors, ThreadFactory, ExecutorService}
 import org.infinispan.server.core.transport.Transport
-import org.infinispan.server.core.Logging
 import scala.collection.JavaConversions._
+import org.infinispan.manager.CacheManager
+import org.infinispan.server.core.{ProtocolServer, Logging}
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since 4.1
  */
-class NettyTransport(decoder: ChannelUpstreamHandler, encoder: ChannelDownstreamHandler,
-                  address: SocketAddress, masterThreads: Int, workerThreads: Int, cacheName: String) extends Transport {
+class NettyTransport(server: ProtocolServer, encoder: ChannelDownstreamHandler,
+                  address: SocketAddress, masterThreads: Int, workerThreads: Int,
+                  cacheName: String) extends Transport {
    import NettyTransport._
 
    val serverChannels = new DefaultChannelGroup(cacheName + "-channels")
    val acceptedChannels = new DefaultChannelGroup(cacheName + "-accepted")
-   val pipeline = new NettyChannelPipelineFactory(decoder, encoder)
+   val pipeline = new NettyChannelPipelineFactory(server, encoder)
    val factory = {
       if (workerThreads == 0)
          new NioServerSocketChannelFactory(masterExecutor, workerExecutor)
@@ -40,22 +42,18 @@
          Executors.newFixedThreadPool(masterThreads, tf)
       }
    }
-   //todo investigate the actual reason why multiple threads do not work
-   lazy val workerExecutor =  {
+
+   lazy val workerExecutor = {
       val tf = new NamedThreadFactory(cacheName + '-' + "Worker")
-      Executors.newSingleThreadExecutor(tf)
+      if (workerThreads == 0) {
+         debug("Configured unlimited threads for worker thread pool")
+         Executors.newCachedThreadPool(tf)
+      }
+      else {
+         debug("Configured {0} threads for worker thread pool", workerThreads)
+         Executors.newFixedThreadPool(workerThreads, tf)
+      }
    }
-//   lazy val workerExecutor = {
-//      val tf = new NamedThreadFactory(cacheName + '-' + "Worker")
-//      if (workerThreads == 0) {
-//         debug("Configured unlimited threads for worker thread pool")
-//         Executors.newCachedThreadPool(tf)
-//      }
-//      else {
-//         debug("Configured {0} threads for worker thread pool", workerThreads)
-//         Executors.newFixedThreadPool(workerThreads, tf)
-//      }
-//   }
 
    override def start {
       val bootstrap = new ServerBootstrap(factory);

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodDecoder.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -10,6 +10,7 @@
 import org.infinispan.util.concurrent.TimeoutException
 import org.infinispan.server.hotrod.ProtocolFlag._
 import org.infinispan.server.hotrod.OperationResponse._
+import java.nio.channels.ClosedChannelException
 
 /**
  * // TODO: Document this
@@ -114,12 +115,11 @@
                case e: Exception => new ErrorResponse(messageId, ServerError, e.toString)
             }
          }
+         case c: ClosedChannelException => null
+         case t: Throwable => new ErrorResponse(0, ServerError, t.toString)
       }
    }
 
-   override def start {}
-
-   override def stop {}
 }
 
 object HotRodDecoder extends Logging {

Modified: trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala
===================================================================
--- trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -12,36 +12,8 @@
 
 class HotRodServer extends AbstractProtocolServer {
 
-   protected override def getEncoder: Encoder = new HotRodEncoder
+   override def getEncoder: Encoder = new HotRodEncoder
 
-   protected override def getDecoder(cacheManager: CacheManager): Decoder = new HotRodDecoder(cacheManager)
+   override def getDecoder: Decoder = new HotRodDecoder(getCacheManager)
 
-}
-
-//
-////class HotRodServer(val host: String,
-////                val port: Int,
-////                val manager: CacheManager,
-////                val masterThreads: Int,
-////                val workerThreads: Int) {
-////
-////   import HotRodServer._
-////
-////   private var server: Server = _
-////
-////   def start {
-////      val decoder = new GlobalDecoder
-////      val nettyDecoder = new NettyNoStateDecoder(decoder)
-////      val encoder = new Encoder410
-////      val nettyEncoder = new NettyEncoder(encoder)
-////      val commandHandler = new Handler(new CallerCache(manager))
-////      server = new NettyServer(commandHandler, nettyDecoder, nettyEncoder, new InetSocketAddress(host, port),
-////                               masterThreads, workerThreads, "HotRod")
-////      server.start
-////      info("Started Hot Rod bound to {0}:{1}", host, port)
-////   }
-////
-////   def stop {
-////      if (server != null) server.stop
-////   }
-////}
\ No newline at end of file
+}
\ No newline at end of file

Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala	                        (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -0,0 +1,61 @@
+package org.infinispan.server.hotrod
+
+import java.lang.reflect.Method
+import java.util.concurrent.{Callable, Executors, Future, CyclicBarrier}
+import test.HotRodClient
+import org.testng.annotations.Test
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+ at Test(groups = Array("functional"), testName = "server.hotrod.HotRodConcurrentTest")
+class HotRodConcurrentTest extends HotRodSingleNodeTest {
+
+   def testConcurrentPutRequests(m: Method) {
+      val numClients = 10
+      val numOpsPerClient = 100
+      val barrier = new CyclicBarrier(numClients + 1)
+      val executorService = Executors.newCachedThreadPool
+      var futures: List[Future[Unit]] = List()
+      var operators: List[Operator] = List()
+      try {
+         for (i <- 0 until numClients) {
+            // todo: add test with different client instances
+            val operator = new Operator(barrier, m, i, numOpsPerClient)
+            operators = operator :: operators
+            val future = executorService.submit(operator)
+            futures = future :: futures
+         }
+         barrier.await // wait for all threads to be ready
+         barrier.await // wait for all threads to finish
+
+         log.debug("All threads finished, let's shutdown the executor and check whether any exceptions were reported", null)
+         for (future <- futures) future.get
+      }
+      finally {
+         for (operator <- operators) operator.stop
+      }
+   }
+
+   class Operator(barrier: CyclicBarrier, m: Method, clientId: Int, numOpsPerClient: Int) extends Callable[Unit] {
+
+      private lazy val client = new HotRodClient("127.0.0.1", getServer.getPort, cacheName)
+
+      def call {
+         log.debug("Wait for all executions paths to be ready to perform calls", null)
+         barrier.await
+         try {
+            for (i <- 0 until numOpsPerClient) {
+               client.assertPut(m, "k" + clientId + "-" + i + "-", "v" + clientId + "-" + i + "-")
+            }
+         } finally {
+            log.debug("Wait for all execution paths to finish", null)
+            barrier.await
+         }
+      }
+
+      def stop = client.stop
+   }
+}
\ No newline at end of file

Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala	                        (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -0,0 +1,44 @@
+package org.infinispan.server.hotrod
+
+import org.infinispan.test.SingleCacheManagerTest
+import org.infinispan.server.core.CacheValue
+import test.HotRodClient
+import org.infinispan.AdvancedCache
+import org.infinispan.manager.CacheManager
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import test.HotRodTestingUtil._
+import org.testng.annotations.AfterClass
+import org.jboss.netty.logging.{InternalLoggerFactory, Log4JLoggerFactory}
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+abstract class HotRodSingleNodeTest extends SingleCacheManagerTest {
+   val cacheName = "hotrod-cache"
+   private var server: HotRodServer = _
+   private var client: HotRodClient = _
+   private var advancedCache: AdvancedCache[CacheKey, CacheValue] = _
+   private var jmxDomain = getClass.getSimpleName
+   
+   override def createCacheManager: CacheManager = {
+      val cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
+      advancedCache = cacheManager.getCache[CacheKey, CacheValue](cacheName).getAdvancedCache
+      server = startHotRodServer(cacheManager)
+      client = new HotRodClient("127.0.0.1", server.getPort, cacheName)
+      cacheManager
+   }
+
+   @AfterClass(alwaysRun = true)
+   override def destroyAfterClass {
+      log.debug("Test finished, close cache, client and Hot Rod server", null)
+      super.destroyAfterClass
+      client.stop
+      server.stop
+   }
+
+   def getServer = server
+
+   def getClient = client
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/test/HotRodClient.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -13,15 +13,14 @@
 import org.infinispan.server.hotrod.OperationStatus._
 import org.infinispan.server.hotrod.OperationResponse._
 import org.infinispan.server.core.transport.NoState
-import org.jboss.netty.channel.ChannelHandler.Sharable
 import org.infinispan.server.core.transport.netty.{ChannelBufferAdapter}
 import org.infinispan.server.core.Logging
 import collection.mutable
 import collection.immutable
 import java.lang.reflect.Method
 import test.HotRodTestingUtil._
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit, LinkedBlockingQueue, Executors}
-import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.concurrent.atomic.{AtomicLong}
 import org.infinispan.test.TestingUtil
 
 /**
@@ -34,12 +33,13 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-class HotRodClient(host: String, port: Int, defaultCacheName: String) {   
+class HotRodClient(host: String, port: Int, defaultCacheName: String) {
+   val idToOp = new ConcurrentHashMap[Long, Op]    
 
    private lazy val ch: Channel = {
       val factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
       val bootstrap: ClientBootstrap = new ClientBootstrap(factory)
-      bootstrap.setPipelineFactory(ClientPipelineFactory)
+      bootstrap.setPipelineFactory(new ClientPipelineFactory(this))
       bootstrap.setOption("tcpNoDelay", true)
       bootstrap.setOption("keepAlive", true)
       // Make a new connection.
@@ -60,6 +60,11 @@
       assertStatus(status, Success)
    }
 
+   def assertPut(m: Method, kPrefix: String, vPrefix: String) {
+      val status = put(k(m, kPrefix) , 0, 0, v(m, vPrefix))
+      assertStatus(status, Success)
+   }
+
    def assertPut(m: Method, lifespan: Int, maxIdle: Int) {
       val status = put(k(m) , lifespan, maxIdle, v(m))
       assertStatus(status, Success)
@@ -117,9 +122,7 @@
    }
 
    private def execute(op: Op, expectedResponseMessageId: Long): (OperationStatus, Array[Byte]) = {
-      val writeFuture = ch.write(op)
-      writeFuture.awaitUninterruptibly
-      assertTrue(writeFuture.isSuccess)
+      writeOp(op)
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
       if (op.flags == 1) {
          val respWithPrevious = handler.getResponse(expectedResponseMessageId).asInstanceOf[ResponseWithPrevious]
@@ -129,9 +132,16 @@
             (respWithPrevious.status, respWithPrevious.previous.get)
       } else {
          (handler.getResponse(expectedResponseMessageId).status, null)
-      }       
+      }
    }
 
+   private def writeOp(op: Op) {
+      idToOp.put(op.id, op)
+      val future = ch.write(op)
+      future.awaitUninterruptibly
+      assertTrue(future.isSuccess)
+   }
+
    def get(k: Array[Byte], flags: Int): (OperationStatus, Array[Byte]) = {
       val (getSt, actual, version) = get(0x03, k, 0)
       (getSt, actual)
@@ -151,9 +161,7 @@
 
    private def get(code: Byte, k: Array[Byte], flags: Int): (OperationStatus, Array[Byte], Long) = {
       val op = new Op(0xA0, code, defaultCacheName, k, 0, 0, null, flags, 0)
-      val writeFuture = ch.write(op)
-      writeFuture.awaitUninterruptibly
-      assertTrue(writeFuture.isSuccess)
+      val writeFuture = writeOp(op)
       // Get the handler instance to retrieve the answer.
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
       if (code == 0x03) {
@@ -173,9 +181,7 @@
 
    def stats: Map[String, String] = {
       val op = new StatsOp(0xA0, 0x15, defaultCacheName, null)
-      val writeFuture = ch.write(op)
-      writeFuture.awaitUninterruptibly
-      assertTrue(writeFuture.isSuccess)
+      val writeFuture = writeOp(op)
       // Get the handler instance to retrieve the answer.
       var handler = ch.getPipeline.getLast.asInstanceOf[ClientHandler]
       val resp = handler.getResponse(op.id).asInstanceOf[StatsResponse]
@@ -186,56 +192,51 @@
 
 }
 
- at Sharable
-private object ClientPipelineFactory extends ChannelPipelineFactory {
+private class ClientPipelineFactory(client: HotRodClient) extends ChannelPipelineFactory {
 
-   override def getPipeline() = {
+   override def getPipeline = {
       val pipeline = Channels.pipeline
-      pipeline.addLast("decoder", Decoder)
-      pipeline.addLast("encoder", Encoder)
+      pipeline.addLast("decoder", new Decoder(client))
+      pipeline.addLast("encoder", new Encoder)
       pipeline.addLast("handler", new ClientHandler)
       pipeline
    }
 
 }
 
- at Sharable
-private object Encoder extends OneToOneEncoder {
-   val idToOp = new ConcurrentHashMap[Long, Op] 
+private class Encoder extends OneToOneEncoder {
 
    override def encode(ctx: ChannelHandlerContext, ch: Channel, msg: Any) = {
-      val ret =
-         msg match {
-            case op: Op => {
-               idToOp.put(op.id, op)
-               val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
-               buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
-               buffer.writeUnsignedLong(op.id) // message id
-               buffer.writeByte(10) // version
-               buffer.writeByte(op.code) // opcode
-               buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
-               buffer.writeUnsignedInt(op.flags) // flags
-               buffer.writeByte(0) // client intelligence
-               buffer.writeUnsignedInt(0) // topology id
-               if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op... 
-                  buffer.writeRangedBytes(op.key) // key length + key
-                  if (op.value != null) {
-                     if (op.code != 0x0D) { // If it's not removeIfUnmodified...
-                        buffer.writeUnsignedInt(op.lifespan) // lifespan
-                        buffer.writeUnsignedInt(op.maxIdle) // maxIdle
-                     }
-                     if (op.code == 0x09 || op.code == 0x0D) {
-                        buffer.writeLong(op.version)
-                     }
-                     if (op.code != 0x0D) { // If it's not removeIfUnmodified...
-                        buffer.writeRangedBytes(op.value) // value length + value
-                     }
+      trace("Encode {0} so that it's sent to the server", msg)
+      msg match {
+         case op: Op => {
+            val buffer = new ChannelBufferAdapter(ChannelBuffers.dynamicBuffer)
+            buffer.writeByte(op.magic.asInstanceOf[Byte]) // magic
+            buffer.writeUnsignedLong(op.id) // message id
+            buffer.writeByte(10) // version
+            buffer.writeByte(op.code) // opcode
+            buffer.writeRangedBytes(op.cacheName.getBytes()) // cache name length + cache name
+            buffer.writeUnsignedInt(op.flags) // flags
+            buffer.writeByte(0) // client intelligence
+            buffer.writeUnsignedInt(0) // topology id
+            if (op.code != 0x13 && op.code != 0x15 && op.code != 0x17) { // if it's a key based op...
+               buffer.writeRangedBytes(op.key) // key length + key
+               if (op.value != null) {
+                  if (op.code != 0x0D) { // If it's not removeIfUnmodified...
+                     buffer.writeUnsignedInt(op.lifespan) // lifespan
+                     buffer.writeUnsignedInt(op.maxIdle) // maxIdle
                   }
+                  if (op.code == 0x09 || op.code == 0x0D) {
+                     buffer.writeLong(op.version)
+                  }
+                  if (op.code != 0x0D) { // If it's not removeIfUnmodified...
+                     buffer.writeRangedBytes(op.value) // value length + value
+                  }
                }
-               buffer.getUnderlyingChannelBuffer
             }
+            buffer.getUnderlyingChannelBuffer
+         }
       }
-      ret
    }
 
 }
@@ -244,60 +245,64 @@
    val idCounter = new AtomicLong
 }
 
-private object Decoder extends ReplayingDecoder[NoState] with Logging {
-   import Encoder._
+private class Decoder(client: HotRodClient) extends ReplayingDecoder[NoState] with Logging {
 
    override def decode(ctx: ChannelHandlerContext, ch: Channel, buffer: ChannelBuffer, state: NoState): Object = {
+      trace("Decode response from server")
       val buf = new ChannelBufferAdapter(buffer)
+      if (buf.readableBytes < 1) {
+         trace("No bytes to decode")
+         return null
+      }
       val magic = buf.readUnsignedByte
       val id = buf.readUnsignedLong
       val opCode = OperationResponse.apply(buf.readUnsignedByte)
       val status = OperationStatus.apply(buf.readUnsignedByte)
       val topologyChangeMarker = buf.readUnsignedByte
-      val resp: Response =
-         opCode match {
-            case StatsResponse => {
-               val size = buf.readUnsignedInt
-               val stats = mutable.Map.empty[String, String]
-               for (i <- 1 to size) {
-                  stats += (buf.readString -> buf.readString)
-               }
-               new StatsResponse(id, immutable.Map[String, String]() ++ stats)
+      val resp: Response = opCode match {
+         case StatsResponse => {
+            val size = buf.readUnsignedInt
+            val stats = mutable.Map.empty[String, String]
+            for (i <- 1 to size) {
+               stats += (buf.readString -> buf.readString)
             }
-            case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
-                 | RemoveResponse | RemoveIfUnmodifiedResponse => {
-               val op = idToOp.get(id)
-               if (op.flags == 1) {
-                  val length = buf.readUnsignedInt
-                  if (length == 0) {
-                     new ResponseWithPrevious(id, opCode, status, None)
-                  } else {
-                     val previous = new Array[Byte](length)
-                     buf.readBytes(previous)
-                     new ResponseWithPrevious(id, opCode, status, Some(previous))
-                  }
-               } else new Response(id, opCode, status)
-            }
-            case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status)
-            case GetWithVersionResponse  => {
-               if (status == Success) {
-                  val version = buf.readLong
-                  val data = Some(buf.readRangedBytes)
-                  new GetWithVersionResponse(id, opCode, status, data, version)
-               } else{
-                  new GetWithVersionResponse(id, opCode, status, None, 0)
+            new StatsResponse(id, immutable.Map[String, String]() ++ stats)
+         }
+         case PutResponse | PutIfAbsentResponse | ReplaceResponse | ReplaceIfUnmodifiedResponse
+              | RemoveResponse | RemoveIfUnmodifiedResponse => {
+            val op = client.idToOp.get(id)
+            if (op.flags == 1) {
+               val length = buf.readUnsignedInt
+               if (length == 0) {
+                  new ResponseWithPrevious(id, opCode, status, None)
+               } else {
+                  val previous = new Array[Byte](length)
+                  buf.readBytes(previous)
+                  new ResponseWithPrevious(id, opCode, status, Some(previous))
                }
+            } else new Response(id, opCode, status)
+         }
+         case ContainsKeyResponse | ClearResponse | PingResponse => new Response(id, opCode, status)
+         case GetWithVersionResponse  => {
+            if (status == Success) {
+               val version = buf.readLong
+               val data = Some(buf.readRangedBytes)
+               new GetWithVersionResponse(id, opCode, status, data, version)
+            } else{
+               new GetWithVersionResponse(id, opCode, status, None, 0)
             }
-            case GetResponse => {
-               if (status == Success) {
-                  val data = Some(buf.readRangedBytes)
-                  new GetResponse(id, opCode, status, data)
-               } else{
-                  new GetResponse(id, opCode, status, None)
-               }
+         }
+         case GetResponse => {
+            if (status == Success) {
+               val data = Some(buf.readRangedBytes)
+               new GetResponse(id, opCode, status, data)
+            } else{
+               new GetResponse(id, opCode, status, None)
             }
-            case ErrorResponse => new ErrorResponse(id, status, buf.readString)
          }
+         case ErrorResponse => new ErrorResponse(id, status, buf.readString)
+      }
+      trace("Got response from server: {0}", resp)
       resp
    }
 
@@ -312,6 +317,7 @@
 
    override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
       val resp = e.getMessage.asInstanceOf[Response]
+      trace("Put {0} in responses", resp)
       responses.put(resp.messageId, resp)
    }
 
@@ -326,13 +332,13 @@
             i += 1
          }
       }
-      while (v == null && i < 20)
+      while (v == null && i < 100)
       v
    }
 
 }
 
-private class Op(val magic: Int,
+case class Op(val magic: Int,
                  val code: Byte,
                  val cacheName: String,
                  val key: Array[Byte],
@@ -344,7 +350,7 @@
    lazy val id = HotRodClient.idCounter.incrementAndGet
 }
 
-private class StatsOp(override val magic: Int,
+class StatsOp(override val magic: Int,
                  override val code: Byte,
                  override val cacheName: String,
                  val statName: String) extends Op(magic, code, cacheName, null, 0, 0, null, 0, 0)
\ No newline at end of file

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedDecoder.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -4,12 +4,12 @@
 import org.infinispan.server.core.Operation._
 import org.infinispan.server.memcached.MemcachedOperation._
 import org.infinispan.context.Flag
-import java.util.concurrent.{TimeUnit, Executors, ScheduledExecutorService}
+import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
 import java.io.{IOException, EOFException, StreamCorruptedException}
 import java.nio.channels.ClosedChannelException
 import java.util.concurrent.atomic.AtomicLong
 import org.infinispan.stats.Stats
-import org.infinispan.server.core.transport.{Channel, ChannelBuffers, ChannelHandlerContext, ChannelBuffer}
+import org.infinispan.server.core.transport.ChannelBuffer
 import org.infinispan.server.core._
 import org.infinispan.{AdvancedCache, Version, CacheException, Cache}
 import collection.mutable.ListBuffer
@@ -22,14 +22,14 @@
  * @since
  */
 
-class MemcachedDecoder(cacheManager: CacheManager) extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
+class MemcachedDecoder(cacheManager: CacheManager, scheduler: ScheduledExecutorService)
+      extends AbstractProtocolDecoder[String, MemcachedValue] with TextProtocolUtil {
    import RequestResolver._
 
    type SuitableParameters = MemcachedParameters
    type SuitableHeader = RequestHeader
 
-   private var scheduler: ScheduledExecutorService = _
-   private var cache: Cache[String, MemcachedValue] = _
+   private lazy val cache = createCache
    private lazy val isStatsEnabled = cache.getConfiguration.isExposeJmxStatistics
    private final val incrMisses = new AtomicLong(0)
    private final val incrHits = new AtomicLong(0)
@@ -352,15 +352,15 @@
       buffer
    }
 
-   override def start {
-      scheduler = Executors.newScheduledThreadPool(1)
-      cache = createCache
-   }
+//   override def start {
+//      scheduler = Executors.newScheduledThreadPool(1)
+//      cache = createCache
+//   }
+//
+//   override def stop {
+//      scheduler.shutdown
+//   }
 
-   override def stop {
-      scheduler.shutdown
-   }
-
    private def createValue(data: Array[Byte], nextVersion: Long, flags: Int): MemcachedValue = {
       new MemcachedValue(data, nextVersion, flags)
    }   

Modified: trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala
===================================================================
--- trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/memcached/src/main/scala/org/infinispan/server/memcached/MemcachedServer.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -3,6 +3,7 @@
 import org.infinispan.server.core.AbstractProtocolServer
 import org.infinispan.server.core.transport.{Decoder, Encoder}
 import org.infinispan.manager.CacheManager
+import java.util.concurrent.{Executors, ScheduledExecutorService}
 
 /**
  * // TODO: Document this
@@ -12,8 +13,14 @@
 
 class MemcachedServer extends AbstractProtocolServer {
 
-   protected override def getEncoder: Encoder = null
+   protected lazy val scheduler = Executors.newScheduledThreadPool(1)
 
-   protected override def getDecoder(cacheManager: CacheManager): Decoder = new MemcachedDecoder(cacheManager)
+   override def getEncoder: Encoder = null
 
+   override def getDecoder: Decoder = new MemcachedDecoder(getCacheManager, scheduler)
+
+   override def stop {
+      super.stop
+      scheduler.shutdown
+   }
 }
\ No newline at end of file

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-04-06 15:10:53 UTC (rev 1665)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/test/MemcachedTestingUtil.scala	2010-04-07 10:37:02 UTC (rev 1666)
@@ -48,9 +48,9 @@
 
    def startMemcachedTextServer(cacheManager: CacheManager, port: Int, cacheName: String): MemcachedServer = {
       val server = new MemcachedServer {
-         protected override def getDecoder(cacheManager: CacheManager): Decoder = {
-            new MemcachedDecoder(cacheManager) {
-               override def createCache = cacheManager.getCache[String, MemcachedValue](cacheName)
+         override def getDecoder: Decoder = {
+            new MemcachedDecoder(getCacheManager, scheduler) {
+               override def createCache = getCacheManager.getCache[String, MemcachedValue](cacheName)
             }
          }
       }



More information about the infinispan-commits mailing list