[infinispan-commits] Infinispan SVN: r1698 - in trunk/server: hotrod/src/test/scala/org/infinispan/server/hotrod and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Apr 16 06:36:30 EDT 2010


Author: galder.zamarreno at jboss.com
Date: 2010-04-16 06:36:27 -0400 (Fri, 16 Apr 2010)
New Revision: 1698

Added:
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodShutdownTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedShutdownTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedSingleNodeTest.scala
Modified:
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
   trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
   trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
   trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala
Log:
[ISPN-404] (Memcached and Hot Rod server shutdown hangs in clients connected) Fixed.

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -1,16 +1,16 @@
 package org.infinispan.server.core.transport.netty
 
 import org.jboss.netty.handler.codec.replay.ReplayingDecoder
-import org.jboss.netty.channel.{ExceptionEvent => NettyExceptionEvent, ChannelHandlerContext => NettyChannelHandlerContext, Channel => NettyChannel}
 import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
 import org.infinispan.server.core.transport._
+import org.jboss.netty.channel.{ChannelStateEvent, ExceptionEvent => NettyExceptionEvent, ChannelHandlerContext => NettyChannelHandlerContext, Channel => NettyChannel}
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since 4.1
  */
-class DecoderAdapter(decoder: Decoder) extends ReplayingDecoder[NoState](true) {
+class DecoderAdapter(decoder: Decoder, transport: NettyTransport) extends ReplayingDecoder[NoState](true) {
 
    override def decode(nCtx: NettyChannelHandlerContext, channel: NettyChannel,
                        nBuffer: NettyChannelBuffer, passedState: NoState): AnyRef = {
@@ -26,4 +26,8 @@
       decoder.decodeLast(new ChannelHandlerContextAdapter(nCtx), new ChannelBufferAdapter(nBuffer));
    }
 
+   override def channelOpen(ctx: NettyChannelHandlerContext, e: ChannelStateEvent) {
+      transport.acceptedChannels.add(e.getChannel)
+   }
+
 }
\ No newline at end of file

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -8,12 +8,12 @@
  * @author Galder Zamarreño
  * @since 4.1
  */
-class NettyChannelPipelineFactory(server: ProtocolServer, encoder: ChannelDownstreamHandler)
+class NettyChannelPipelineFactory(server: ProtocolServer, encoder: ChannelDownstreamHandler, transport: NettyTransport)
       extends ChannelPipelineFactory {
 
    override def getPipeline: ChannelPipeline = {
       val pipeline = Channels.pipeline
-      pipeline.addLast("decoder", new DecoderAdapter(server.getDecoder))
+      pipeline.addLast("decoder", new DecoderAdapter(server.getDecoder, transport))
       if (encoder != null)
          pipeline.addLast("encoder", encoder)
       return pipeline;

Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -22,17 +22,17 @@
                   threadNamePrefix: String) extends Transport {
    import NettyTransport._
 
-   val serverChannels = new DefaultChannelGroup(threadNamePrefix + "-Channels")
+   private val serverChannels = new DefaultChannelGroup(threadNamePrefix + "-Channels")
    val acceptedChannels = new DefaultChannelGroup(threadNamePrefix + "-Accepted")
-   val pipeline = new NettyChannelPipelineFactory(server, encoder)
-   val factory = {
+   private val pipeline = new NettyChannelPipelineFactory(server, encoder, this)
+   private val factory = {
       if (workerThreads == 0)
          new NioServerSocketChannelFactory(masterExecutor, workerExecutor)
       else
          new NioServerSocketChannelFactory(masterExecutor, workerExecutor, workerThreads)
    }
    
-   lazy val masterExecutor = {
+   private lazy val masterExecutor = {
       if (masterThreads == 0) {
          debug("Configured unlimited threads for master thread pool")
          Executors.newCachedThreadPool
@@ -42,7 +42,7 @@
       }
    }
 
-   lazy val workerExecutor = {
+   private lazy val workerExecutor = {
       if (workerThreads == 0) {
          debug("Configured unlimited threads for worker thread pool")
          Executors.newCachedThreadPool

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -15,8 +15,6 @@
 @Test(groups = Array("functional"), testName = "server.hotrod.HotRodConcurrentTest")
 class HotRodConcurrentTest extends HotRodSingleNodeTest {
 
-   override def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
-   
    def testConcurrentPutRequests(m: Method) {
       val numClients = 10
       val numOpsPerClient = 100

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -26,8 +26,6 @@
 @Test(groups = Array("functional"), testName = "server.hotrod.HotRodFunctionalTest")
 class HotRodFunctionalTest extends HotRodSingleNodeTest {
 
-   override def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
-   
    def testUnknownCommand(m: Method) {
       val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0, 1, 0).status
       assertEquals(status, UnknownOperation,

Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodShutdownTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodShutdownTest.scala	                        (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodShutdownTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -0,0 +1,21 @@
+package org.infinispan.server.hotrod
+
+import org.testng.annotations.Test
+import java.lang.reflect.Method
+import org.jboss.netty.channel.ChannelFuture
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+@Test(groups = Array("functional"), testName = "server.hotrod.HotRodShutdownTest")
+class HotRodShutdownTest extends HotRodSingleNodeTest {
+
+   override protected def shutdownClient: ChannelFuture = null
+
+   def testPutBasic(m: Method) {
+      client.assertPut(m)
+   }
+
+}
\ No newline at end of file

Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -7,6 +7,8 @@
 import org.infinispan.manager.CacheManager
 import test.HotRodTestingUtil._
 import org.testng.annotations.AfterClass
+import org.jboss.netty.channel.ChannelFuture
+import org.infinispan.test.fwk.TestCacheManagerFactory
 
 /**
  * // TODO: Document this
@@ -28,20 +30,21 @@
       cacheManager
    }
 
-   def createTestCacheManager: CacheManager 
+   protected def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager(true) 
 
    @AfterClass(alwaysRun = true)
    override def destroyAfterClass {
       log.debug("Test finished, close cache, client and Hot Rod server", null)
       super.destroyAfterClass
-      hotRodClient.stop
+      shutdownClient
       hotRodServer.stop
    }
 
-   def server = hotRodServer
+   protected def server = hotRodServer
 
-   def client = hotRodClient
+   protected def client = hotRodClient
 
-   def jmxDomain = hotRodJmxDomain
+   protected def jmxDomain = hotRodJmxDomain
 
+   protected def shutdownClient: ChannelFuture = hotRodClient.stop
 }
\ No newline at end of file

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -1,15 +1,12 @@
 package org.infinispan.server.memcached
 
-import org.infinispan.test.fwk.TestCacheManagerFactory
 import org.infinispan.manager.CacheManager
-import test.MemcachedTestingUtil
 import java.lang.reflect.Method
 import java.util.concurrent.TimeUnit
 import org.testng.Assert._
-import org.testng.annotations.{AfterClass, Test}
-import net.spy.memcached.{CASResponse, MemcachedClient}
-import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
-import java.net.SocketAddress
+import org.testng.annotations.Test
+import net.spy.memcached.CASResponse
+import org.infinispan.test.TestingUtil
 import org.infinispan.Version
 
 /**
@@ -17,27 +14,9 @@
  * @author Galder Zamarreño
  * @since
  */
-@Test(groups = Array("functional"), testName = "server.memcached.FunctionalTest")
-class MemcachedFunctionalTest extends SingleCacheManagerTest with MemcachedTestingUtil {
-   private var client: MemcachedClient = _
-   private var server: MemcachedServer = _
-   private val timeout: Int = 60
+@Test(groups = Array("functional"), testName = "server.memcached.MemcachedFunctionalTest")
+class MemcachedFunctionalTest extends MemcachedSingleNodeTest {
 
-   override def createCacheManager: CacheManager = {
-      cacheManager = TestCacheManagerFactory.createLocalCacheManager
-      server = startMemcachedTextServer(cacheManager)
-      client = createMemcachedClient(60000, server.getPort)
-      return cacheManager
-   }
-
-   @AfterClass(alwaysRun = true)
-   override def destroyAfterClass {
-      super.destroyAfterClass
-      log.debug("Test finished, close memcached server", null)
-      client.shutdown
-      server.stop
-   }
-
    def testSetBasic(m: Method) {
       val f = client.set(k(m), 0, v(m))
       assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -15,7 +15,7 @@
  * @since
  */
 
-@Test(groups = Array("functional"), testName = "server.memcached.MemcachedClusterTest")
+@Test(groups = Array("functional"), testName = "server.memcached.MemcachedReplicationTest")
 class MemcachedReplicationTest extends MultipleCacheManagersTest with MemcachedTestingUtil {
    private val cacheName = "MemcachedReplSync"
    private[this] var servers: List[MemcachedServer] = List()

Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedShutdownTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedShutdownTest.scala	                        (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedShutdownTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -0,0 +1,24 @@
+package org.infinispan.server.memcached
+
+import java.util.concurrent.TimeUnit
+import java.lang.reflect.Method
+import org.testng.annotations.Test
+import org.testng.Assert._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+@Test(groups = Array("functional"), testName = "server.memcached.MemcachedShutdownTest")
+class MemcachedShutdownTest extends MemcachedSingleNodeTest {
+
+   override protected def shutdownClient {}
+
+   def testAny(m: Method) {
+      val f = client.set(k(m), 0, v(m))
+      assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+      assertEquals(client.get(k(m)), v(m))
+   }
+
+}
\ No newline at end of file

Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedSingleNodeTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedSingleNodeTest.scala	                        (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedSingleNodeTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -0,0 +1,44 @@
+package org.infinispan.server.memcached
+
+import test.MemcachedTestingUtil
+import org.infinispan.test.SingleCacheManagerTest
+import org.infinispan.manager.CacheManager
+import org.testng.annotations.AfterClass
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import net.spy.memcached.MemcachedClient
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+abstract class MemcachedSingleNodeTest extends SingleCacheManagerTest with MemcachedTestingUtil {
+   private var memcachedClient: MemcachedClient = _
+   private var memcachedServer: MemcachedServer = _
+   private val operationTimeout: Int = 60
+
+   override def createCacheManager: CacheManager = {
+      cacheManager = createTestCacheManager
+      memcachedServer = startMemcachedTextServer(cacheManager)
+      memcachedClient = createMemcachedClient(60000, server.getPort)
+      return cacheManager
+   }
+
+   protected def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager
+
+   @AfterClass(alwaysRun = true)
+   override def destroyAfterClass {
+      super.destroyAfterClass
+      log.debug("Test finished, close memcached server", null)
+      shutdownClient
+      memcachedServer.stop
+   }
+
+   protected def client: MemcachedClient = memcachedClient
+
+   protected def timeout: Int = operationTimeout
+
+   protected def server: MemcachedServer = memcachedServer
+
+   protected def shutdownClient = memcachedClient.shutdown
+}
\ No newline at end of file

Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala	2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala	2010-04-16 10:36:27 UTC (rev 1698)
@@ -1,43 +1,25 @@
 package org.infinispan.server.memcached
 
-import test.MemcachedTestingUtil
-import org.testng.annotations.{AfterClass, Test}
+import org.testng.annotations.Test
 import org.infinispan.manager.CacheManager
 import org.infinispan.test.fwk.TestCacheManagerFactory
-import net.spy.memcached.MemcachedClient
 import java.lang.reflect.Method
 import org.testng.Assert._
 import java.util.concurrent.TimeUnit
 import org.infinispan.Version
-import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
+import org.infinispan.test.TestingUtil
 
 /**
  * // TODO: Document this
  * @author Galder Zamarreño
  * @since
  */
-@Test(groups = Array("functional"), testName = "server.memcached.StatsTest")
-class MemcachedStatsTest extends SingleCacheManagerTest with MemcachedTestingUtil {
+@Test(groups = Array("functional"), testName = "server.memcached.MemcachedStatsTest")
+class MemcachedStatsTest extends MemcachedSingleNodeTest {
    private var jmxDomain = classOf[MemcachedStatsTest].getSimpleName
-   private var client: MemcachedClient = _
-   private var server: MemcachedServer = _
-   private var timeout: Int = 60
 
-   override def createCacheManager: CacheManager = {
-      cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
-      server = startMemcachedTextServer(cacheManager)
-      client = createMemcachedClient(60000, server.getPort)
-      return cacheManager
-   }
+   override def createTestCacheManager: CacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
 
-   @AfterClass(alwaysRun = true)
-   override def destroyAfterClass {
-      super.destroyAfterClass
-      log.debug("Test finished, close memcached server", null)
-      client.shutdown
-      server.stop
-   }
-
    def testUnsupportedStats(m: Method) {
       val stats = getStats
       assertEquals(stats.get("pid"), "0")



More information about the infinispan-commits mailing list