[infinispan-commits] Infinispan SVN: r1360 - in trunk/server/memcached/src/main/java/org/infinispan/server: memcached and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Jan 11 10:36:00 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-01-11 10:36:00 -0500 (Mon, 11 Jan 2010)
New Revision: 1360
Modified:
trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java
trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
Log:
[ISPN-173] (Build memcached server module) Fixed shutdown of memcached server to avoid NETTY-256.
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java 2010-01-11 15:12:19 UTC (rev 1359)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java 2010-01-11 15:36:00 UTC (rev 1360)
@@ -24,6 +24,10 @@
import java.net.SocketAddress;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.server.core.CommandHandler;
import org.infinispan.server.core.netty.memcached.NettyMemcachedDecoder;
@@ -51,9 +55,29 @@
final ChannelFactory factory;
final ChannelGroup serverChannels = new DefaultChannelGroup("memcached-channels");
final ChannelGroup acceptedChannels = new DefaultChannelGroup("memcached-accepted");
+ final ExecutorService masterExecutor;
+ final ExecutorService workerExecutor;
public NettyServerBootstrap(CommandHandler commandHandler, NettyMemcachedDecoder decoder, SocketAddress address,
- ExecutorService masterExecutor, ExecutorService workerExecutor, int workerThreads) {
+ int masterThreads, int workerThreads, String cacheName) {
+ ThreadFactory tf = new MemcachedThreadFactory(cacheName, ExecutorType.MASTER);
+ if (masterThreads == 0) {
+ log.debug("Configured unlimited threads for master thread pool");
+ masterExecutor = Executors.newCachedThreadPool(tf);
+ } else {
+ log.debug("Configured {0} threads for master thread pool", masterThreads);
+ masterExecutor = Executors.newFixedThreadPool(masterThreads, tf);
+ }
+
+ tf = new MemcachedThreadFactory(cacheName, ExecutorType.WORKER);
+ if (workerThreads == 0) {
+ log.debug("Configured unlimited threads for worker thread pool");
+ workerExecutor = Executors.newCachedThreadPool(tf);
+ } else {
+ log.debug("Configured {0} threads for worker thread pool", workerThreads);
+ workerExecutor = Executors.newFixedThreadPool(workerThreads, tf);
+ }
+
NettyChannelUpstreamHandler handler = new NettyChannelUpstreamHandler(commandHandler, acceptedChannels);
this.pipeline = new NettyChannelPipelineFactory(decoder, handler);
this.address = address;
@@ -74,8 +98,28 @@
@Override
public void stop() {
+ // We *pause* the acceptor so no new connections are made
+ ChannelGroupFuture future = serverChannels.unbind().awaitUninterruptibly();
+ if (!future.isCompleteSuccess()) {
+ log.warn("Server channel group did not completely unbind");
+ for (Channel ch : future.getGroup()) {
+ if (ch.isBound()) {
+ log.warn("{0} is still bound to {1}", ch, ch.getRemoteAddress());
+ }
+ }
+ }
+
+ // TODO remove workaround when integrating Netty 3.2.x - https://jira.jboss.org/jira/browse/NETTY-256
+ masterExecutor.shutdown();
+ try {
+ masterExecutor.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ workerExecutor.shutdown();
serverChannels.close().awaitUninterruptibly();
- ChannelGroupFuture future = acceptedChannels.close().awaitUninterruptibly();
+ future = acceptedChannels.close().awaitUninterruptibly();
if (!future.isCompleteSuccess()) {
log.warn("Channel group did not completely close");
for (Channel ch : future.getGroup()) {
@@ -87,4 +131,35 @@
log.debug("Channel group completely closed, release external resources");
factory.releaseExternalResources();
}
+
+ private static class MemcachedThreadFactory implements ThreadFactory {
+ final String cacheName;
+ final ExecutorType type;
+
+ MemcachedThreadFactory(String cacheName, ExecutorType type) {
+ this.cacheName = cacheName;
+ this.type = type;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, System.getProperty("program.name") + "-" + cacheName + '-' + type.toString().toLowerCase() + '-' + type.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ }
+
+ private static enum ExecutorType {
+ MASTER(1), WORKER(1);
+
+ final AtomicInteger threadCounter;
+
+ ExecutorType(int startIndex) {
+ this.threadCounter = new AtomicInteger(startIndex);
+ }
+
+ int getAndIncrement() {
+ return threadCounter.getAndIncrement();
+ }
+ }
}
Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-01-11 15:12:19 UTC (rev 1359)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java 2010-01-11 15:36:00 UTC (rev 1360)
@@ -91,26 +91,8 @@
NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
- ThreadFactory tf = new MemcachedThreadFactory(cache, ExecutorType.MASTER);
- if (masterThreads == 0) {
- log.debug("Configured unlimited threads for master thread pool");
- masterExecutor = Executors.newCachedThreadPool(tf);
- } else {
- log.debug("Configured {0} threads for master thread pool", masterThreads);
- masterExecutor = Executors.newFixedThreadPool(masterThreads, tf);
- }
-
- tf = new MemcachedThreadFactory(cache, ExecutorType.WORKER);
- if (workerThreads == 0) {
- log.debug("Configured unlimited threads for worker thread pool");
- workerExecutor = Executors.newCachedThreadPool(tf);
- } else {
- log.debug("Configured {0} threads for worker thread pool", workerThreads);
- workerExecutor = Executors.newFixedThreadPool(workerThreads, tf);
- }
-
bootstrap = new NettyServerBootstrap(commandHandler, decoder, new InetSocketAddress(host, port),
- masterExecutor, workerExecutor, workerThreads);
+ masterThreads, workerThreads, cache.getName());
bootstrap.start();
log.info("Started Memcached text server bound to {0}:{1}", host, port);
}
@@ -123,34 +105,4 @@
scheduler.shutdown();
}
- private static class MemcachedThreadFactory implements ThreadFactory {
- final Cache cache;
- final ExecutorType type;
-
- MemcachedThreadFactory(Cache cache, ExecutorType type) {
- this.cache = cache;
- this.type = type;
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, System.getProperty("program.name") + "-" + cache.getName() + '-' + type.toString().toLowerCase() + '-' + type.getAndIncrement());
- t.setDaemon(true);
- return t;
- }
- }
-
- private static enum ExecutorType {
- MASTER(1), WORKER(1);
-
- final AtomicInteger threadCounter;
-
- ExecutorType(int startIndex) {
- this.threadCounter = new AtomicInteger(startIndex);
- }
-
- int getAndIncrement() {
- return threadCounter.getAndIncrement();
- }
- }
}
More information about the infinispan-commits
mailing list