[infinispan-commits] Infinispan SVN: r1698 - in trunk/server: hotrod/src/test/scala/org/infinispan/server/hotrod and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Apr 16 06:36:30 EDT 2010
Author: galder.zamarreno at jboss.com
Date: 2010-04-16 06:36:27 -0400 (Fri, 16 Apr 2010)
New Revision: 1698
Added:
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodShutdownTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedShutdownTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedSingleNodeTest.scala
Modified:
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala
Log:
[ISPN-404] (Memcached and Hot Rod server shutdown hangs in clients connected) Fixed.
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/DecoderAdapter.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -1,16 +1,16 @@
package org.infinispan.server.core.transport.netty
import org.jboss.netty.handler.codec.replay.ReplayingDecoder
-import org.jboss.netty.channel.{ExceptionEvent => NettyExceptionEvent, ChannelHandlerContext => NettyChannelHandlerContext, Channel => NettyChannel}
import org.jboss.netty.buffer.{ChannelBuffer => NettyChannelBuffer}
import org.infinispan.server.core.transport._
+import org.jboss.netty.channel.{ChannelStateEvent, ExceptionEvent => NettyExceptionEvent, ChannelHandlerContext => NettyChannelHandlerContext, Channel => NettyChannel}
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since 4.1
*/
-class DecoderAdapter(decoder: Decoder) extends ReplayingDecoder[NoState](true) {
+class DecoderAdapter(decoder: Decoder, transport: NettyTransport) extends ReplayingDecoder[NoState](true) {
override def decode(nCtx: NettyChannelHandlerContext, channel: NettyChannel,
nBuffer: NettyChannelBuffer, passedState: NoState): AnyRef = {
@@ -26,4 +26,8 @@
decoder.decodeLast(new ChannelHandlerContextAdapter(nCtx), new ChannelBufferAdapter(nBuffer));
}
+ override def channelOpen(ctx: NettyChannelHandlerContext, e: ChannelStateEvent) {
+ transport.acceptedChannels.add(e.getChannel)
+ }
+
}
\ No newline at end of file
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyChannelPipelineFactory.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -8,12 +8,12 @@
* @author Galder Zamarreño
* @since 4.1
*/
-class NettyChannelPipelineFactory(server: ProtocolServer, encoder: ChannelDownstreamHandler)
+class NettyChannelPipelineFactory(server: ProtocolServer, encoder: ChannelDownstreamHandler, transport: NettyTransport)
extends ChannelPipelineFactory {
override def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline
- pipeline.addLast("decoder", new DecoderAdapter(server.getDecoder))
+ pipeline.addLast("decoder", new DecoderAdapter(server.getDecoder, transport))
if (encoder != null)
pipeline.addLast("encoder", encoder)
return pipeline;
Modified: trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala
===================================================================
--- trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/core/src/main/scala/org/infinispan/server/core/transport/netty/NettyTransport.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -22,17 +22,17 @@
threadNamePrefix: String) extends Transport {
import NettyTransport._
- val serverChannels = new DefaultChannelGroup(threadNamePrefix + "-Channels")
+ private val serverChannels = new DefaultChannelGroup(threadNamePrefix + "-Channels")
val acceptedChannels = new DefaultChannelGroup(threadNamePrefix + "-Accepted")
- val pipeline = new NettyChannelPipelineFactory(server, encoder)
- val factory = {
+ private val pipeline = new NettyChannelPipelineFactory(server, encoder, this)
+ private val factory = {
if (workerThreads == 0)
new NioServerSocketChannelFactory(masterExecutor, workerExecutor)
else
new NioServerSocketChannelFactory(masterExecutor, workerExecutor, workerThreads)
}
- lazy val masterExecutor = {
+ private lazy val masterExecutor = {
if (masterThreads == 0) {
debug("Configured unlimited threads for master thread pool")
Executors.newCachedThreadPool
@@ -42,7 +42,7 @@
}
}
- lazy val workerExecutor = {
+ private lazy val workerExecutor = {
if (workerThreads == 0) {
debug("Configured unlimited threads for worker thread pool")
Executors.newCachedThreadPool
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodConcurrentTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -15,8 +15,6 @@
@Test(groups = Array("functional"), testName = "server.hotrod.HotRodConcurrentTest")
class HotRodConcurrentTest extends HotRodSingleNodeTest {
- override def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
-
def testConcurrentPutRequests(m: Method) {
val numClients = 10
val numOpsPerClient = 100
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodFunctionalTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -26,8 +26,6 @@
@Test(groups = Array("functional"), testName = "server.hotrod.HotRodFunctionalTest")
class HotRodFunctionalTest extends HotRodSingleNodeTest {
- override def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
-
def testUnknownCommand(m: Method) {
val status = client.execute(0xA0, 0x77, cacheName, k(m) , 0, 0, v(m), 0, 1, 0).status
assertEquals(status, UnknownOperation,
Added: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodShutdownTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodShutdownTest.scala (rev 0)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodShutdownTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -0,0 +1,21 @@
+package org.infinispan.server.hotrod
+
+import org.testng.annotations.Test
+import java.lang.reflect.Method
+import org.jboss.netty.channel.ChannelFuture
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+ at Test(groups = Array("functional"), testName = "server.hotrod.HotRodShutdownTest")
+class HotRodShutdownTest extends HotRodSingleNodeTest {
+
+ override protected def shutdownClient: ChannelFuture = null
+
+ def testPutBasic(m: Method) {
+ client.assertPut(m)
+ }
+
+}
\ No newline at end of file
Modified: trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala
===================================================================
--- trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/hotrod/src/test/scala/org/infinispan/server/hotrod/HotRodSingleNodeTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -7,6 +7,8 @@
import org.infinispan.manager.CacheManager
import test.HotRodTestingUtil._
import org.testng.annotations.AfterClass
+import org.jboss.netty.channel.ChannelFuture
+import org.infinispan.test.fwk.TestCacheManagerFactory
/**
* // TODO: Document this
@@ -28,20 +30,21 @@
cacheManager
}
- def createTestCacheManager: CacheManager
+ protected def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager(true)
@AfterClass(alwaysRun = true)
override def destroyAfterClass {
log.debug("Test finished, close cache, client and Hot Rod server", null)
super.destroyAfterClass
- hotRodClient.stop
+ shutdownClient
hotRodServer.stop
}
- def server = hotRodServer
+ protected def server = hotRodServer
- def client = hotRodClient
+ protected def client = hotRodClient
- def jmxDomain = hotRodJmxDomain
+ protected def jmxDomain = hotRodJmxDomain
+ protected def shutdownClient: ChannelFuture = hotRodClient.stop
}
\ No newline at end of file
Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedFunctionalTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -1,15 +1,12 @@
package org.infinispan.server.memcached
-import org.infinispan.test.fwk.TestCacheManagerFactory
import org.infinispan.manager.CacheManager
-import test.MemcachedTestingUtil
import java.lang.reflect.Method
import java.util.concurrent.TimeUnit
import org.testng.Assert._
-import org.testng.annotations.{AfterClass, Test}
-import net.spy.memcached.{CASResponse, MemcachedClient}
-import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
-import java.net.SocketAddress
+import org.testng.annotations.Test
+import net.spy.memcached.CASResponse
+import org.infinispan.test.TestingUtil
import org.infinispan.Version
/**
@@ -17,27 +14,9 @@
* @author Galder Zamarreño
* @since
*/
- at Test(groups = Array("functional"), testName = "server.memcached.FunctionalTest")
-class MemcachedFunctionalTest extends SingleCacheManagerTest with MemcachedTestingUtil {
- private var client: MemcachedClient = _
- private var server: MemcachedServer = _
- private val timeout: Int = 60
+ at Test(groups = Array("functional"), testName = "server.memcached.MemcachedFunctionalTest")
+class MemcachedFunctionalTest extends MemcachedSingleNodeTest {
- override def createCacheManager: CacheManager = {
- cacheManager = TestCacheManagerFactory.createLocalCacheManager
- server = startMemcachedTextServer(cacheManager)
- client = createMemcachedClient(60000, server.getPort)
- return cacheManager
- }
-
- @AfterClass(alwaysRun = true)
- override def destroyAfterClass {
- super.destroyAfterClass
- log.debug("Test finished, close memcached server", null)
- client.shutdown
- server.stop
- }
-
def testSetBasic(m: Method) {
val f = client.set(k(m), 0, v(m))
assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedReplicationTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -15,7 +15,7 @@
* @since
*/
- at Test(groups = Array("functional"), testName = "server.memcached.MemcachedClusterTest")
+ at Test(groups = Array("functional"), testName = "server.memcached.MemcachedReplicationTest")
class MemcachedReplicationTest extends MultipleCacheManagersTest with MemcachedTestingUtil {
private val cacheName = "MemcachedReplSync"
private[this] var servers: List[MemcachedServer] = List()
Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedShutdownTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedShutdownTest.scala (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedShutdownTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -0,0 +1,24 @@
+package org.infinispan.server.memcached
+
+import java.util.concurrent.TimeUnit
+import java.lang.reflect.Method
+import org.testng.annotations.Test
+import org.testng.Assert._
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+ at Test(groups = Array("functional"), testName = "server.memcached.MemcachedShutdownTest")
+class MemcachedShutdownTest extends MemcachedSingleNodeTest {
+
+ override protected def shutdownClient {}
+
+ def testAny(m: Method) {
+ val f = client.set(k(m), 0, v(m))
+ assertTrue(f.get(timeout, TimeUnit.SECONDS).booleanValue)
+ assertEquals(client.get(k(m)), v(m))
+ }
+
+}
\ No newline at end of file
Added: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedSingleNodeTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedSingleNodeTest.scala (rev 0)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedSingleNodeTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -0,0 +1,44 @@
+package org.infinispan.server.memcached
+
+import test.MemcachedTestingUtil
+import org.infinispan.test.SingleCacheManagerTest
+import org.infinispan.manager.CacheManager
+import org.testng.annotations.AfterClass
+import org.infinispan.test.fwk.TestCacheManagerFactory
+import net.spy.memcached.MemcachedClient
+
+/**
+ * // TODO: Document this
+ * @author Galder Zamarreño
+ * @since // TODO
+ */
+abstract class MemcachedSingleNodeTest extends SingleCacheManagerTest with MemcachedTestingUtil {
+ private var memcachedClient: MemcachedClient = _
+ private var memcachedServer: MemcachedServer = _
+ private val operationTimeout: Int = 60
+
+ override def createCacheManager: CacheManager = {
+ cacheManager = createTestCacheManager
+ memcachedServer = startMemcachedTextServer(cacheManager)
+ memcachedClient = createMemcachedClient(60000, server.getPort)
+ return cacheManager
+ }
+
+ protected def createTestCacheManager: CacheManager = TestCacheManagerFactory.createLocalCacheManager
+
+ @AfterClass(alwaysRun = true)
+ override def destroyAfterClass {
+ super.destroyAfterClass
+ log.debug("Test finished, close memcached server", null)
+ shutdownClient
+ memcachedServer.stop
+ }
+
+ protected def client: MemcachedClient = memcachedClient
+
+ protected def timeout: Int = operationTimeout
+
+ protected def server: MemcachedServer = memcachedServer
+
+ protected def shutdownClient = memcachedClient.shutdown
+}
\ No newline at end of file
Modified: trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala
===================================================================
--- trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala 2010-04-16 00:18:56 UTC (rev 1697)
+++ trunk/server/memcached/src/test/scala/org/infinispan/server/memcached/MemcachedStatsTest.scala 2010-04-16 10:36:27 UTC (rev 1698)
@@ -1,43 +1,25 @@
package org.infinispan.server.memcached
-import test.MemcachedTestingUtil
-import org.testng.annotations.{AfterClass, Test}
+import org.testng.annotations.Test
import org.infinispan.manager.CacheManager
import org.infinispan.test.fwk.TestCacheManagerFactory
-import net.spy.memcached.MemcachedClient
import java.lang.reflect.Method
import org.testng.Assert._
import java.util.concurrent.TimeUnit
import org.infinispan.Version
-import org.infinispan.test.{TestingUtil, SingleCacheManagerTest}
+import org.infinispan.test.TestingUtil
/**
* // TODO: Document this
* @author Galder Zamarreño
* @since
*/
- at Test(groups = Array("functional"), testName = "server.memcached.StatsTest")
-class MemcachedStatsTest extends SingleCacheManagerTest with MemcachedTestingUtil {
+ at Test(groups = Array("functional"), testName = "server.memcached.MemcachedStatsTest")
+class MemcachedStatsTest extends MemcachedSingleNodeTest {
private var jmxDomain = classOf[MemcachedStatsTest].getSimpleName
- private var client: MemcachedClient = _
- private var server: MemcachedServer = _
- private var timeout: Int = 60
- override def createCacheManager: CacheManager = {
- cacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
- server = startMemcachedTextServer(cacheManager)
- client = createMemcachedClient(60000, server.getPort)
- return cacheManager
- }
+ override def createTestCacheManager: CacheManager = TestCacheManagerFactory.createCacheManagerEnforceJmxDomain(jmxDomain)
- @AfterClass(alwaysRun = true)
- override def destroyAfterClass {
- super.destroyAfterClass
- log.debug("Test finished, close memcached server", null)
- client.shutdown
- server.stop
- }
-
def testUnsupportedStats(m: Method) {
val stats = getStats
assertEquals(stats.get("pid"), "0")
More information about the infinispan-commits
mailing list