[infinispan-commits] Infinispan SVN: r1360 - in trunk/server/memcached/src/main/java/org/infinispan/server: memcached and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Jan 11 10:36:00 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-01-11 10:36:00 -0500 (Mon, 11 Jan 2010)
New Revision: 1360

Modified:
   trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java
   trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
Log:
[ISPN-173] (Build memcached server module) Fixed shutdown of memcached server to avoid NETTY-256.

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java	2010-01-11 15:12:19 UTC (rev 1359)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/core/netty/NettyServerBootstrap.java	2010-01-11 15:36:00 UTC (rev 1360)
@@ -24,6 +24,10 @@
 
 import java.net.SocketAddress;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.infinispan.server.core.CommandHandler;
 import org.infinispan.server.core.netty.memcached.NettyMemcachedDecoder;
@@ -51,9 +55,29 @@
    final ChannelFactory factory;
    final ChannelGroup serverChannels = new DefaultChannelGroup("memcached-channels");
    final ChannelGroup acceptedChannels = new DefaultChannelGroup("memcached-accepted");
+   final ExecutorService masterExecutor;
+   final ExecutorService workerExecutor;
    
    public NettyServerBootstrap(CommandHandler commandHandler, NettyMemcachedDecoder decoder, SocketAddress address, 
-            ExecutorService masterExecutor, ExecutorService workerExecutor, int workerThreads) {
+            int masterThreads, int workerThreads, String cacheName) {
+      ThreadFactory tf = new MemcachedThreadFactory(cacheName, ExecutorType.MASTER);
+      if (masterThreads == 0) {
+         log.debug("Configured unlimited threads for master thread pool");
+         masterExecutor = Executors.newCachedThreadPool(tf);
+      } else {
+         log.debug("Configured {0} threads for master thread pool", masterThreads);
+         masterExecutor = Executors.newFixedThreadPool(masterThreads, tf);
+      }
+
+      tf = new MemcachedThreadFactory(cacheName, ExecutorType.WORKER);
+      if (workerThreads == 0) {
+         log.debug("Configured unlimited threads for worker thread pool");
+         workerExecutor = Executors.newCachedThreadPool(tf);
+      } else {
+         log.debug("Configured {0} threads for worker thread pool", workerThreads);
+         workerExecutor = Executors.newFixedThreadPool(workerThreads, tf);
+      }
+
       NettyChannelUpstreamHandler handler = new NettyChannelUpstreamHandler(commandHandler, acceptedChannels);
       this.pipeline = new NettyChannelPipelineFactory(decoder, handler);
       this.address = address;
@@ -74,8 +98,28 @@
 
    @Override
    public void stop() {
+   // We *pause* the acceptor so no new connections are made
+      ChannelGroupFuture future = serverChannels.unbind().awaitUninterruptibly();
+      if (!future.isCompleteSuccess()) {
+         log.warn("Server channel group did not completely unbind");
+         for (Channel ch : future.getGroup()) {
+            if (ch.isBound()) {
+               log.warn("{0} is still bound to {1}", ch, ch.getRemoteAddress());
+            }
+         }
+      }
+
+      // TODO remove workaround when integrating Netty 3.2.x - https://jira.jboss.org/jira/browse/NETTY-256
+      masterExecutor.shutdown();
+      try {
+         masterExecutor.awaitTermination(30, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+         e.printStackTrace();
+      }
+
+      workerExecutor.shutdown();
       serverChannels.close().awaitUninterruptibly();
-      ChannelGroupFuture future = acceptedChannels.close().awaitUninterruptibly();
+      future = acceptedChannels.close().awaitUninterruptibly();
       if (!future.isCompleteSuccess()) {
          log.warn("Channel group did not completely close");
          for (Channel ch : future.getGroup()) {
@@ -87,4 +131,35 @@
       log.debug("Channel group completely closed, release external resources");
       factory.releaseExternalResources();
    }
+
+   private static class MemcachedThreadFactory implements ThreadFactory {
+      final String cacheName;
+      final ExecutorType type;
+
+      MemcachedThreadFactory(String cacheName, ExecutorType type) {
+         this.cacheName = cacheName;
+         this.type = type;
+      }
+
+      @Override
+      public Thread newThread(Runnable r) {
+         Thread t = new Thread(r, System.getProperty("program.name") + "-" + cacheName + '-' + type.toString().toLowerCase() + '-' + type.getAndIncrement());
+         t.setDaemon(true);
+         return t;
+      }
+   }
+
+   private static enum ExecutorType {
+      MASTER(1), WORKER(1);
+
+      final AtomicInteger threadCounter;
+
+      ExecutorType(int startIndex) {
+         this.threadCounter = new AtomicInteger(startIndex);
+      }
+
+      int getAndIncrement() {
+         return threadCounter.getAndIncrement();
+      }
+   }
 }

Modified: trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java
===================================================================
--- trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java	2010-01-11 15:12:19 UTC (rev 1359)
+++ trunk/server/memcached/src/main/java/org/infinispan/server/memcached/TextServer.java	2010-01-11 15:36:00 UTC (rev 1360)
@@ -91,26 +91,8 @@
       NettyMemcachedDecoder decoder = new NettyMemcachedDecoder(cache, chain, scheduler);
       TextCommandHandler commandHandler = new TextCommandHandler(cache, chain);
 
-      ThreadFactory tf = new MemcachedThreadFactory(cache, ExecutorType.MASTER);
-      if (masterThreads == 0) {
-         log.debug("Configured unlimited threads for master thread pool");
-         masterExecutor = Executors.newCachedThreadPool(tf);
-      } else {
-         log.debug("Configured {0} threads for master thread pool", masterThreads);
-         masterExecutor = Executors.newFixedThreadPool(masterThreads, tf);
-      }
-
-      tf = new MemcachedThreadFactory(cache, ExecutorType.WORKER);
-      if (workerThreads == 0) {
-         log.debug("Configured unlimited threads for worker thread pool");
-         workerExecutor = Executors.newCachedThreadPool(tf);
-      } else {
-         log.debug("Configured {0} threads for worker thread pool", workerThreads);
-         workerExecutor = Executors.newFixedThreadPool(workerThreads, tf);
-      }
-
       bootstrap = new NettyServerBootstrap(commandHandler, decoder, new InetSocketAddress(host, port), 
-               masterExecutor, workerExecutor, workerThreads);
+               masterThreads, workerThreads, cache.getName());
       bootstrap.start();
       log.info("Started Memcached text server bound to {0}:{1}", host, port);
    }
@@ -123,34 +105,4 @@
       scheduler.shutdown();
    }
 
-   private static class MemcachedThreadFactory implements ThreadFactory {
-      final Cache cache;
-      final ExecutorType type;
-
-      MemcachedThreadFactory(Cache cache, ExecutorType type) {
-         this.cache = cache;
-         this.type = type;
-      }
-
-      @Override
-      public Thread newThread(Runnable r) {
-         Thread t = new Thread(r, System.getProperty("program.name") + "-" + cache.getName() + '-' + type.toString().toLowerCase() + '-' + type.getAndIncrement());
-         t.setDaemon(true);
-         return t;
-      }
-   }
-
-   private static enum ExecutorType {
-      MASTER(1), WORKER(1);
-
-      final AtomicInteger threadCounter;
-
-      ExecutorType(int startIndex) {
-         this.threadCounter = new AtomicInteger(startIndex);
-      }
-
-      int getAndIncrement() {
-         return threadCounter.getAndIncrement();
-      }
-   }
 }



More information about the infinispan-commits mailing list